50-STM32+ESP8266+AIR202基本控制篇-重點詳解-MQTT協議


 

 

 先來體驗一下MQTT通信

 

 


這是我制作的一個上位機MQTT調試助手,提供了源碼.

 

 


 

 




注:需要打開兩個,默認連接提供的服務器測試.

第一個配置如下:
發布的主題:aaaaa
訂閱的主題:Topic
點擊連接,然后點擊訂閱

 

 




第二個配置如下:
發布的主題:Topic
訂閱的主題:aaaaa
點擊連接,然后點擊訂閱

 

 




第一個軟件發消息:發送的消息123456,然后點擊發送


 

 



用戶會看到第二個軟件收到消息
提示:這個軟件是自己開發的,里面的顯示都是自己規定的.



其實過程是這樣的:
兩個客戶端都連接了一個MQTT服務器(這個只是個軟件,后面章節會告訴大家怎么安裝)
第一個客戶端發布的主題那一欄填寫的是 aaaaa  然后發送的消息是 123456
點擊發送的時候實際上該消息就發給了MQTT服務器
整個消息格式呢大概是這樣的 XXXXaaaaaXXXX123456
XXXX呢代表其它信息,方便服務器區分出來整個消息中的
發布的主題(aaaaa)和發布的消息(123456)
其實 aaaaa 就充當了這個消息的標識

 

第二個客戶端的訂閱那一項填寫的是 aaaaa
其實就是在告訴服務器,我需要數據標識是 aaaaa的消息
既然你告訴了服務器了,那么服務器只要接收到數據標識是 aaaaa
的消息,那么就會主動把消息發給你



同理,讓下面的客戶端把消息發給上面的客戶端


 

 




簡要說明:

連接上MQTT服務器以后,只要是兩個設備之間訂閱和發布的主題對上了
那么這兩個設備就可以通信了
對於初學者可能疑惑,你軟件點點點到底內部是怎么做到的
如果你想知道更多就接着看,我先說明一下過程.

 

其實MQTT就是一個TCP服務器,它是在TCP通信的時候封裝了一套協議.
咱們就叫它MQTT協議,注意本質上就是TCP傳輸數據,這個數據有格式而已!
首先是使用TCP連接,然后發送MQTT連接協議,然后發送MQTT訂閱主題的協議.
這樣的話,服務器就知道你需要哪種標識的數據了.
當服務器收到這種標識的數據的時候,服務器就會主動轉發給你.
其實MQTT服務器主要工作就是做數據轉發,但是你需要告訴它你需要什么樣的數據.



思考
1,其實理解一個東西最好的方式就是:你要設想如果讓你自己做一個這樣的服務器,你會怎么做.

 

2,現在需求是做一個負責數據轉發的軟件
首先,平時的時候咱做的TCP服務器都是,一個或者多個客戶端連接咱做的TCP服務器,然后TCP服務器處理客戶端的數據.
現在呢!需求變了!
假設我有5個網絡設備,3個手機.我現在想讓網絡設備把數據遠程傳給手機.而且我還需要記錄網絡設備上傳的數據.
假設通信是這樣的(而且后期還會不停的增加設備和手機)

 

 

3,咋辦???
  1. 需要記錄所有設備的數據
  2. 設備和手機之間存在多對一和一對多
  所以,必須需要個公共的服務器進行數據的中轉.
  假設你就把這個服務器做成TCP服務器,有人問,你咋不做成UDP呢?
       UDP他妹的是無連接狀態,發送數據不好判斷是不是發送成功,我還是少找些麻煩!
  還有就是要實現遠程,有個公網IP就可以,可以自己買個服務器,上網絡公司拉一根專網
  或者用自己電腦,用花生殼映射
  還是用雲服務器吧!就是運行在別人的服務器上的一台電腦(就是一台電腦),IP地址直接是公網.方便.

 

4,怎么設計這個TCP服務器???
  1.為了應對這種通信,首先設備發送的數據決不能是單單的數據,必須加點東西
  2.如果把發送的數據帶上標識呢? 假設設備1發送的數據是:   aaaaa數據   (aaaaa是數據標識,后面是真實數據)
  3.然后呢!假設手機1就接收數據標識是aaaaa的數據,怎么讓服務器轉發給它呢???
  4.如果手機1在連接上TCP服務器的時候 告訴TCP服務器我接收數據標識是 aaaaa的數據
  5.通過上面的方式是不是有點眉頭了????
  咱呢姑且把 "告訴TCP服務器我接收數據標識是 aaaaa的數據"  這個事情呢,起個名字     訂閱的主題是 aaaaa
  把 "假設設備1發送的數據是:  aaaaa數據 "    消息前面的 aaaaa 叫做 發布的主題是aaaaa

 

5,總結上面的就是
  手機1先連接TCP服務器,然后呢,規定個協議,告訴TCP服務器我訂閱的主題是aaaaa
  這樣呢服務器就記住了,當出現消息前面的主題是aaaaa的消息的時候,他就把這個消息發給手機1

 

  當然咱假設,設備1連接上TCP服務器,然后,告訴TCP服務器我訂閱的主題是wwww
  這樣呢服務器就記住了,當出現消息前面的主題是wwww的消息的時候,他就把這個消息發給設備1

 

  然后設備1連接上TCP服務器以后呢,這樣發送信息(假設發送的消息是123456):   aaaaa123456
  服務器一接收到客戶端的消息,就取出來這個消息的標識是什么,取出來的是 aaaaa
  然后呢,看下記錄的誰需要消息標識是aaaaa的消息,然后找到了手機1
  最后把這個消息發送給手機1這個客戶端,然后手機1就接收到了1123456這個消息

 

  同理:手機1發送  wwww998877  然后這個消息就會發給設備1 ,設備1就會收到 998877

 

6,總結
  這個服務器道理上是這樣,服務器記錄各個設備的信息,各個設備訂閱的主題,然后呢,判斷這個消息然后進行轉發
  但是...咱做個簡單的完全可以做出來,但是要想做的完善,而且要支持龐大消息數量的設備(來個百萬級).....不是一朝一夕就可以的.
  其實很長時間以前,人們就有這種需求了.多對一和一對多通信
  所以呢,一些組織和單位就開始解決這種問題,開始做這種軟件,所以MQTT就誕生了.
  之所以叫MQTT是因為是外國人做的這種TCP服務器,外國人呢,為實現這種功能的TCP服務器取了個名字叫
  Message Queuing Telemetry Transport
  然后取每個首字母  就叫 MQTT了
  其實有很多家做MQTT軟件,但是呢,我比較喜歡用emqtt

 

來聊一下具體的MQTT協議了
一,首先咱知道就是個TCP服務器,所以呢,需要先用TCP連接上他們的服務器.
二,咱用Android ,C#,QT,網頁等等連接MQTT服務器的時候有現成的封裝好的庫可以用
 其實說白了就是調用函數而已.....
三,但是對於單片機而言要想實現MQTT通信,那么就需要借助網絡模塊
    大部分的網絡模塊都可以實現TCP通信,咱呢,就需要在TCP的基礎上按照MQTT協議封裝下咱的數據
    注:其實官方給了現成的MQTT的封裝數據和解析數據的程序)

 

四,咱利用網絡模塊的TCP連接上以后
 然后需要發送第一條消息(注:並不是上來就可以訂閱主題的)
 MQTT軟件規定呢,你發送的第一條信息是連接信息(相當於咱要先登錄)
 他規定了幾個參數!
 ClientID: 各個客戶端必須設定一個ID,各個客戶端必須都不一樣               假設是 123456
 用戶名: 咱安裝MQTT軟件的時候可以設置MQTT軟件的登錄的用戶名     假設是yang
 密碼: 咱安裝MQTT軟件的時候可以設置MQTT軟件的登錄的密碼             假設是 11223344
 下面是我當初研究MQTT的協議寫的,然后把上面三個參數填進去
 注意這節我粘貼的代碼只是為了大家了解協議,該文章封裝的包不完整,請不要使用
 咱板子是用的STM32,里面使用的庫是我專門為單片機封裝的.
   https://docs.emqx.io/sdk_tools?category=MQTT_Clients   (官方提供的各個開發的庫)

 

 

 

 

單片機用下面這個,我當前MQTT程序的庫就是用的這個,不過后來舍棄了,重新自己封裝的
不是因為不好用,而是因為占用內存太大!

 

 



我做的最底層的mqtt消息打包,解析公開給大家

/**
  ******************************************************************************
  * @author  yang feng wu 
  * @version V1.0.0
  * @date    2019/12/15
  * @brief   
  ******************************************************************************
    
  ******************************************************************************
  */

#define MQTTCLIENT_C_//如果沒有定義

#include "mqtt_msg.h"
#include "string.h"
#include "stm32f10x.h"


#define MQTT_MAX_FIXED_HEADER_SIZE 3

uint16_t mqtt_message_id = 0;

enum mqtt_connect_flag
{
  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
  MQTT_CONNECT_FLAG_WILL = 1 << 2,
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
//__attribute((__packed__))
struct  mqtt_connect_variable_header
{
  uint8_t lengthMsb;
  uint8_t lengthLsb;
  uint8_t magic[4];
  uint8_t version;
  uint8_t flags;
  uint8_t keepaliveMsb;
  uint8_t keepaliveLsb;
};


int mqtt_get_type(unsigned char* buffer) { return (buffer[0] & 0xf0) >> 4; }
int mqtt_get_connect_ret_code(unsigned char* buffer) { return (buffer[3]); }
int mqtt_get_qos(unsigned char* buffer) { return (buffer[0] & 0x06) >> 1; }


int append_string(int *length,unsigned  char* buffer,int buffer_length,unsigned  char* string, int len)
{
  if((*length) + len + 2 > buffer_length)//加上 ClientID 和 記錄 ClientID個數(兩位) 以后超出了數組
    return -1;

  buffer[(*length)++] = len >> 8;
  buffer[(*length)++] = len & 0xff;
  c_memcpy(buffer + (*length), string, len);
  (*length) += len;
  return len + 2;
}



uint16_t append_message_id(int *length,unsigned  char* buffer,int buffer_length, uint16_t message_id)
{
  // If message_id is zero then we should assign one, otherwise
  // we'll use the one supplied by the caller
  while(message_id == 0)
    message_id = ++mqtt_message_id;

  if((*length) + 2 > buffer_length)
    return 0;

  buffer[(*length)++] = message_id >> 8;
  buffer[(*length)++] = message_id & 0xff;
    
  return message_id;
}


int fini_message(unsigned char **data_ptr,int    length,unsigned char* buffer, int type, int dup, int qos, int retain)
{
  int remaining_length = length - MQTT_MAX_FIXED_HEADER_SIZE;
    
  if(remaining_length > 127)
  {
    buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    buffer[1] = 0x80 | (remaining_length % 128);
    buffer[2] = remaining_length / 128;
    length = remaining_length + 3;
    *data_ptr = buffer;
  }
  else
  {
    buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    buffer[2] = remaining_length;
    length = remaining_length + 2;
    *data_ptr = buffer + 1;
  }

  return length;
}



uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length)
{
  if(length < 1)
    return 0;
    
  switch(mqtt_get_type(buffer))
  {
    case MQTT_MSG_TYPE_PUBLISH:
    {
      int i;
      int topiclen;

      for(i = 1; i < length; ++i)
      {
        if((buffer[i] & 0x80) == 0)
        {
          ++i;
          break;
        }
      }

      if(i + 2 >= length)
        return 0;
      topiclen = buffer[i++] << 8;
      topiclen |= buffer[i++];

      if(i + topiclen >= length)
        return 0;
      i += topiclen;

      if(mqtt_get_qos(buffer) > 0)
      {
        if(i + 2 >= length)
          return 0;
        //i += 2;
      } else {
          return 0;
      }
      return (buffer[i] << 8) | buffer[i + 1];
    }
    case MQTT_MSG_TYPE_PUBACK:
    case MQTT_MSG_TYPE_PUBREC:
    case MQTT_MSG_TYPE_PUBREL:
    case MQTT_MSG_TYPE_PUBCOMP:
    case MQTT_MSG_TYPE_SUBACK:
    case MQTT_MSG_TYPE_UNSUBACK:
    case MQTT_MSG_TYPE_SUBSCRIBE:
    {
      // This requires the remaining length to be encoded in 1 byte,
      // which it should be.
      if(length >= 4 && (buffer[1] & 0x80) == 0)
        return (buffer[2] << 8) | buffer[3];
      else
        return 0;
    }

    default:
      return 0;
  }
}



/**
* @brief   獲取MQTT返回的數據長度(去掉1和2字節后面數據的長度)
* @param   buffer   MQTT返回的數據首地址
* @param   length   返回的數據個數
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_get_total_length(unsigned char* buffer, uint16_t length)
{
  int i;
  int totlen = 0;

  for(i = 1; i < length; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  return totlen;
}




/**
* @brief   打包連接MQTT指令
* @param   info     MQTT信息
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  struct mqtt_connect_variable_header* variable_header;
    
    mqtt_message_id = 0;
    
    length = MQTT_MAX_FIXED_HEADER_SIZE;//頭.連接類型1位,數據個數2位(如果大於127就需要兩位)
    
  if(length + sizeof(*variable_header) > buffer_length)//數組不夠存儲的
    return 0;
    
  variable_header = (void*)(buffer + length);//把數組分給這個結構體里面的變量
  length += sizeof(*variable_header);//存儲完 連接類型,整個數據個數,版本號個數,版本號,等
    
  variable_header->lengthMsb = 0;//版本名稱個數高位
  variable_header->lengthLsb = 4;//版本名稱個數低位
  c_memcpy(variable_header->magic, "MQTT", 4);//版本名稱MQTT
  variable_header->version = 4;//版本號
  variable_header->flags = 0;//先清零
  variable_header->keepaliveMsb = info->keepalive >> 8;//心跳包時間
  variable_header->keepaliveLsb = info->keepalive & 0xff;//心跳包時間

  if(info->clean_session)//清除連接信息
    variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;

  if(info->client_id != NULL && info->client_id[0] != '\0')//client_id
  {
    if(append_string(&length,buffer,buffer_length, info->client_id, c_strlen(info->client_id)) < 0)//拷貝
      return -1;//數組不夠用呀...
  }
  else
    return -2;//沒有設置client_id

  if(info->will_topic != NULL && info->will_topic[0] != '\0')//遺囑
  {
    if(append_string(&length,buffer,buffer_length , info->will_topic, c_strlen(info->will_topic)) < 0)//遺囑的主題
      return -3;

    if(append_string(&length,buffer,buffer_length , info->will_message, c_strlen(info->will_message)) < 0)//遺囑的消息
      return -4;

    variable_header->flags |= MQTT_CONNECT_FLAG_WILL;//需要遺囑
    if(info->will_retain)//遺囑是夠需要服務器保留
      variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;//保留遺囑
    variable_header->flags |= (info->will_qos & 3) << 3;//遺囑消息等級
  }
    
  if(info->username != NULL && info->username[0] != '\0')//username
  {
    if(append_string(&length,buffer,buffer_length, info->username, c_strlen(info->username)) < 0)//拷貝用戶名
      return -5;

    variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;//有用戶名
  }
    
  if(info->password != NULL && info->password[0] != '\0')//password
  {
    if(append_string(&length,buffer,buffer_length, info->password, c_strlen(info->password)) < 0)
      return -6;

    variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;//有密碼
  }

  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);//最終組合連接MQTT的指令
}


/**
* @brief  判斷是否連接上MQTT
* @param  服務器返回的數據
* @param  
* @retval 0 連接成功
* @example 
**/
int  mqtt_msg_connect_ack(unsigned char *buff)
{
    if(mqtt_get_type(buff) == MQTT_MSG_TYPE_CONNACK)
    {
        return mqtt_get_connect_ret_code(buff);
    }
    return -1;
}


/**
* @brief   斷開連接
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_disconnect(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}



/**
* @brief   訂閱主題
* @param   topic   訂閱的主題
* @param   qos     消息等級
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;
    
    if(topic == NULL || topic[0] == '\0')
        return -1;
    
    if((mqtt_message_id = append_message_id(&length, buffer, buffer_length, 0)) == 0)
        return -2;
    
    if(append_string(&length, buffer, buffer_length, topic, c_strlen(topic)) < 0)
        return -3;
    
    if(length + 1 > buffer_length)
    return -4;
  buffer[length++] = qos;
    
    return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}



/**
* @brief  判斷是否成功訂閱
* @param  buffer  服務器返回的數據
* @param  length  服務器返回的數據長度
* @retval 0:成功  1:失敗
* @example 
**/
int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length)
{
    if(mqtt_get_type(buffer) == MQTT_MSG_TYPE_SUBACK)
    {
        if(mqtt_get_id(buffer,length) == mqtt_message_id)
        {
            return 0;
        }
        else
        {
            return 1;
        }
    }
    else
    {
        return 1;
    }
}


/**
* @brief   發布消息
* @param   topic    主題
* @param   data     消息
* @param   data_length 消息長度
* @param   qos      消息等級
* @param   retain   是否需要保留消息
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_publish(unsigned char* topic,unsigned  char* date, int data_length, int qos, int retain,unsigned  char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;

  if(topic == NULL || topic[0] == '\0')
    return -1;

  if(append_string(&length, buffer, buffer_length, topic, strlen(topic)) < 0)
    return -2;

  if(qos > 0)
  {
    if((mqtt_message_id = append_message_id(&length, buffer, buffer_length,  0)) == 0)
      return -3;
  }
  else
    mqtt_message_id = 0;

  if(length + data_length > buffer_length)
    return -4;
  memcpy(buffer + length, date, data_length);
  length += data_length;

  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}



int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}


int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}


int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
    
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}


int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}


const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length)
{
  int i;
  int totlen = 0;
  int topiclen;

  for(i = 1; i < *length; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i -1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  if(i + 2 >= *length)
    return NULL;
  topiclen = buffer[i++] << 8;
  topiclen |= buffer[i++];

  if(i + topiclen > *length)
    return NULL;

  *length = topiclen;
  return (const char*)(buffer + i);
}


const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length)
{
  int i;
  int totlen = 0;
  int topiclen;
  int blength = *length;
  *length = 0;

  for(i = 1; i < blength; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  if(i + 2 >= blength)
    return NULL;
  topiclen = buffer[i++] << 8;
  topiclen |= buffer[i++];

  if(i + topiclen >= blength)
    return NULL;

  i += topiclen;

  if(mqtt_get_qos(buffer) > 0)
  {
    if(i + 2 >= blength)
      return NULL;
    i += 2;
  }

  if(totlen < i)
    return NULL;

  if(totlen <= blength)
    *length = totlen - i;
  else
    *length = blength - i;
  return (const char*)(buffer + i);
}

/**
* @brief   打包服務器返回的心跳包數據(用不到)
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_pingresp(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;    
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}

/**
* @brief   獲取發送給服務器的心跳包數據
* @param   data_ptr 打包的數據首地址
* @param   buffer   打包進的數組
* @param   buffer_length 數組長度
* @retval  數據長度 
* @warning None
* @example 
**/
int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;    
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}

 

 

#ifndef MQTTCLIENT_H_
#define MQTTCLIENT_H_


#ifndef MQTTCLIENT_C_//如果沒有定義
#define MQTTCLIENT_Cx_ extern
#else
#define MQTTCLIENT_Cx_
#endif

#include "string.h"
#include "stm32f10x.h"

#define c_memcpy memcpy
#define c_memset memset
#define c_strlen strlen


enum mqtt_message_type
{
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK = 2,
  MQTT_MSG_TYPE_PUBLISH = 3,
  MQTT_MSG_TYPE_PUBACK = 4,
  MQTT_MSG_TYPE_PUBREC = 5,
  MQTT_MSG_TYPE_PUBREL = 6,
  MQTT_MSG_TYPE_PUBCOMP = 7,
  MQTT_MSG_TYPE_SUBSCRIBE = 8,
  MQTT_MSG_TYPE_SUBACK = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK = 11,
  MQTT_MSG_TYPE_PINGREQ = 12,
  MQTT_MSG_TYPE_PINGRESP = 13,
  MQTT_MSG_TYPE_DISCONNECT = 14
};

enum mqtt_connack_return_code
{
    MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5,
    MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4,
    MQTT_CONN_FAIL_DNS = -3,
    MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2,
    MQTT_CONN_FAIL_TIMEOUT_SENDING = -1,
    MQTT_CONNACK_ACCEPTED = 0,
    MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1,
    MQTT_CONNACK_REFUSED_ID_REJECTED = 2,
    MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
    MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4,
    MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5
};



//連接MQTT指令 
typedef struct mqtt_connect_info
{
  unsigned  char* client_id;
  unsigned  char* username;
  unsigned  char* password;
  unsigned  char* will_topic;
  unsigned  char* will_message;
  int keepalive;
  int will_qos;
  int will_retain;
  int clean_session;
    
} mqtt_connect_info_t;


int mqtt_get_type(unsigned char* buffer);
int mqtt_get_connect_ret_code(unsigned char* buffer);
int mqtt_get_qos(unsigned char* buffer);
uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length);

int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_connect_ack(unsigned char *buff);
int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length);
int mqtt_msg_publish(unsigned char* topic,unsigned char* date, int data_length, int qos, int retain,unsigned  char **data_ptr,unsigned char* buffer,int buffer_length);

int mqtt_get_total_length(unsigned char* buffer, uint16_t length);

int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);

const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length);
const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length);

int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length);

#endif

 

 

以下協議是我為了能夠讓大家好理解整個MQTT協議,所以再次做了精簡(切勿使用下面的作為工程項目)

 

五,MQTT連接協議
/**
* @brief  連接服務器的打包函數
* @param  
* @retval 
* @example 
**/
int ConnectMqtt(char *ClientID,char *Username,char *Password)
{
    int ClientIDLen = strlen(ClientID);
    int UsernameLen    = strlen(Username);
    int PasswordLen = strlen(Password);
    int DataLen = 0;
    int Index = 2;
    int i = 0;
    DataLen = 12 + 2+2+ClientIDLen+UsernameLen+PasswordLen;
    MqttSendData[0] = 0x10;                //MQTT Message Type CONNECT
    MqttSendData[1] = DataLen;    //剩余長度(不包括固定頭部)
    MqttSendData[Index++] = 0;        // Protocol Name Length MSB    
    MqttSendData[Index++] = 4;        // Protocol Name Length LSB    
    MqttSendData[Index++] = 'M';        // ASCII Code for M    
    MqttSendData[Index++] = 'Q';        // ASCII Code for Q    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 4;        // MQTT Protocol version = 4    
    MqttSendData[Index++] = 0xc2;        // conn flags 
    MqttSendData[Index++] = 0;        // Keep-alive Time Length MSB    
    MqttSendData[Index++] = 60;        // Keep-alive Time Length LSB  60S心跳包  
    MqttSendData[Index++] = (0xff00&ClientIDLen)>>8;// Client ID length MSB    
    MqttSendData[Index++] = 0xff&ClientIDLen;    // Client ID length LSB  

    for(i = 0; i < ClientIDLen; i++)
    {
        MqttSendData[Index + i] = ClientID[i];          
    }
    Index = Index + ClientIDLen;
    
    if(UsernameLen > 0)
    {   
        MqttSendData[Index++] = (0xff00&UsernameLen)>>8;//username length MSB    
        MqttSendData[Index++] = 0xff&UsernameLen;    //username length LSB    
        for(i = 0; i < UsernameLen ; i++)
        {
            MqttSendData[Index + i] = Username[i];    
        }
        Index = Index + UsernameLen;
    }
    
    if(PasswordLen > 0)
    {    
        MqttSendData[Index++] = (0xff00&PasswordLen)>>8;//password length MSB    
        MqttSendData[Index++] = 0xff&PasswordLen;    //password length LSB    
        for(i = 0; i < PasswordLen ; i++)
        {
            MqttSendData[Index + i] = Password[i];    
        }
        Index = Index + PasswordLen; 
    }    
    return Index;
}

 

 

假設我ClientID填寫的是:123456
UserName填寫的是:yang
Password填寫的是:11223344

 

執行以后得到以下數據
10 22 00 04 4D 51 54 54 04 C2 00 78 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34
然后把這個數據發給TCP 服務器,如果沒有錯誤,服務器就會回 20 02 00 00
咱可以用TCP調試助手試一試
先連接我的哈,后面章節有說明怎么安裝MQTT服務器
 

 

 


 

 

 


 

 


IP地址:47.92.31.46  注意:如果IP不可以連接可以填域名 mnifdv.cn
端口號:1883

<>

 

 

 

先說一件事情  所有的MQTT數據哈  第一個字節是說明整個數據是干什么的數據   第二個字節是說它后面的數據的總個數
10 : 固定,MQTT規定的連接用0x10
22: 是說0x22后面有0x22個數據  34個
00 04: 后面記錄MQTT版本號的字節個數  
4D 51 54 54: M Q T T  版本號字符         這個是4版本,不同版本不一樣 3版本的是MQIsdp 額,了解就可以
04: 版本號是 0x04


 

 



C2 :1100 0010

bit7 bit6:是否有用戶名和密碼
bit5 :遺囑是否需要服務器保留
bit4 bit3:遺囑的消息等級
bit2:是否設置了遺囑
bit1:是否清除以前的連接信息
bit0:保留,默認0




上面就是說有用戶名和密碼,每次連接的時候清除連接信息,沒有設置遺囑(后面會說)
00 78: 心跳包是120S一次(這個自己連接的時候自己設置),

 

MQTT規定客戶端必須發心跳包,客戶端發送的心跳包數據是 0xC0 0x00,這是MQTT規定的
如果心跳包間隔了你設定心跳包的1.5倍時間,你沒有發給服務器,服務器就認為你掉線了,這是關於遺囑的問題,,后面會說
你發給服務器 0xC0 0x00  服務器會回你 0xD0  0x00  這個知道就行了

 

00 06:客戶端的ClientId有6位
后面的  31 32 33 34 35 36    就是ClientId ,這是MQTT服務器規定的,每個客戶端必須有各自的ClientId

 

00 04: MQTT的用戶名  
79 61 6E 67  我安裝MQTT的時候設置的MQTT的用戶名是yang

 

00 08: MQTT的密碼
31 31 32 32 33 33 34 34  我安裝MQTT的時候設置的MQTT密碼

 

好了,總結下就是連接上TCP服務器 然后發送
10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34
服務器呢就會回你  20 02 00 00
20: 固定
02: 后面有兩個數據
00 00
注意后面的第一個數據 00 ,如果你設置了 Clean Session為1 :便會回復 01

 

最后一個數據呢,有幾個返回值,0就說明成功,其它就是有各種問題
比如說回的是 20 02 00 04  就說明用戶名或者密碼有問題.


 

 

 




六,訂閱主題

假設告訴服務器我訂閱的是2222
假設訂閱的時候訂閱的主題的消息標識是1,消息等級是0
那么打包以后就是 82 09 00 01 00 04 32 32 32 32 00  然后把這個數據發給TCP服務器

 

讓測試用TCP調試助手訂閱,然后用咱的MQTT調試助手發信息給咱的TCP調試助手
注意:現在咱的TCP可能已經斷開了,因為咱的TCP調試助手沒有在規定時間內發送心跳包
首先准備好


 

 

 



首先連接
10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34


 

 

 




然后訂閱
82 09 00 01 00 04 32 32 32 32 00


 

 

 



然后用MQTT調試助手發消息


 

 



 

 




訂閱主題

/**
* @brief  MQTT訂閱/取消訂閱數據打包函數
* @param  SendData 
* @param  topic                主題 
* @param  qos         消息等級 
* @param  whether     訂閱/取消訂閱請求包
* @retval 
* @example 
**/
int MqttSubscribeTopic(char *topic,u8 qos,u8 whether)
{    
    int topiclen = strlen(topic);
    int i=0,index = 0;
  
    if(whether)
        MqttSendData[index++] = 0x82;                        //0x82 //消息類型和標志 SUBSCRIBE 訂閱
    else
        MqttSendData[index++] = 0xA2;                        //0xA2 取消訂閱
    MqttSendData[index++] = topiclen + 5;                //剩余長度(不包括固定頭部)
    MqttSendData[index++] = 0;                          //消息標識符,高位
    MqttSendData[index++] = 0x01;                    //消息標識符,低位
    MqttSendData[index++] = (0xff00&topiclen)>>8;    //主題長度(高位在前,低位在后)
    MqttSendData[index++] = 0xff&topiclen;              //主題長度 
    
    for (i = 0;i < topiclen; i++)
    {
        MqttSendData[index + i] = topic[i];
    }
    index = index + topiclen;
    
    if(whether)
    {
        MqttSendData[index] = qos;//QoS級別
        index++;
    }
    return index;
}

 



假設上面的MqttSubscribeTopic("2222",0,1)

0x82: 告訴MQTT服務器,我要訂閱主題
0x09: 后面的數據個數
0x00  0x01  注意哈,訂閱主題的時候可以設置了標識  標識呢  1-65535
之所以有這個家伙:咱訂閱的時候怎么判斷訂閱成功了呢???
訂閱成功以后呢!服務器會返回咱訂閱成功的回復,回復里面就包含着咱寫的這個標識
咱呢可以對比下這個標識,然后呢就知道到底是不是訂閱成功了.
0x00  0x04  后面訂閱主題的長度
32 32 32 32 訂閱的主題是 2222
最后一個 00 是說消息等級,一般呢,訂閱設置為0 就可以
那就說一下這個消息等級有什么用吧!
咱發送數據的時候也會攜帶一個消息等級
假設是0  那么這條消息是不是真的發給MQTT服務器(Broker)了,就不知道了,

 

假設是1 那么一個客戶端發送消息以后呢,服務器一看消息等級是1,那么就會回給那個發送消息的客戶端一個應答消息
客戶端可以根據有沒有回復應答確認發沒發送成功

 

假設是2 這個呢服務器和客戶端之間會有雙向的應答!后面會詳細說.

 

如果按照上面發呢,服務器會回
90 03 00 01 00
90:固定
03:后面的數據長度
00 01:這條主題的標識
00:消息等級

 

如果訂閱多個主題假設訂閱兩個主題 消息等級第一個是0 第二個是1
90 04 00 01 00 01
90:固定
03:后面的數據長度
00 01:這條主題的標識
00:消息等級
01:消息等級

 

假設訂閱失敗
后面的消息等級就會變為 0x80 (訂閱一個主題)
90 03 00 01 00
90:固定
03:后面的數據長度
00 01:這條主題的標識
80:消息等級變為0x80



訂閱兩個主題,第一個訂閱失敗
后面的消息等級就會變為 0x80
90 03 00 01 00
90:固定
04:后面的數據長度
00 01:這條主題的標識
80:消息等級
00:消息等級




七,發布消息

長話短說
發布的時候呢,信息里面都有以下內容
發布的主題,消息,消息等級,是不是需要服務器保留消息,消息的標識
/**
* @brief  MQTT發布數據打包函數
* @param  mqtt_message 
* @param  topic                主題 
* @param  qos         消息等級 
* @retval 
* @example 
**/
int MqttPublishData(char * topic, char * message, u8 qos)
{  
    int topic_length = strlen(topic);    
    int message_length = strlen(message);  
    int i,index=0;    
    static u16 id=0;
    
    MqttSendData[index++] = 0x30;    // MQTT Message Type PUBLISH  30:消息等級是0  32消息等級是1  34消息等級是2

  
    if(qos)
        MqttSendData[index++] = 2 + topic_length + 2 + message_length;//數據長度
    else
        MqttSendData[index++] = 2 + topic_length + message_length;   // Remaining length  

  
    MqttSendData[index++] = (0xff00&topic_length)>>8;//主題長度
    MqttSendData[index++] = 0xff&topic_length;
         
    for(i = 0; i < topic_length; i++)
    {
        MqttSendData[index + i] = topic[i];//拷貝主題
    }
    index += topic_length;
        
    if(qos)
    {
        MqttSendData[index++] = (0xff00&id)>>8;
        MqttSendData[index++] = 0xff&id;
        id++;
    }
  
    for(i = 0; i < message_length; i++)
    {
        MqttSendData[index + i] = message[i];//拷貝數據
    }
    index += message_length;
        
    return index;
}

 



發布的主題: 誰訂閱了這個主題,服務器就會把相應的消息傳給誰
消息等級:上面說了
是不是需要服務器保留消息:一會和遺囑一塊說
消息的標識:每條消息加個標識,用來區分消息



八,遺囑

還記得上面

 

我直接說遺囑是啥意思哈!
假設我手機和一個設備訂閱主題和發布主題對應,我就能和這個設備通信了
但是,我怎么知道這個設備掉線了呢?
當然完全可以自己發信息給那個設備,如果不回復,就說明掉線了
但是呢!MQTT服務器提供了一種方式
假設我設置好一個設備的遺囑消息是  offline    遺囑發布的主題是 aaaaa
另一個設備訂閱的主題是 aaaaa
如果設備掉線,服務器就會給訂閱了aaaaa的設備發送  offline

 

還記得上面說的不   服務器如果在你設置的心跳包時間的1.5倍收不到心跳包就認為你掉線了.




九,心跳包

  MQTT規定的,發送完連接協議之后
  發送的心跳包數據是C0 00
  發送時間:連接協議里面的心跳包時間(你可以提前發)
  然后服務器回復 D0 00
  


擴展

   有人會問,如果我想監控所有設備的數據應該怎么做
  就是說,我有個所有設備都可以管理的后台
  假設我是用C#做了一個MQTT的上位機,監控所有的數據

 

  笨法:
    你訂閱的時候把所有設備發布的主題全部訂閱一遍
    假設現在其中一個設備,想獲取其它連個設備的數據
    其它兩個設備發布的主題如下:
    

 

      另一個設備
    訂閱 aaaaa 然后再訂閱 wwww
    



    

 

    然后就可以了
    



     另一個客戶端
    



   MQTT自帶的絕招:
    先說一下哈
    假設一個客戶端發布的主題是 tttt/aaaaa
    還有一個客戶端發布的主題是 tttt/wwww
    如果想讓有一個客戶端接收他倆的數據
    你只需要訂閱  tttt/#

 

    



    

 

    



    

 

    


補充(關於MQTT消息等級,DUP)

1.1假設客戶端1 發布的主題是 1111 ;消息等級是:0  ;發送的消息是999  最終發送的信息如下:
  30 0b 00 04 31 31 31 31 39 39 39
  消息等級是0是說明該消息發送出去就完事了,服務器不會回復任何應答信息.
  至於該消息發沒發給服務器,不知道!
1.2假設客戶端2 訂閱的主題是:1111  消息等級是 0
  假設客戶端1 確實把消息發給了服務器
  客戶端2 收到消息以后,不需要做任何操作
  
  

 

2.1 假設客戶端1 發布的主題是 1111 ;消息等級是:1  ;發送的消息是999  最終發送的信息如下:
  32 0b 00 04 31 31 31 31 XX XX 39 39 39
  XX XX是在發送的時候需要加上的消息標識符:
  消息標識符XX XX隨意即可:范圍1-65535
  假設消息標識符是 00 01

 

  發送完以上消息以后,服務器會回復: (PUBACK) 告訴客戶端我收到了
  40 02 00 01
  (00 01就是咱上面發送的消息標識符)
  這樣就證明消息確實送達給了服務器




  如果客戶端1 發布完消息以后沒有接收到服務器的應答
  則可以重新發布消息
  32 0b 00 04 31 31 31 31 XX XX 39 39 39
  XX XX可以和上次的一樣,也可以不一樣

 

2.2 假設客戶端2訂閱了主題是 1111 ;消息等級是:1
  服務器接收到客戶端1發送的消息之后,轉發給客戶端2
  32 0b 00 04 31 31 31 31 XX XX 39 39 39
  注意現在的XX XX(消息標識符)是服務器自己隨機生成的了
  假設標識符是 00 02
  客戶端2在接收到消息之后需要返回應答(PUBACK) 告訴服務器我收到了
  40 02 00 02

 

  如果客戶端2不回復:40 02 00 02  (后面咱就叫 PUBACK)
  服務器便會一直發送消息給客戶端2
  3A 0b 00 04 31 31 31 31 00 02 39 39 39
  注意開頭變為了 3A (服務器自動會把重傳標志置一)
  

 

  高4位是 3 固定
  后面四位:
  第一位:DUP  標記這條消息是不是重傳的
  第2,3位:消息等級  01  :消息等級1   10:消息等級2
  最后一位:RETAIN 是否需要服務器保留這條消息

 

  本來是 32    0011 0010
  變為了 3A    0011 1010



  其實服務器加上DUP是為了讓客戶端知道,我這條消息是重傳的,
  因為服務器第一次發的時候客戶端沒有返回PUBACK
  但是服務器知道我確實是傳給了客戶端
  客戶端這邊假設真的是沒有及時的回復PUBACK
  那么有兩種方式處理
  2.2.1.再次接收到消息以后,無論消息有沒有DUP標志
  直接處理消息
  如果判斷這條消息是需要返回 PUBACK的
  那么直接根據消息里面的消息標識符返回 PUBACK 即可
  2.2.2.判斷下如果有DUP標志,那么再提取下消息標識
  看一下我先前是不是處理了有相同消息標識符的消息
  如果有就說明我已經處理了,只是沒有返回PUBACK
  那么我不去處理這條消息
  直接根據消息里面的消息標識符返回PUBACK就可以

 

  2.2.3 其實....
  但是整體來說,對於消息等級是1的消息統統處理即可
  然后根據消息里面的消息標識符返回PUBACK即可
  先說一下為什么
  其實在客戶端1發布消息等級是1的消息的時候,
  如果客戶端1由於某些原因沒有接收到服務器的PUBACK
  那么客戶端1還會再發布先前的消息
  其實現在就有兩條或者多條相同的消息在服務器里面
  這些相同的消息(標識符不一樣的消息)就會發給客戶端2
  如果客戶端2一直不應答(PUBACK),那么服務器便會把所有的沒有收到應答的消息
  的DUP標記置一以后不停的發給客戶端2...
  直至客戶端2應答了所有的消息,或者客戶端2斷線了
  服務器才停止發送
  對於單片機而言,這些處理只能自己去實現
  為了方便和節省內存,對於消息等級是1的消息
  可以直接根據消息里面的消息標識符返回PUBACK

 

    所以對於消息等級是1的消息,其實客戶端至少會接收到1次消息




3.1 假設客戶端1 發布的主題是 1111 ;消息等級是:2  ;發送的消息是999  最終發送的信息如下:

 

  34 0b 00 04 31 31 31 31 XX XX 39 39 39
  XX XX是在發送的時候需要加上的消息標識符:
  消息標識符XX XX隨意即可:范圍1-65535
  假設消息標識符是 00 01
  注意:服務器接收到此消息以后並不會立即發送給訂閱了主題是1111,消息等級是2的客戶端

 

  服務器接收到以后會返回: PUBREC) "告訴客戶端我收到了"
  50 02 00 01

 

  客戶端1需要返回: PUBREL) "好的"
  62 02 00 01
  注意:返回這個以后,消息才會下發給訂閱了主題是1111,消息等級是2的客戶端

 

  服務器接着會返回: PUBCOMP)
  70 02 00 01



  很多人介紹QS2都說保證消息只傳輸一次,其實實際上是這樣保證的
  客戶端1在發送完消息以后
  34 0b 00 04 31 31 31 31 XX XX 39 39 39
  服務器會返回(PUBREC)

 

  但是只要客戶端1不回復 (PUBREL)
  無論客戶端1現在發送多少條消息等級是2的消息
  服務器都不會理會,服務器只會記錄你發送的最后一條消息
  客戶端1只有回復了(PUBREL)
  服務器才會把最后一條消息轉發出去
  最后返回 (PUBCOMP)




3.2 假設客戶端2訂閱了主題是 1111 ;消息等級是:2

 

  服務器接收到客戶端1發送的消息,  然后確認接收到客戶端1的(PUBREL)之后,
  轉發給客戶端2 :

 

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

 

  注意現在的XX XX(消息標識符)是服務器自己隨機生成的了

 

  假設標識符是 00 02

 

  客戶端2接收到以后需要返回: PUBREC) "告訴服務器我收到了" 
  50 02 00 02
  注意:如果客戶端2不回復: PUBREC),那么服務器會不停的發送
  34 0b 00 04 31 31 31 31 XX XX 39 39 39
  直至客戶端2回復: PUBREC)



  服務器接收到以后會返回: PUBREL) "好的"
  62 02 00 02

 

  客戶端2最后需要返回: PUBCOMP)
  70 02 00 02
  注意:即使客戶端2不返回(PUBCOMP),服務器隔一段時間也會默認客戶端2回復了(PUBCOMP)



十,補充(關於retain)





發布消息的時候,第一個字節的最后一位代表 Retain
意思是,是否需要服務器保留這個消息
如果設置了服務器保留了這個消息,那么只要客戶端訂閱了這個消息的主題
服務器就會立馬發送給客戶端保留的這個消息
詳細說明:
假設我發布的主題是:1111 消息是:999 ,消息等級隨意(假設是0)  然后設置了 Retain位置為1
31 09 00 04 31 31 31 31 39 39 39
客戶端1把這條消息發給了服務器
然后過了一會,客戶端2上線了
訂閱了主題 1111
因為上面的消息設置了讓服務器保留
所以只要客戶端2 一訂閱1111,便會收到 999這個消息
這就是Retain的作用



當然,有更巧妙的應用
我一般用這個來發送在線或者掉線,或者開關量數據
1.我設置客戶端1的遺囑發布的主題是 1111  遺囑消息是 offline   Retain為1
2.我設置客戶端1連接上服務器以后先發布一條消息
發布的主題也是 1111  消息是: online     Retain為1

 

注意:服務器最終只會保留最后一條需要保留的消息

 

只要是另一個客戶端訂閱 1111
如果客戶端1是掉線的,那么便會立即收到 offline
如果客戶端1是在線的,呢么便會立即收到 online

 

有些時候咱控制開關,咱打開上位機以后想立即知道開關的狀態
最好的方式就是把發送開關亮數據的主題 Retain設置為1
那么只要是上位機一訂閱設備發布的開關量主題,
便會立即得到開關量數據!
這樣便提高了用戶體驗.



補充:

昨天有個人測試MQTT,發現只要設備連接上MQTT,不需要訂閱,就能接收到服務器的消息
然后他就有點懵!
我說一下,其實這個功能也是屬於MQTT的范疇!
大家看MQTT協議,只知道訂閱了某個主題就可以收到某個主題的信息
注意:MQTT協議中並沒有說只有訂閱才可以收到!



我說一下:其實MQTT就是個TCP服務器,MQTT客戶端就是個TCP客戶端
其實大家要從大的方向上去考慮,整個的通信就分為兩層.
第一層是TCP通信
第二層是解析TCP數據
就是這樣而已!!!!

 

我問一下,我TCP服務器可以給TCP客戶端主動發信息吧???
其實就是上面說的,不需要訂閱就可以接收到信息!
有些人會想,怎么會?
不訂閱為啥可以接收?
其實你只是跟着別人學了個表面東西,然后給自己設定了一個錯誤的思路:只有訂閱才能接收信息!

 

服務器可以主動推送信息給各個客戶端!只要是信息格式是正確的就可以
因為TCP接收到信息以后就是解析一下是不是MQTT協議格式的數據而已!
有些人會想,為啥客戶端解析協議里面不判斷下是不是自己訂閱的呢??

這個是你自己根據自己的需求去做的事情!

你要想讓你的協議兼容到各個使用場景,你說你是做成開放形式還是封閉的?
你可能咋一聽感覺為啥還能這樣!
其實本來就是有的東西,只是你以前不了解而已!
 
 

結語

以上說的,對於高級語言已經封裝好的包而言,發布或者接收消息等級1/2,
需要做的后續操作,高級語言里面已經做了處理
但是對於單片機而言,需要自己處理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM