MQTT物聯網通訊協議入門及Demo實現


一、MQTT協議概念

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),它是一個極其輕量級發布/訂閱消息傳輸協議,輕量級指的是較少的代碼和帶寬。因為在物聯網行業有類似充電樁、娃娃機、遙控飛行器等等這樣的設備,它們的網絡可能存在不穩定的情況並且只需要傳輸少量的數據,MQTT就應運而生專為受限設備和低帶寬、高延遲或不可靠的網絡而設計。

發布/訂閱機制

發布/訂閱模型將發送消息的客戶端(發布者)與接收消息的客戶端(訂閱者)分離。發布者和訂閱者從不直接聯系。他們甚至不知道對方的存在,它們之間由一個第三方組件(代理)處理幫助篩選所有傳入消息,並將其正確分發給訂閱者。消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者

image-20220416214035341

這個機制最重要的是將發布者和訂閱者進行解耦

  1. 發布者、訂閱者不需要交換端口知道對方的主機,只需要知道代理的主機和端口
  2. 發布者、訂閱者不需要同時都運行,哪怕一方下線
  3. 發布或接收期間,這兩個組件上的操作都不需要中斷

MQTT客戶端

發布者和訂閱者都是客戶端,可以是設備也可以是服務器,簡單來說就是網絡連接到MQTT代理的任何設備

Broker代理(服務器)

代理負責接收所有消息、過濾消息、確定誰訂閱了每條消息,並將消息發送到這些訂閱的客戶端。代理還保存具有持久會話的所有客戶端的會話數據,包括訂閱和丟失的消息。代理的另一個職責是客戶端的身份驗證和授權。通常,代理是可擴展的,這有助於自定義身份驗證、授權和集成到后端系統中。

MQTT消息結構

MQTT消息包含三個部分:

  • 固定頭(Fixed header)

    image-20220417220241178

  • 可變頭(Variable header)

    image-20220417222930425

  • 消息體(payload)

image-20220417222819163

二、MQTT協議實現原理

MQTT 客戶端需要連接到代理后立即發布消息,然后訂閱者從里面訂閱數據,這里涉及到六個部分:CONNECTPublishSubscribeUnsubscribeSUBACKUnsuback

MQTT連接

客戶端向代理發送CONNECT消息。代理響應一個CONNACK消息和一個狀態碼。連接建立后,代理將保持連接打開,直到客戶端發送斷開連接命令或連接斷開

CONNECT消息主要包含以下內容:

  • ClientId:代理使用ClientId來標識客戶端和客戶端當前狀態,對於每個客戶端和代理ClientId是唯一的

  • Clean Session:標志告訴代理客戶端是否想要建立一個持久會話。如果為false代理會存儲客戶端的所有訂閱以及使用服務質量(QoS)級別1或2進行訂閱的客戶端的所有錯過的消息。如果為true代理不為客戶端存儲任何內容,並清除以前任何持久會話中的所有信息

  • Username/Password:用戶名和密碼用於客戶端身份驗證和授權。強烈建議用戶名和密碼與安全傳輸使用SSL證書驗證客戶端,因此不需要用戶名和密碼

  • Will Message:遺囑,當客戶端斷開連接時,此消息通知其他客戶端

  • KeepAlive:客戶端指定並在連接建立時與代理通信。這個間隔定義了代理和客戶端在不發送消息的情況下可以忍受的最長時間

  • LWT字段:包含lastWillTopic、lastWillMessage、lastWillRetain、lastWillQos

    這個字段可以幫助了解客戶端是正常斷開連接(使用 MQTT 斷開連接消息)還是不正常斷開連接(沒有斷開連接消息),檢測到客戶端已不正常地斷開連接。為了響應不正常的斷開連接,代理將最后一個將消息發送到最后一個將消息主題的所有訂閱客戶端。如果客戶端使用正確的斷開連接消息正常斷開連接,那么代理將丟棄存儲的 LWT 消息

代理收到 CONNECT 消息時,返回連接確認標志

MQTT消息發布

每條消息都必須包含一個主題,代理可以使用該主題將消息轉發給感興趣的客戶端

Publish消息包含以下內容:

  • packetID:數據包標識符在消息在客戶端和代理之間流動時唯一標識消息。數據包標識符僅與大於零的 QoS 級別相關

  • topicName:主題名稱,主題區分大小寫

    主題格式就像URL:deviceName/1638791867

    1. +:表示任意匹配某一級主題,例如deviceName/+/weaved可以匹配deviceName/1638791867/weaved,但是無法匹配deviceName/1638791867/weaving
    2. #:表示匹配多級,例如deviceName/#可以匹配deviceName/1638791867/weaved
    3. $:是為 MQTT 代理的內部統計信息保留的,客戶端無法向這些主題發布消息
  • QOS:服務級別質量,有3 個 QoS 級別

    1. 最多一次 (0)

      只會傳輸一次,不能保證對方一定會收到

      image-20220417161252851

    2. 至少一次 (1)常用

      至少保證對方能夠收到一次消息,獲得接收方發來的 PUBACK數據包,如果發送方在合理的時間內未收到 PUBACK 數據包,則發送方將重新發送 PUBLISH 數據包

      image-20220417161340386

    3. 正好一次 (2)

      QoS 2 是最安全、最慢的服務質量級別,由發送方和接收方之間的至少兩個請求/響應流(四部分握手)提供。

      (1)、當接收方從發送方獲取 QoS 2 PUBLISH 數據包時,它會相應地處理發布消息,並使用確認 PUBLISH 數據包的PUBREC 數據包回復發送方。如果發送方未從接收方獲取 PUBREC 數據包,它將再次發送帶有重復 (DUP) 標志的 PUBLISH 數據包,直到收到確認。

      (2)、接收方收到 PUBREC 數據包,發送方就可以安全地丟棄初始 PUBLISH 數據包。

      (3)、發送方存儲來自接收方的 PUBREC 數據包,並使用PUBREL數據包進行響應

      (4)、接收方獲得 PUBREL 數據包后,它可以丟棄所有存儲的狀態並使用PUBCOMP數據包進行應答

      image-20220417162001595

    如果數據包在此過程中丟失,發件人負責在合理的時間內重新傳輸消息

  • retainFlag:消息是否由代理保存為指定主題的最后一個已知正確值。當新客戶端訂閱某個主題時,它們會收到保留在該主題上的最后一條消息

    保留的消息可幫助新訂閱的客戶端在訂閱主題后立即獲取狀態更新,而不需要等到客戶端下一次推送消息。保留的消息消除了等待發布客戶端發送下一個更新的時間

  • payload:消息的實際內容包含圖像,任何編碼的文本,加密數據以及二進制的數據

  • dupFlag:標志指示郵件是重復的,這個重復發送跟QoS大於0的時候有關

客戶端將消息發送到 MQTT代理進行發布時,代理將讀取消息,確認消息(根據 QoS 級別),並處理消息。代理的處理包括確定哪些客戶端訂閱了主題並向它們發送消息

MQTT訂閱機制

MQTT客戶端發送了消息。如果沒人接收消息將毫無意義,所以也會有客戶端來訂閱消息,客戶端會向 MQTT 代理發送一條 SUBSCRIBE消息

Subscribe消息包含以下內容:

  • packetID:數據包標識符在消息在客戶端和代理之間流動時唯一標識消息。數據包標識符僅與大於零的 QoS 級別相關

  • 訂閱列表:一個 SUBSCRIBE 消息可以包含一個客戶端的多個訂閱,每個訂閱都由一個主題和一個 QoS 級別組成

MQTT訂閱確認

為了確認每個訂閱,代理向客戶端發送 SUBACK確認消息

SUBACK消息包含以下內容:

  • packetID:數據包標識符在消息在客戶端和代理之間流動時唯一標識消息
  • rerurnCode:每訂閱一個主題發送一個返回代碼
返回代碼 返回代碼響應
0 成功 - 最大 QoS 0
1 成功 - 最大 QoS 1
2 成功 - 最大 QoS 2
128 失敗

客戶端成功發送 SUBSCRIBE 消息並接收 SUBACK 消息后,它將獲取與 SUBSCRIBE 消息包含的訂閱中的主題匹配的每個已發布消息

MQTT取消訂閱

消息可以訂閱那么也可以取消訂閱,會刪除代理上客戶端的現有預訂

Unsubscribe消息包含以下內容:

  • packetID:數據包標識符在消息在客戶端和代理之間流動時唯一標識消息
  • List of Topic(主題列表):主題列表可以包含多個客戶要取消訂閱的主題。只需發送主題

MQTT確認取消訂閱

要確認取消訂閱,代理會向客戶端發送 Unsuback確認消息

Unsuback消息包含以下內容:

  • packetID:數據包標識符在消息在客戶端和代理之間流動時唯一標識消息,這與取消訂閱消息中的數據包標識符相同

三、MQTT基本功能

持久會話

客戶端需要連接到代理並且訂閱主題,但是客戶端和代理之間如果連接在非持久會話中中斷,那么主題會丟失,需要在重新連接時再次訂閱。為了避免這個問題可以使用持久會話功能,它主要是在代理中存儲了:

  • 客戶端的會話以及訂閱
  • QOS為1和2中沒有確認的消息
  • 客戶端在斷聯時候錯過的消息
  • 客戶端接收到的所有尚未完全確認的 QoS 2 消息

為了開啟代理上的持久會話,在MQTT客戶端連接到代理服務器的時候有個cleanSession字段設置為false表示開啟持久會話,所有信息和消息都將保留,代理存儲會話,直到客戶端重新聯機並收到消息,如果長時間不聯機,那么會消耗內存

客戶端上的持久會話,當客戶端請求服務器保存會話數據時,客戶端負責存儲以下信息:

  • QoS 1 或 2 流中尚未由代理確認的所有消息
  • 從代理接收到的所有尚未完全確認的 QoS 2 消息

四、MQTT Demo

搭建MQTT服務器

官方文檔:產品概覽 | EMQX 文檔

EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 平台開發的開源物聯網 MQTT 消息服務器。

Erlang/OTP是出色的軟實時 (Soft-Realtime)、低延時 (Low-Latency)、分布式 (Distributed)的語言平台。

MQTT 是輕量的 (Lightweight)、發布訂閱模式 (PubSub) 的物聯網消息協議。

EMQX 設計目標是實現高可靠,並支持承載海量物聯網終端的 MQTT 連接,支持在海量物聯網設備間低延時消息路由:

  1. 穩定承載大規模的 MQTT 客戶端連接,單服務器節點支持 200 萬連接。
  2. 分布式節點集群,快速低延時的消息路由。
  3. 消息服務器內擴展,支持定制多種認證方式、高效存儲消息到后端數據庫。
  4. 完整物聯網協議支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有協議支持

使用Docker安裝EMQX

1、獲取Docker鏡像

docker pull emqx/emqx:4.4.3

image-20220419101256317

2、啟動Docker

docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.4.3

3、訪問Web管理控制台

控制台地址: http://XXXXXX:18083,默認用戶: admin,密碼:public

image-20220419101524738

各個服務端口說明:
1883:MQTT 協議端口
8883:MQTT/SSL 端口
8083:MQTT/WebSocket 端口
8080:HTTP API 端口
18083:Dashboard 管理控制台端口

搭建MQTT消息推送客戶端

引入相關依賴包

  <dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <optional>true</optional>
        </dependency>
    </dependencies>

MQTT客戶端

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

/**
 * 消息推送客戶端
 *
 * @author yanglingcong
 */
@Slf4j
@Component
public class MyMqttClient {

    private final static int QOS_1 = 1;

    private final static String USER_NAME = "ylc";

    private final static int PASSWORLD = 123456;

    private final static int KEEP_ALIVE = 60;

    /**
     * 連接地址
     * */
    public static final String HOST = "tcp://XXXXX:1883";

    /**
    * 訂閱主題
    * */
    public static final String TOPIC = "deviceName/";

    //客戶端唯一ID
    private static final String clientid = "pubClient";


    public static void main(String[] args) {
        MqttClient mqtt = createMqtt();
        publishMessage("Hello", TOPIC, mqtt);
    }

    public static MqttClient createMqtt() {
        MqttClient client = null;

        MqttConnectOptions connectOptions = new MqttConnectOptions();
        //斷開之后自動重聯
        connectOptions.setAutomaticReconnect(true);
        //設置會話心跳時間 代理和客戶端在不發送消息的情況下可以忍受的最長時間
        connectOptions.setKeepAliveInterval(KEEP_ALIVE);
        //不建立持久會話
        connectOptions.setCleanSession(true);
        //用戶名
        connectOptions.setUserName(USER_NAME);
        //密碼
        connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            //MQTT連接
            client.connect(connectOptions);
            //消息回調
            client.setCallback(new MqttCallBackHandle(client));
        } catch (MqttException e) {
            log.warn("MQTT消息異常{}", e);

        }
        return client;

    }

    /**
     * 消息推送
     *
     * @param message 消息內容
     * @param topic   發送的主題
     * @author yanglingcong
     * @date 2022/4/18 21:25
     */
    public static void publishMessage(String message, String topic, MqttClient mqttClient) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(QOS_1);
        //保留在該主題上的最后一條消息
        //mqttMessage.setRetained(true);
        mqttMessage.setPayload(message.getBytes());
        try {
            mqttClient.publish(topic, mqttMessage);
            log.info("MQTT消息發送成功:{}", message);
        } catch (MqttException e) {
            log.warn("MQTT消息推送失敗");
            e.printStackTrace();
        }
    }

}

MQTT回調接口

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;

/**
 * MQTT消息回調方法
 */
@Slf4j
public class MqttCallBackHandle implements MqttCallbackExtended {


    private MqttClient client;

    public  MqttCallBackHandle(MqttClient client){
        this.client=client;
    }

    //訂閱主題
    private final static String CMD_TOP_FORMAT = "deviceName/";

    /**
     * 連接成功后調用該方法
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        try {
            //重新訂閱主題
            client.subscribe(CMD_TOP_FORMAT);
            log.info("=====MQTT重聯成功=====");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** 
     * 斷開連接后回調方法
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("=====MQTT連接斷開=====");
    }

    /**
     * 接收訂閱到的消息
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("=====MQTT消息訂閱成功=====");
        log.info("主題:{},內容:{}",topic,message);
    }

    /**
     * 發送完成
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("=====MQTT消息發送完畢=====");
    }
}

搭建MQTT消息訂閱客戶端

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

/**
 * 消息訂閱客戶端
 *
 * @author yanglingcong*/
@Component
@Slf4j
public class MyMqttSubClient {

    private final static int QOS_1 = 1;

    private final static String USER_NAME = "ylc";

    private final static int PASSWORLD = 123456;

    private final static int KEEP_ALIVE = 60;

    //連接地址
    public static final String HOST = "tcp://xxxx:1883";

    // 訂閱主題
    public static final String TOPIC = "deviceName/";

    //客戶端唯一ID
    private static final String clientid = "subClient";


    public static void main(String[] args) {
        subscribe();
    }

    public MyMqttSubClient() throws MqttException {
        //訂閱
        subscribe();
    }

    public  static void subscribe()  {
        MqttClient client=null;

        MqttConnectOptions connectOptions=new MqttConnectOptions();
        //斷開之后自動重聯
        connectOptions.setAutomaticReconnect(true);
        //設置會話心跳時間 代理和客戶端在不發送消息的情況下可以忍受的最長時間
        connectOptions.setKeepAliveInterval(KEEP_ALIVE);
        //不建立持久會話
        connectOptions.setCleanSession(true);
        //用戶名
        connectOptions.setUserName(USER_NAME);
        //密碼
        connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());

        try {
            client=new MqttClient(HOST,clientid, new MemoryPersistence());
            //MQTT連接
            client.connect(connectOptions);

        } catch (MqttException e) {
            e.printStackTrace();
        }
        //消息回調
        client.setCallback(new MqttCallBackHandle(client));


        try {
            client.subscribe(TOPIC,QOS_1);
        } catch (MqttException e) {
            log.warn("MQTT消息訂閱異常{}",e);
            e.printStackTrace();
        }
    }
}

環境測試

image-20220419115315933

1、MQTT客戶端pubClient向服務器推送消息

image-20220419120031352

2、MQTT客戶端subClient從服務器訂閱消息

image-20220419120043822

3、踢除客戶端,會自動重聯,因為設置了MQTT斷開自動重聯

image-20220419120159757

五、MQTT常見問題

MQTT消息持久化

如果 cleanSession 設為true,一旦掉線客戶端不會存儲任何內容,並清除以前任何持久會話中的所有信息

如果 cleanSession 設為false,重連后可以接收之前訂閱主題的消息,還有離線時期未接收的消息

MQTT訂閱恢復機制

MQTT掉線設置自動重聯之后,無法再進行訂閱。MqttCallbackExtended接口有一個connectComplete方法用於重新訂閱主題

MQTT和消息隊列的區別

  • 消息隊列可以存儲消息,直到被消費為止

  • 消息隊列只能被消費處理一次,不像MQTT訂閱的人都可以收到消息

  • 消息隊列需要先創建隊列,MQTT可以使用時候創建

  • MQTT是一種通信協議,MQ是消息通道

  • MQTT面向海量設備連接、MQ是面向海量數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM