我們已經把相關的連接報文搞定了。筆者想來想去還是決定先講解一下訂閱報文(SUBSCRIBE )。如果傳統的通信方式是客戶端和服務端之間一般就直接傳輸信息。但是MQTT的通信方式是通過發布/訂閱的方式進行的。筆者不知道他是否跟設計模式中的發布訂閱模式有沒有關系。可是他們思想卻有一點相似之處。
客戶端知道服務上有很多個主題。就好比如說有很多消息的分類一樣子。有社會新聞、體育講壇等。那么客戶端只要找到自己感興趣的進行訂閱就可以了。一個客戶端可以向服務器訂閱多個主題。而所謂的發布就是客戶端對不同的主題進行發布信息。即好比如新聞的發布者一樣子。這個時候只要訂閱這個主題的客戶端就可以接收到來自服務端的新聞。我們的手機常常會接收到一些推送的信息。事實上有很多App應用都是用MQTT協議來進行的。所以不難看出服務端主要是負責客戶端和客戶端的之間信息的傳輸和信息管理。大至如圖下
注意:發布者也是客戶端。訂閱者也是客戶端
主題(Topic )
如果主題只是一個字符串值的話,那么顯然會比較單調。這樣子功能也顯得比較無力。所以在主題上面就了所謂的分隔符和通配符的說法(個人想法)。分隔符的意思就是讓主題可以分層次。就好如說主題“體育講壇/籃球/NBA”。看到這樣子的主題,請問一下你還有什么不明白的話。是不是感覺很有層次感。剩下只有一個問題?如果我們訂閱了主題“體育講壇/籃球/NBA”,並向主題“體育講壇/籃球”發布一個信息。那么已經訂閱主題“體育講壇/籃球/NBA”的客戶端們是不是可以接受到信息呢?反過來講如果我們訂閱了主題“體育講壇/籃球”,向主題“體育講壇/籃球/NBA”發信息,客戶端們是否又能接受信息呢?
筆者就以HiveMQ作服務器來做一下上面的小實驗。如下
客驗結果顯然是失敗的——訂閱主題“體育講壇/籃球/NBA”的客戶端根本收不到來自主題“體育講壇/籃球“的發布信息。說明分隔符就是用於主題名的分層次。沒有別的意思。
通過上面的實驗我們知道如果想要收到NBA就是必須訂閱主題“體育講壇/籃球/NBA”。可是總是有一些人只要是籃球的新聞有喜歡。怎么辦。通配符的功能就出來了。通配符有倆種——"+"和“#”。+為單層的通配符。表示當前這一層的全都合非。這樣子以上面的說到的例子來做實驗。我們訂閱一個主題為“體育講壇/籃球/+”。按照理解的意思就是只要是在“體育講壇/籃球”的信息都是我們想要的。結果如下
我們可以看到筆者在“體育講壇/籃球/NBA”和“體育講壇/籃球/ABC”各發布了信息。結果他都能收到。那么如果我們對主題“體育講壇/籃球”或是主題“體育講壇/籃球/NBA/福州專場”發布信息呢?筆者試過了很可惜都不行。
記得我們上面說到有一些人只要跟籃球相關的都喜歡。可是如果使用通配符“+”是可以接近我們的要求。注意是接近。“+”通配符只是表示當前一層的。從當前的第二層就不行了。而本身的層也不算。就像上面的。只有籃球下的子一層才是合非的。講到這里大家一定會想到用“#“通配符試試。沒有錯。“#“通配符就是表示當前本身和下面子層所有。如下
實驗的結果很終滿足了。
對於主題,在文檔中有一個要求——主題不能以 ”#“ "+" "$" 為開頭。對於”#“ ” +“的話,大家都好理解。那么”$“又是什么鬼。在文檔我們可以看到這樣子的字符"$SYS"。事實上他們是想說”$“開頭的主題一般用於系統內部的一些主題。你們可以去找一些第三方的MQTT服務器。都會有很多以”$“開頭的主題。
SUBSCRIBE報文
通過上面的介紹。筆者想你們一定對MQTT通信方式有了一定的概念。而本章的訂閱報文就是用於告訴服務器我想要什么的主題了。通過前面幾章的了解。我們知道報文的固定報頭是少了的。筆者就以MQTT 3.1.1來介紹吧。如下
SUBSCRIBE報文的INT值是8。所以對應的二進制為1000。后面的DUP QOS RETAIN對應是0010。其中QOS是必須是01。對訂閱者來講,他一定希望自己的訂閱是成功的。所以訂閱報文的QOS是01就相當好理解了。如果不理解QOS是什么的話,請看一下前面幾章。
訂閱報文也有可變報頭,可變報頭只有一個消息ID。消息ID是從客端端開始分配的。筆者為什么樣子認為呢?主要是看到客戶端在發布信息的時候就要求消息ID。所以筆者才會覺得消息ID在客戶端進行分配的。當然也不是什么報文都會消息ID的。但是有消息ID一般QOS大於0。
訂閱報文的有效載荷里面存在了相關的訂閱訂題列表。前面說過可以支持一個客戶端多個訂閱。列表里面每有一主題項只有倆個值。一個表示主題名,一個表示服務質量要求(Requested QoS)。這里的服務質量要求(Requested QoS)和 固定報頭的服務質量的值是一樣子。但是用意卻是不一樣子。這里是指這個訂閱者接收這主題的服務質量最大等級。舉個列子吧。筆者訂閱了一個主題主題“體育講壇/籃球/NBA”,同時他的服務質量要求(Requested QoS)的值為1。這個時候有一個發布者在這個主題上發布一個服務質QOS為2。筆者還是可以收到這個發布者發來的信息。只是信息的服務質量QOS卻變為1了。要明白QOS(1)和QOS(2)的執行行為是不樣子的。這個后面章節會講到。當然如果發布者在這個主題上發布一個服務質QOS為0。這就沒有什么區別了。如下
對於有效載荷筆者這里就不多講解了。也沒有什么可說的。看文檔的圖片就夠了如下。
宏觀上:
微觀上:
列表出我們可以看他訂閱了倆個主題。一個主題”a/b“,一個主題”c/b“。上面列出大概的圖片(宏觀上)和比較細的圖片(微觀上)。如果看不懂也沒有關系。筆者接下來會用代碼來抓一包看看。相信在對照一下就明白列表出畫的是什么。
現在讓我們好好想想當服務器接收到來自客戶端的訂閱報文的時候要做些什么樣子的反應呢?首先我們要明白如果服務端接收到一個訂閱報文,第一步想到一定是查看訂閱報文的格式是不是正確的。相關的主題名是不是為空的。主題名的寫法是不是非法。這些一定離不開。當然對應的一些共有的驗證筆者就不說了。一切沒有問題的情況下,服務器會去看一下當前訂閱者前面有沒有訂閱過相同的主題。如果有就替換當前的。如果沒有就創建一下新的。然后服務器在根據當前主題查找一下符合保留的信息。如果有,就發送給當前的訂閱者。然后發送一個訂閱報文確定(SUBACK )。當然這前后沒有規定。先發送一個訂閱報文確定(SUBACK ),在處理保留的信息也是可以的。
注意:在發送符合保留的信息就要對QOS進行處理。上面筆者也講過了。
SUBACK 報文
當服務端處理SUBSCRIBE報文的時候,都會生成一個SUBACK 報文來回應訂閱者。筆者這里不想對他太過的講解。他的內容也很簡單。如下
對於SUBACK 報文的可變報頭里面也只有一個消息ID。而且跟SUBSCRIBE報文的消息ID是一樣子的。有效載何的內容存放是訂閱主題的服務質量要求(Requested QoS)。筆者在MQTT 3.1 文檔時面可以看到有多個主題的列子。可是在MQTT 3.1.1里面卻沒有。那么筆者就把MQTT 3.1.1的放在下里吧。讀者們可以自行查看。
上面列表里面顯示返回碼,事實上是主題相關的服務質量要求(Requested QoS)。所以就可以知道他可以會返回四個值。如下
QOS 0:0x00
QOS 1:0x01
QOS2 :0x02
Failure :0x80
代碼實現
有了上面的了解之后,筆者就想在通過一些代碼來加深理解。當然重新寫那是不可能的。筆者就用上一章的代碼。並加上訂閱報文相關的處理。如下
1 private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { 2 3 if (!this.connected) { 4 ctx.close(); 5 return; 6 } 7 int messageId = msg.variableHeader().messageId(); 8 9 List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); 10 11 for (MqttTopicSubscription subscription : requestSubscriptions) { 12 13 if (StringUtils.isEmpty(subscription.topicName())) { 14 ctx.close(); 15 return; 16 } 17 } 18 19 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 20 21 requestSubscriptions.forEach(subscription -> { 22 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 23 else grantedQosLevels.add(subscription.qualityOfService().value()); 24 }); 25 26 27 BrokerSessionHelper.sendMessage( 28 ctx, 29 MqttMessageFactory.newMessage( 30 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 31 MqttMessageIdVariableHeader.from(messageId), 32 new MqttSubAckPayload(grantedQosLevels)), 33 this.clientId, 34 messageId, 35 true); 36 37 for (int i = 0; i < requestSubscriptions.size(); i++) { 38 39 MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); 40 String topic = requestSubscriptions.get(i).topicName(); 41 42 //1。查看以前有沒有訂閱過相同的主題,如果有就替換。 43 //2。查看有沒有符合的保留信息,有發送 44 //讀者們自行去實現。是要用redis,還是要用sqllite自去實現。 45 46 } 47 }
訂閱報文的實現並不難。難就在對於對保留信息的處理。還有就是服務端要對當前的客戶端的訂閱進行保留。那么筆者這邊做的事情比較簡單。主要是為了學習查看相關的報文格式。但是筆者還是要列出來一下。如下
1.判斷是否發生過連接。即是連接報文的處理。如果沒有的話,斷開連接。
if (!this.connected) { ctx.close(); return; }
2.獲得報文的消息ID和相關的訂閱主題。判斷主題不為空。當然你也可自定義主題的驗證合法規則。筆者這里就不多說了。
int messageId = msg.variableHeader().messageId(); List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); for (MqttTopicSubscription subscription : requestSubscriptions) { if (StringUtils.isEmpty(subscription.topicName())) { ctx.close(); return; } }
3.獲得相關主題的服務質量要求,用於返回碼和處理保留的消息。並返回SUBACK報文
1 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 2 3 requestSubscriptions.forEach(subscription -> { 4 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 5 else grantedQosLevels.add(subscription.qualityOfService().value()); 6 }); 7 8 9 BrokerSessionHelper.sendMessage( 10 ctx, 11 MqttMessageFactory.newMessage( 12 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 13 MqttMessageIdVariableHeader.from(messageId), 14 new MqttSubAckPayload(grantedQosLevels)), 15 this.clientId, 16 messageId, 17 true);
4.處理保留的信息。這里筆者並沒實現。因為這里要接合相關的數據庫或是NOSQL。所以這里筆者沒有去做。因這里太多的東西的。而且不同的人實現和想法也不一樣子。所以筆者就沒有列出來。
for (int i = 0; i < requestSubscriptions.size(); i++) { MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); String topic = requestSubscriptions.get(i).topicName(); //1。查看以前有沒有訂閱過相同的主題,如果有就替換。 //2。查看有沒有符合的保留信息,有發送 //讀者們自行去實現。是要用redis,還是要用sqllite自去實現。 }
筆者把相關的抓到的報文格列出來。如下
SUBSCRIBE報文:
筆者已經把SUBSCRIBE報文的各個部分用不同的顏色標出耿了。其中的黃色線表示下同主題的長度。就是上面微觀圖片里面的MSB和LSB。其他的也沒有什么。 只是要注意最后一個值也就是服務質量要求(Requested QoS)。筆者這邊是1。所以最后的二進制是00000001。
SUBACK 報文:
我們可以看到SUBACK 報文的消息ID和SUBSCRIBE報文的消息是一樣子的。還有就是記得最后的服務質量要求。