ESA2GJK1DH1K基礎篇: 來吧! 徹底了解一下MQTT


 

 

先來體驗一下利用MQTT通信

  一,打開我寫的C#版本的連接MQTT實現通信的軟件

    

    

 

 

    

 

 

  二,MQTT軟件主要是做數據中轉的,所以需要打開兩個測試

    按照紅線填寫(連接我的MQTT服務器)

 

    

 

 

  三,我現在希望上面的那個調試助手,把信息發給下面的調試助手

 

    3.1 上面的那個調試助手填寫如下

      發布的主題那里填寫 aaaaa 

      然后點擊連接

 

      

 

 

    3.2 下面的調試助手填寫如下

      3.2.1 訂閱那里填寫 aaaaa

      3.2.2 點擊連接

      3.2.3 然后點擊訂閱

 

      

 

 

    3.3 最終的樣子

 

      

 

 

    3.4 現在第一個客戶端發信息

 

      

 

 

    3.5 說明

 

      其實過程是這樣的

      兩個客戶端都連接了一個MQTT服務器(這個只是個軟件,后面章節會告訴大家怎么安裝)

      上面的客戶端發布的主題那一欄填寫的是 aaaaa  然后發送的消息是 123456

      點擊發送的時候實際上該消息就發給了MQTT服務器

      整個消息格式呢大概是這樣的 XXXXaaaaaXXXX123456

      XXXX呢代表其它信息,方便服務器區分出來整個消息中的

      發布的主題(aaaaa)和發布的消息(123456)

      其實 aaaaa 就充當了這個消息的標識

 

      下面的客戶端的訂閱那一項填寫的是 aaaaa

      其實就是在告訴服務器,我需要數據標識是 aaaaa的消息

      既然你告訴了服務器了,那么服務器只要接收到數據標識是 aaaaa

      的消息,那么就會主動把消息發給你

 

 

 

    3.6 同理,讓下面的客戶端把消息發給上面的客戶端

 

      

      這個C#版本的軟件是我自己寫的,上面的顯示信息方式我做成了上面那個樣子

    3.7 稍微總結

      只要是兩個設備之間訂閱和發布的主題對上了

      那么這兩個設備就可以通信了

      對於初學者可能疑惑,你軟件點點點到底內部是怎么做到的

      如果你想知道更多就接着看

 

    

首先你需要知道MQTT絕對的需要會用,它只是一個軟件,其實就是個TCP服務器軟件

一,既然是TCP服務器,這個TCP服務器和咱平時做的有什么不一樣呢.

  首先,平時的時候咱做的TCP服務器都是,一個或者多個客戶端連接咱做的TCP服務器,然后TCP服務器處理客戶端的數據.

  現在呢!需求變了!

  假設我有5個網絡設備,3個手機.我現在想讓網絡設備把數據遠程傳給手機.而且我還需要記錄網絡設備上傳的數據.

  假設通信是這樣的(而且后期還會不停的增加設備和手機)

  

 

 

二,咋辦???

  1. 需要記錄所有設備的數據

  2. 設備和手機之間存在多對一和一對多

  所以,必須需要個公共的服務器進行數據的中轉.

  假設你就把這個服務器做成TCP服務器,有人問,你咋不做成UDP呢?UDP他妹的發送數據不好判斷是不是發送成功.

  難道我還每次都讓服務器給我回數據不成,我還是少找些麻煩!

  還有就是要實現遠程,有個公網IP就可以,可以自己買個服務器,上網絡公司拉一根專網

  或者用自己電腦,用花生殼映射

  還是用雲服務器吧!就是運行在別人的服務器上的一台電腦(就是一台電腦),IP地址直接是公網.方便.

 

三,怎么設計這個TCP服務器???

  1.為了應對這種通信,首先設備發送的數據決不能是單單的數據,必須加點東西

  2.如果把發送的數據帶上標識呢? 假設設備1發送的數據是   (aaaaa數據 )  aaaaa是數據標識,后面是真實數據

  3.然后呢!假設手機1就接收數據標識是aaaaa的數據,怎么讓服務器轉發給它呢???

  4.如果手機1在連接上TCP服務器的時候 告訴TCP服務器我接收數據標識是 aaaaa的數據

  5.通過上面的方式是不是有點眉頭了????

  咱呢姑且把 "告訴TCP服務器我接收數據標識是 aaaaa的數據"  這個事情呢,起個名字     訂閱的主題是 aaaaa

  把 "假設設備1發送的數據是   (aaaaa數據 )"    消息前面的 aaaaa 叫做 發布的主題是aaaaa

 

四,總結上面的就是

  手機1先連接TCP服務器,然后呢,規定個協議,告訴TCP服務器我訂閱的主題是aaaaa

  這樣呢服務器就記住了,當出現消息前面的主題是aaaaa的消息的時候,他就把這個消息發給手機1

 

  當然咱假設,設備1連接上TCP服務器,然后,告訴TCP服務器我訂閱的主題是wwww

  這樣呢服務器就記住了,當出現消息前面的主題是wwww的消息的時候,他就把這個消息發給設備1

 

  然后設備1連接上TCP服務器以后呢,這樣發送信息(假設發送的消息是123456):   aaaaa123456

  服務器一接收到客戶端的消息,就取出來這個消息的標識是什么,取出來的是 aaaaa

  然后呢,看下記錄的誰需要消息標識是aaaaa的消息,然后找到了手機1

  最后把這個消息發送給手機1這個客戶端,然后手機1就接收到了1123456這個消息

 

  同理:手機1發送  wwww998877  然后這個消息就會發給設備1 ,設備1就會收到 998877

 

五,總結

  這個服務器道理上是這樣,服務器記錄各個設備的信息,各個設備訂閱的主題,然后呢,判斷這個消息然后進行轉發

  但是...咱做個簡單的完全可以做出來,但是要想做的完善,而且要支持龐大消息數量的設備(來個百萬級).....不是一朝一夕就可以的.

  其實很長時間以前,人們就有這種需求了.多對一和一對多通信

  所以呢,一些組織和單位就開始解決這種問題,開始做這種軟件,所以MQTT就誕生了.

  之所以叫MQTT是因為是外國人做的這種TCP服務器,外國人呢,為實現這種功能的TCP服務器取了個名字叫

  Message Queuing Telemetry Transport

  然后取每個首字母  就叫 MQTT了

  其實有很多家做MQTT軟件,但是呢,我比較喜歡用emqtt

  

看看怎么連接上他們做的MQTT軟件

  一,首先咱知道就是個TCP服務器,所以呢,需要先用TCP連接上他們的服務器.

  二,咱用Android ,C#,QT,網頁等等連接MQTT服務器的時候有現成的封裝好的庫可以用

    其實說白了就是調用函數而已.....

  三,但是對於單片機而言要想實現MQTT通信,那么就需要借助網絡模塊

    大部分的網絡模塊都可以實現TCP通信

    咱呢,就需要在TCP的基礎上按照MQTT協議封裝下咱的數據

    注:(其實官方給了現成的MQTT的封裝數據和解析數據的程序)

 

  四,咱利用網絡模塊的TCP連接上以后

    然后需要發送第一條消息(注:並不是上來就可以訂閱主題的)

    MQTT軟件規定呢,你發送的第一條信息是連接信息(相當於咱要先登錄)

    他規定了幾個參數!

    ClientID: 各個客戶端必須設定一個ID,各個客戶端必須都不一樣               假設是 123456

    用戶名: 咱安裝MQTT軟件的時候可以設置MQTT軟件的登錄的用戶名     假設是yang

    密碼: 咱安裝MQTT軟件的時候可以設置MQTT軟件的登錄的密碼             假設是 11223344

    下面是我當初研究MQTT的協議寫的,然后把上面三個參數填進去

    注意這節我粘貼的代碼只是為了大家了解協議,該文章封裝的包不完整,請不要使用

 

              https://docs.emqx.io/sdk_tools?category=MQTT_Clients   (官方提供的各個開發的庫)

  

 

 

  單片機用下面這個,我當前MQTT程序的庫就是用的這個

  

 

 

 

/**
* @brief  連接服務器的打包函數
* @param  
* @retval 
* @example 
**/
int ConnectMqtt(char *ClientID,char *Username,char *Password)
{
    int ClientIDLen = strlen(ClientID);
    int UsernameLen    = strlen(Username);
    int PasswordLen = strlen(Password);
    int DataLen = 0;
    int Index = 2;
    int i = 0;
    DataLen = 12 + 2+2+ClientIDLen+UsernameLen+PasswordLen;
    MqttSendData[0] = 0x10;                //MQTT Message Type CONNECT
    MqttSendData[1] = DataLen;    //剩余長度(不包括固定頭部)
    MqttSendData[Index++] = 0;        // Protocol Name Length MSB    
    MqttSendData[Index++] = 4;        // Protocol Name Length LSB    
    MqttSendData[Index++] = 'M';        // ASCII Code for M    
    MqttSendData[Index++] = 'Q';        // ASCII Code for Q    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 4;        // MQTT Protocol version = 4    
    MqttSendData[Index++] = 0xc2;        // conn flags 
    MqttSendData[Index++] = 0;        // Keep-alive Time Length MSB    
    MqttSendData[Index++] = 60;        // Keep-alive Time Length LSB  60S心跳包  
    MqttSendData[Index++] = (0xff00&ClientIDLen)>>8;// Client ID length MSB    
    MqttSendData[Index++] = 0xff&ClientIDLen;    // Client ID length LSB  

    for(i = 0; i < ClientIDLen; i++)
    {
        MqttSendData[Index + i] = ClientID[i];          
    }
    Index = Index + ClientIDLen;
    
    if(UsernameLen > 0)
    {   
        MqttSendData[Index++] = (0xff00&UsernameLen)>>8;//username length MSB    
        MqttSendData[Index++] = 0xff&UsernameLen;    //username length LSB    
        for(i = 0; i < UsernameLen ; i++)
        {
            MqttSendData[Index + i] = Username[i];    
        }
        Index = Index + UsernameLen;
    }
    
    if(PasswordLen > 0)
    {    
        MqttSendData[Index++] = (0xff00&PasswordLen)>>8;//password length MSB    
        MqttSendData[Index++] = 0xff&PasswordLen;    //password length LSB    
        for(i = 0; i < PasswordLen ; i++)
        {
            MqttSendData[Index + i] = Password[i];    
        }
        Index = Index + PasswordLen; 
    }    
    return Index;
}

 

得到以下數據,然后把這個數據發給TCP 服務器,如果沒有錯誤,服務器就會回 20 02 00 00

10 22 00 04 4D 51 54 54 04 C2 00 78 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34

咱可以用TCP調試助手試一試

先連接我的哈,后面章節有說明怎么安裝MQTT服務器

 

IP地址: 47.92.31.46  

端口號: 1883

 

 

先說一件事情  所有的MQTT數據哈  第一個數據是說明整個數據是干什么的數據   第二個是說它后面的數據的總個數

10 : 固定,MQTT規定的連接用0x10

22: 是說0x22后面有0x22個數據  34個

00 04: 后面記錄MQTT版本號的字節個數  

4D 51 54 54: M Q T T  版本號字符         這個是4版本,不同版本不一樣 3版本的是MQIsdp 額,了解就可以

04: 版本號是 0x04 

C2:這個呢想了解具體呢,需要看協議  http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028

 

bit7 bit6:是否有用戶名和密碼

bit5 :遺囑是否需要服務器保留

bit4 bit3:遺囑的消息等級

bit2:是否設置了遺囑

bit1:是否清除以前的連接信息 

bit0:保留,默認0 

 

 

上面就是說有用戶名和密碼,每次連接的時候清除連接信息,沒有設置遺囑(后面會說)

00 78: 心跳包是120S一次(這個自己連接的時候自己設置),

 

MQTT規定必須客戶端必須發心跳包,客戶端發送的心跳包數據是 0xC0 0x00,這是MQTT規定的

如果心跳包間隔了你設定心跳包的1.5倍時間,你沒有發給服務器,服務器就認為你掉線了,然后還有個遺囑問題,,后面會說

你發給服務器 0xC0 0x00  服務器會回你 0xD0  0x00  這個知道就行了

 

00 06:客戶端的ClientId有6位

后面的  31 32 33 34 35 36    就是ClientId ,這是MQTT服務器規定的,每隔客戶端必須有各自的ClientId

 

00 04: MQTT的用戶名  

79 61 6E 67  我安裝MQTT的時候設置的MQTT的用戶名是yang

 

00 08: MQTT的密碼

31 31 32 32 33 33 34 34  我安裝MQTT的時候設置的MQTT密碼

 

好了,連接上TCP服務器 然后發送

10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34

服務器呢就會回你  20 02 00 00

20: 固定

02: 后面有兩個數據

00 00

注意后面的第一個數據 00

如果你設置了 Clean Session為1 :便會回復 01

 

最后一個數據呢,有幾個返回值,0就說明成功,其它就是有各種問題

比如說回的是 20 02 00 04  就說明用戶名或者密碼有問題.

 

 

 

 

現在服務器認咱了,該告訴服務器我訂閱的主題了

假設告訴服務器我訂閱的是2222

假設訂閱的時候訂閱的主題的消息標識是1,消息等級是0

那么打包以后就是 82 09 00 01 00 04 32 32 32 32 00  然后把這個數據發給TCP服務器

 

讓測試用TCP調試助手訂閱,然后用咱的MQTT調試助手發信息給咱的TCP調試助手

注意:現在咱的TCP可能已經斷開了,因為咱的TCP調試助手沒有在規定時間內發送心跳包

咱呢這次測試就利索點

首先准備好

 

 

 

首先連接

10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34

 

 

 

然后訂閱

82 09 00 01 00 04 32 32 32 32 00

 

 

 

然后用MQTT調試助手發消息 

 

 

 

 

 

 

 

 

/**
* @brief  MQTT訂閱/取消訂閱數據打包函數
* @param  SendData 
* @param  topic                主題 
* @param  qos         消息等級 
* @param  whether     訂閱/取消訂閱請求包
* @retval 
* @example 
**/
int MqttSubscribeTopic(char *topic,u8 qos,u8 whether)
{    
    int topiclen = strlen(topic);
    int i=0,index = 0;
  
    if(whether)
        MqttSendData[index++] = 0x82;                        //0x82 //消息類型和標志 SUBSCRIBE 訂閱
    else
        MqttSendData[index++] = 0xA2;                        //0xA2 取消訂閱
    MqttSendData[index++] = topiclen + 5;                //剩余長度(不包括固定頭部)
    MqttSendData[index++] = 0;                          //消息標識符,高位
    MqttSendData[index++] = 0x01;                    //消息標識符,低位
    MqttSendData[index++] = (0xff00&topiclen)>>8;    //主題長度(高位在前,低位在后)
    MqttSendData[index++] = 0xff&topiclen;              //主題長度 
    
    for (i = 0;i < topiclen; i++)
    {
        MqttSendData[index + i] = topic[i];
    }
    index = index + topiclen;
    
    if(whether)
    {
        MqttSendData[index] = qos;//QoS級別
        index++;
    }
    return index;
}

 

 

 

 

0x82: 告訴MQTT服務器,我要訂閱主題

0x09: 后面的數據個數

0x00  0x01  注意哈,訂閱主題的時候可以設置了標識  標識呢  1-65535

之所以有這個家伙:咱訂閱的時候怎么判斷訂閱成功了呢???

訂閱成功以后呢!服務器會返回咱訂閱成功的回復,回復里面就包含着咱寫的這個標識

咱呢可以對比下這個標識,然后呢就知道到底是不是訂閱成功了.

0x00  0x04  后面訂閱主題的長度

32 32 32 32 訂閱的主題是 2222

最后一個 00 是說消息等級,一般呢,訂閱設置為0 就可以

那就說一下這個消息等級有什么用吧!

咱發送數據的時候也會攜帶一個消息等級

假設是0  那么這條消息是不是真的發給MQTT服務器(Broker)了,就不知道了,

如果設備多個,還真不敢保證真的發給服務器了

 

假設是1 那么一個客戶端發送消息以后呢,服務器一看消息等級是1,那么就會回給那個發送消息的客戶端一個應答消息

客戶端可以根據有沒有回復應答確認發沒發送成功

 

假設是2 這個呢就是消息一定要到達MQTT服務器.這個很苛刻,也比較占用內存

 

 

如果按照上面發呢,服務器會回

90 03 00 01 00 

90:固定

03:后面的數據長度

00 01:這條主題的標識

00:消息等級

 

如果訂閱多個主題假設訂閱兩個主題 消息等級第一個是0 第二個是1

90 04 00 01 00 01

90:固定

03:后面的數據長度

00 01:這條主題的標識

00:消息等級

01:消息等級

 

假設訂閱失敗

后面的消息等級就會變為 0x80 (訂閱一個主題)

90 03 00 01 00 

90:固定

03:后面的數據長度

00 01:這條主題的標識

80:消息等級變為0x80

 

后面的消息等級就會變為 0x80 (訂閱兩個主題,第一個訂閱失敗)

90 03 00 01 00 

90:固定

04:后面的數據長度

00 01:這條主題的標識

80:消息等級

00:消息等級

 

 

 

然后看發布

長話短說

發布的時候呢,信息里面都有這些內容

發布的主題,消息,消息等級,是不是需要服務器保留消息,消息的標識

/**
* @brief  MQTT發布數據打包函數
* @param  mqtt_message 
* @param  topic                主題 
* @param  qos         消息等級 
* @retval 
* @example 
**/
int MqttPublishData(char * topic, char * message, u8 qos)
{  
    int topic_length = strlen(topic);    
    int message_length = strlen(message);  
    int i,index=0;    
    static u16 id=0;
    
    MqttSendData[index++] = 0x30;    // MQTT Message Type PUBLISH  30:消息等級是0  32消息等級是1  34消息等級是2

  
    if(qos)
        MqttSendData[index++] = 2 + topic_length + 2 + message_length;//數據長度
    else
        MqttSendData[index++] = 2 + topic_length + message_length;   // Remaining length  

  
    MqttSendData[index++] = (0xff00&topic_length)>>8;//主題長度
    MqttSendData[index++] = 0xff&topic_length;
         
    for(i = 0; i < topic_length; i++)
    {
        MqttSendData[index + i] = topic[i];//拷貝主題
    }
    index += topic_length;
        
    if(qos)
    {
        MqttSendData[index++] = (0xff00&id)>>8;
        MqttSendData[index++] = 0xff&id;
        id++;
    }
  
    for(i = 0; i < message_length; i++)
    {
        MqttSendData[index + i] = message[i];//拷貝數據
    }
    index += message_length;
        
    return index;
}

 

發布的主題: 誰訂閱了這個主題,消息就傳給誰

消息等級:上面說了

是不是需要服務器保留消息:一會和遺囑一塊說

消息的標識:每條消息加個標識,用來區分消息

 

具體的消息就不說了哈,其實咱不必關心這些,因為咱以后使用的時候是直接調用

API 函數,它自己給咱封裝好.咱只需要填寫發布的主題,消息,消息等級等

 

遺囑

還記得上面

 

 

 

我直接說遺囑是啥意思哈!

假設我手機和一個設備訂閱主題和發布主題對應,我就能和這個設備通信了

但是,我怎么知道這個設備掉線了呢?

當然完全可以自己發信息給那個設備,如果不回復,就說明掉線了

但是呢!MQTT服務器提供了一種方式

假設我設置好一個設備的遺囑消息是  offline    遺囑發布的主題是 aaaaa
另一個設備訂閱的主題是 aaaaa
如果設備掉線,服務器就會給訂閱了aaaaa的設備發送  offline

 

還記得上面說的不   服務器如果在你設置的心跳包時間的1.5倍收不到心跳包就認為你掉線了.

 

心跳包

  MQTT規定的,發送完連接協議之后

  發送的心跳包數據是C0 00

  發送時間:連接協議里面的心跳包時間(你可以提前發)

  然后服務器回復 D0 00

  

 

 

 

擴展

  有人會問,如果我想監控所有設備的數據應該怎么做

  就是說,我有個所有設備都可以管理的后台

  假設我是用C#做了一個MQTT的上位機,監控所有的數據

  

  笨法:

    你訂閱的時候把所有設備發布的主題全部訂閱一遍

    假設現在其中一個設備,想獲取其它連個設備的數據

    其它兩個設備發布的主題如下:

    

 

      另一個設備

    訂閱 aaaaa 然后再訂閱 wwww

    

 

 

    

 

     然后就可以了

    

 

 

    另一個客戶端

    

 

 

  MQTT自帶的絕招:

    先說一下哈

    假設一個客戶端發布的主題是 tttt/aaaaa

    還有一個客戶端發布的主題是 tttt/wwww

    如果想讓有一個客戶端接收他倆的數據

    你只需要訂閱  tttt/#

 

    

 

 

    

 

    

 

 

    

 

    

 

補充(關於MQTT消息等級,DUP)

1.1假設客戶端1 發布的主題是 1111 ;消息等級是:0  ;發送的消息是999  最終發送的信息如下:

  30 0b 00 04 31 31 31 31 39 39 39

  消息等級是0是說明該消息發送出去就完事了,服務器不會回復任何應答信息.

  至於該消息發沒發給服務器,不知道!

1.2假設客戶端2 訂閱的主題是:1111  消息等級是 0

  假設客戶端1 確實把消息發給了服務器

  客戶端2 收到消息以后,不需要做任何操作

  

  

 

2.1 假設客戶端1 發布的主題是 1111 ;消息等級是:1  ;發送的消息是999  最終發送的信息如下:

  32 0b 00 04 31 31 31 31 XX XX 39 39 39

  XX XX是在發送的時候需要加上的消息標識符:

  消息標識符XX XX隨意即可:范圍1-65535

  假設消息標識符是 00 01

  

  發送完以上消息以后,服務器會回復: (PUBACK) 告訴客戶端我收到了

  40 02 00 01

  (00 01就是咱上面發送的消息標識符)

  這樣就證明消息確實送達給了服務器

 

 

 

  如果客戶端1 發布完消息以后沒有接收到服務器的應答

  則可以重新發布消息

  32 0b 00 04 31 31 31 31 XX XX 39 39 39

  XX XX可以和上次的一樣,也可以不一樣

 

2.2 假設客戶端2訂閱了主題是 1111 ;消息等級是:1 

  服務器接收到客戶端1發送的消息之后,轉發給客戶端2

  32 0b 00 04 31 31 31 31 XX XX 39 39 39

  注意現在的XX XX(消息標識符)是服務器自己隨機生成的了

  假設標識符是 00 02

  客戶端2在接收到消息之后需要返回應答(PUBACK) 告訴服務器我收到了

  40 02 00 02

 

  如果客戶端2不回復:40 02 00 02  (后面咱就叫 PUBACK)

  服務器便會一直發送消息給客戶端2

  3A 0b 00 04 31 31 31 31 00 02 39 39 39

  注意開頭變為了 3A (服務器自動會把重傳標志置一)

  

 

  高4位是 3 固定

  后面四位:

  第一位:DUP  標記這條消息是不是重傳的

  第2,3位:消息等級  01  :消息等級1   10:消息等級2

  最后一位:RETAIN 是否需要服務器保留這條消息

 

  本來是 32    0011 0010

  變為了 3A    0011 1010

 

 

  其實服務器加上DUP是為了讓客戶端知道,我這條消息是重傳的,

  因為服務器第一次發的時候客戶端沒有返回PUBACK

  但是服務器知道我確實是傳給了客戶端

  客戶端這邊假設真的是沒有及時的回復PUBACK

  那么有兩種方式處理

  2.2.1.再次接收到消息以后,無論消息有沒有DUP標志

  直接處理消息

  如果判斷這條消息是需要返回 PUBACK的

  那么直接根據消息里面的消息標識符返回 PUBACK 即可

  2.2.2.判斷下如果有DUP標志,那么再提取下消息標識

  看一下我先前是不是處理了有相同消息標識符的消息

  如果有就說明我已經處理了,只是沒有返回PUBACK

  那么我不去處理這條消息

  直接根據消息里面的消息標識符返回PUBACK就可以

 

  2.2.3 其實....

  但是整體來說,對於消息等級是1的消息統統處理即可

  然后根據消息里面的消息標識符返回PUBACK即可

  先說一下為什么

  其實在客戶端1發布消息等級是1的消息的時候,

  如果客戶端1由於某些原因沒有接收到服務器的PUBACK

  那么客戶端1還會再發布先前的消息

  其實現在就有兩條或者多條相同的消息在服務器里面

  這些相同的消息(標識符不一樣的消息)就會發給客戶端2

  如果客戶端2一直不應答(PUBACK),那么服務器便會把所有的沒有收到應答的消息

  的DUP標記置一以后不停的發給客戶端2...

  直至客戶端2應答了所有的消息,或者客戶端2斷線了

  服務器才停止發送

  對於單片機而言,這些處理只能自己去實現

  為了方便和節省內存,對於消息等級是1的消息

  可以直接根據消息里面的消息標識符返回PUBACK

 

     所以對於消息等級是1的消息,其實客戶端至少會接收到1次消息

  

 

 

3.1 假設客戶端1 發布的主題是 1111 ;消息等級是:2  ;發送的消息是999  最終發送的信息如下:

 

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

  XX XX是在發送的時候需要加上的消息標識符:

  消息標識符XX XX隨意即可:范圍1-65535

  假設消息標識符是 00 01

  注意:服務器接收到此消息以后並不會立即發送給訂閱了主題是1111,消息等級是2的客戶端

  

  服務器接收到以后會返回:(PUBREC) "告訴客戶端我收到了"

  50 02 00 01

 

  客戶端1需要返回:(PUBREL) "好的"

  62 02 00 01 

  注意:返回這個以后,消息才會下發給訂閱了主題是1111,消息等級是2的客戶端

 

  服務器接着會返回:(PUBCOMP)

  70 02 00 01

 

 

  很多人介紹QS2都說保證消息只傳輸一次,其實實際上是這樣保證的

  客戶端1在發送完消息以后

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

  服務器會返回(PUBREC)

 

  但是只要客戶端1不回復 (PUBREL)

  無論客戶端1現在發送多少條消息等級是2的消息

  服務器都不會理會,服務器只會記錄你發送的最后一條消息

  客戶端1只有回復了(PUBREL)

  服務器才會把最后一條消息轉發出去

  最后返回 (PUBCOMP)

 

   

 

3.2 假設客戶端2訂閱了主題是 1111 ;消息等級是:2 

 

  服務器接收到客戶端1發送的消息,  然后確認接收到客戶端1的(PUBREL)之后,

  轉發給客戶端2 :

 

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

 

  注意現在的XX XX(消息標識符)是服務器自己隨機生成的了

 

  假設標識符是 00 02

 

  客戶端2接收到以后需要返回:(PUBREC) "告訴服務器我收到了" 

  50 02 00 02

  注意:如果客戶端2不回復:(PUBREC),那么服務器會不停的發送

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

  直至客戶端2回復:(PUBREC)

 

 

  服務器接收到以后會返回:(PUBREL) "好的"

  62 02 00 02

 

  客戶端2最后需要返回:(PUBCOMP)

  70 02 00 02

  注意:即使客戶端2不返回(PUBCOMP),服務器隔一段時間也會默認客戶端2回復了(PUBCOMP)

 

 

補充(關於retain)

 

 

 

 

發布消息的時候,第一個字節的最后一位代表 Retain

意思是,是否需要服務器保留這個消息

如果設置了服務器保留了這個消息,那么只要客戶端訂閱了這個消息的主題

服務器就會立馬發送給客戶端保留的這個消息

詳細說明:

假設我發布的主題是:1111 消息是:999 ,消息等級隨意(假設是0)  然后設置了 Retain位置為1

31 09 00 04 31 31 31 31 39 39 39

客戶端1把這條消息發給了服務器

然后過了一會,客戶端2上線了

訂閱了主題 1111

因為上面的消息設置了讓服務器保留

所以只要客戶端2 一訂閱1111,便會收到 999這個消息

這就是Retain的作用

 

 

當然,有更巧妙的應用

我一般用這個來發送在線或者掉線,或者開關量數據

1.我設置客戶端1的遺囑發布的主題是 1111  遺囑消息是 offline   Retain為1

2.我設置客戶端1連接上服務器以后先發布一條消息

發布的主題也是 1111  消息是:online  Retain為1

 

注意:服務器最終只會保留最后一條需要保留的消息

 

只要是另一個客戶端訂閱 1111

如果客戶端1是掉線的,那么便會立即收到 offline

如果客戶端1是在線的,呢么便會立即收到 online

 

有些時候咱控制開關,咱打開上位機以后想立即知道開關的狀態

最好的方式就是把發送開關亮數據的主題 Retain設置為1

那么只要是上位機一訂閱設備發布的開關量主題,

便會立即得到開關量數據!

這樣便提高了用戶體驗.

 

 

結語

 以上說的,對於高級語言已經封裝好的包而言,發布或者接收消息等級1/2,

需要做的后續操作,高級語言里面已經做了處理

但是對於單片機而言,需要自己處理

 

 

補充:

昨天有個人測試MQTT,發現只要設備連接上MQTT,不需要訂閱,就能接收到服務器的消息

然后他就有點懵!

我說一下,其實這個功能也是屬於MQTT的范疇!

大家看MQTT協議,只知道訂閱了某個主題就可以收到某個主題的信息

注意:MQTT協議中並沒有說只有訂閱才可以收到!

 

 

我說一下:其實MQTT就是個TCP服務器,MQTT客戶端就是個TCP客戶端

其實大家要從大的方向上去考慮,整個的通信就分為兩層.

第一層是TCP通信

第二層是解析TCP數據

就是這樣而已!!!!

 

我問一下,我TCP服務器可以給TCP客戶端主動發信息吧???

其實就是上面說的,不需要訂閱就可以接收到信息!

有些人會想,怎么會?

不訂閱為啥可以接收?

其實你只是跟着別人學了個表面東西,然后給自己設定了一個錯誤的思路:只有訂閱才能接收信息!

 

服務器可以主動推送信息給各個客戶端!只要是信息格式是正確的就可以

因為TCP接收到信息以后就是解析一下是不是MQTT協議格式的數據而已!

有些人會想,為啥客戶端解析協議里面不判斷下是不是自己訂閱的呢??

這個是你自己根據自己的需求去做的事情!

 

 

你要想讓你的協議兼容到各個使用場景,你說你是做成開放形式還是封閉的??

你可能咋一聽感覺為啥還能這樣!

其實本來就是有的東西,只是你以前不了解而已!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM