消息中間件技術 - 淺談mqtt協議及其實現


作者:carter(佘虎),轉載請注明出處,特別說明:本博文來自博主原博客,為保證新博客中博文的完整性,特復制到此留存,如需轉載請注明新博客地址即可。

1.1概念

MQTT(MQ Telemetry Transport) 消息隊列遙測傳輸協議是IBM開發的一種網絡應用層的協議,提供輕量級的,支持可發布/可訂閱的的消息推送模式,使設備對設備之間的短消息通信變得簡單,比如現在應用廣泛的低功耗傳感器,手機、嵌入式計算機、微型控制器,衛星等移動設備。

1.2優點

1.2.1非常低的通信開銷

MQTT 的獨特之處在於,它的每消息標題可以短至 2 個byte。MQ 和 HTTP 都擁有高得多的每消息開銷。對於 HTTP,為每個新請求消息重新建立 HTTP 連接會導致重大的開銷。MQ 和 MQTT 所使用的永久連接顯著減少了這一開銷。

1.2.2低功耗,省電

您需要能夠及時地將通知傳遞給客戶。為此,必須采用某種定期輪詢或推送方法;從電池、系統負載和帶寬角度講,推送是最佳解決方案。MQTT 是專門針對低功耗目標而設計的。HTTP 的設計沒有考慮此因素,因此增加了功耗。

1.2.3單機百萬級並發

在 HTTP 堆棧上,維護數百萬個並發連接,需要做許多的工作來提供支持。盡管可以實現此支持,但大多數商業產品都為處理這一數量級的永久連接而進行了優化。IBM 提供了 IBM MessageSight,這是一個單機架裝載服務器,經過測試能處理多達 100 萬個通過 MQTT 並發連接的設備。相反,MQ 不是為大量並發客戶端而設計的。

1.2.4對網絡環境的容忍度

MQTT提供三種不同消息傳遞等級,讓消息能按需到達目的地,適應在不穩定工作的網絡傳輸需求。MQTT 和 MQ 能夠從斷開等故障中恢復,而且沒有進一步的代碼需求。但是,HTTP 無法原生地實現此目的,需要客戶端重試編碼,這可能增加冪等性問題。

1.2.5客戶端多平台支持

支持各種流行編程語言(包括C,Java,Ruby,Python 等等)且易於使用的客戶端。

1.2.6發布/訂閱模式,開發簡易

支持發布 / 訂閱模型,簡化應用程序的開發。

1.2.7推送通知

企業可能需要在沒有第三方中介的情況下發送敏感的信息。這降低了特定於操作系統的解決方案(比如 Apple iOS、Google Play 通知)作為主要傳輸機制的價值。

HTTP 只允許使用一種稱為COMET 的方法,使用持久的 HTTP 請求來執行推送。從客戶端和服務器的角度講,此方法都很昂貴。MQ 和 MQTT 都支持推送,這是它們的一個基本特性。

1.2.8防火牆容錯

一些企業防火牆將出站連接限制到一些已定義的端口。這些端口通常被限制為 HTTP(80 端口)、HTTPS(443 端口)等。HTTP 顯然可以在這些情況下運行。MQTT 可封裝在一個 WebSockets 連接中,顯示為一個 HTTP 升級請求,從而允許在這些情況下運行。MQ 不允許采用這種模式。

 

1.3缺點

由於MQTT本身的各項技術優勢,越來越多的企業傾向於選用MQTT作為物聯網產品通訊的標准協議,也因此,工程師們漸漸發現MQTT協議要想大規模商用,也有一些有待完善的功能。比如:

1.3.1沒有齊備的SDK

不同的異構終端,需要有對應的與MQTT服務器通信的軟件SDK包,比如MCU、Linux、Android、IOS、WEB等之間要實現互聯互通必然需要不同的SDK包。

1.3.2不支持File和AV

有些應用場景,需要傳輸的信息可能不僅僅限於指令,比如聲音信號和視頻信號,這些需要通過File和AV來實現通信。

1.3.3不支持與第三方的HTTP集成

雖然MQTT協議優於普通的HTTP協議,但是基於傳統的HTTP協議的WEB服務器仍然占主流市場,那么這些服務器要實現與MQTT協議的互聯互通,以降低升級成本也尤為關鍵。

1.3.4不支持用戶管理接口

用戶在進行設備的行為數據分析的時候,顯得尤為重要,這又是工業4.0、大數據時代的必然需求。

1.3.5原生不支持離線

消息彌補設備離線以后,MQTT服務器對設備的控制信息丟失的問題。

(解決方案:https://helpcdn.aliyun.com/document_detail/59914.html)

1.3.6不支持點對點通信

采用標准的MQTT協議,理論上可以通過相互訂閱的方式實現點對點通信,但是邏輯相對復雜,並且對設備的安全性方面存在擔憂。當設備B和設備C在同一主題的情況下,設備A無法知道是設備B還是設備C發送的消息,也有可能消息被設備D竊聽。

1.3.7不支持群通信和群管理

實現了對群組成員的管理,群組成員之間能互通消息,這在一個設備被多人控制,或者多個設備被一人控制的這種場景下,尤為有用。

 

1.4實現

1.4.1 交互圖

 

怎么樣,是不是一目了然,非常簡單

1.4.2 MQTT代理

1.4.2.1 mosquitto

1.4.2.2 EMQ

1.4.2.3 HiveMQ

1.4.2.4 ActiveMQ

1.4.2.5 mosca

1.4.3 MQTT客戶端

1.4.3.1 eclipse Paho      (支持C,C++, JavaJavaScriptPython, Go, C#)

1.4.3.2 M2MQTT     (C#)

1.4.3.3 Fusesource MQTTClient     (Java)

1.4.3.4 MQTT.js    (javascript)

1.4.3.5 Libmosquitto    (c/c++)

1.4.3.6 Twisted

 

1.  Mosquito

2.1 簡介

一款實現了消息推送協議 MQTT v3.1 的開源消息代理軟件,提供輕量級的,支持可發布/可訂閱的的消息推送模式,使設備對設備之間的短消息通信變得簡單,比如現在應用廣泛的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備。一個典型的應用案例就是 Andy Stanford-ClarkMosquitto(MQTT協議創始人之一)在家中實現的遠程監控和自動化。並在 OggCamp 的演講上,對MQTT協議進行詳細闡述。

1.2         官網

https://mosquitto.org/

1.3         優點

2.3.1使用簡單

2.3.2 網絡資料豐富

 

1.4         缺點

2.4.1 沒有可視化管理后台

2.4.2 商用實例不多

2.5 安裝及踩坑

1.源碼包下載:http://mosquitto.org/files/source/

或者wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz

版本:mosquitto-1.4.tar.gz

解壓:tar -zxvf mosquitto-1.4.tar.gz

進入目錄:cd mosquitto-1.4

2.編譯安裝

打開配置文件,去掉暫且不需要的功能:

vi config.mk

如:WITH_TLS,WITH_TLS_PSK, WITH_SRV, WITH_WEBSOCKETS, WITH_SOCKS, WITH_UUID等

保存退出:wq

安裝mosquitto

make

make install

踩過的坑:

a】編譯找不到openssl/ssl.h

  安裝openssl     sudo apt-get install libssl-dev

【b】編譯過程找不到ares.h

sudo apt-get install libc-ares-dev

【c】編譯過程找不到uuid/uuid.h

sudo apt-get install uuid-dev

【d】使用過程中找不到libmosquitto.so.1

error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory

    【解決方法】——修改libmosquitto.so位置

# 創建鏈接

sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1

# 更新動態鏈接庫

sudo ldconfig

【e】make: g++:命令未找到  

    【解決方法】

    安裝g++編譯器

sudo apt-get install g++

 

啟動 mosquitto broker

mosquitto -c /etc/mosquitto/mosquitto.conf.example &

-c : specify the broker config file.

 -d : put the broker into the background after starting.

 -h : display this help.

 -p : start the broker listening on the specified port.

      Not recommended in conjunction with the -c option.

 -v : verbose mode - enable all logging types. This overrides

      any logging options given in the config file.

訂閱消息:

./mosquitto_sub  -h 127.0.0.1 -p 1883 -t "/sports/wordcup"

 

發布消息:

./mosquitto_pub -h 127.0.0.1 -p 1883 -t "/sports/wordcup " -m "this is carter hello"

 

或者

./mosquitto_sub  -h 10.129.4.12 -p 1883 -t "/sports/wordcup"

./mosquitto_pub -h 10.129.4.12 -p 1883 -t "/sports/wordcup" -m "this is carter hello 666"

 

外網地址不行,所以我在本機用paho代碼一直報timeOut異常,原因是服務器防火牆未放開端口

【解決辦法】

放開防火牆端口:firewall-cmd --add-port=1883/tcp –permanent

重啟防火牆:systemctl restart firewalld

2.6         java客戶端實現(eclipse.paho)

2.6.1 添加依賴

Java客戶端實現

采用eclipse.paho框架

新建maven工程,加入依賴

 
<!-- spring整合mqtt 開始-->

<dependency>

   <groupId>org.springframework.integration</groupId>

   <artifactId>spring-integration-mqtt</artifactId>

   <version>4.1.0.RELEASE</version>

   <exclusions>

      <exclusion>

         <groupId>org.eclipse.paho</groupId>

         <artifactId>mqtt-client</artifactId>

      </exclusion>

   </exclusions>

</dependency>

<!-- spring整合mqtt 結束-->

<!-- mqtt依賴 開始 -->

<dependency>

   <groupId>org.eclipse.paho</groupId>

   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

   <version>1.2.0</version>

</dependency>

<dependency>

   <groupId>org.eclipse.paho</groupId>

   <artifactId>mqtt-client</artifactId>

</dependency>

<!-- mqtt依賴 結束 -->

2.6.2 發布消息

public class ServerMQTT {

    //tcp://MQTT安裝的服務器地址:MQTT定義的端口號

    public static final String HOST = "tcp://111.9.116.136:1883";

    //定義一個主題

    public static final String TOPIC = "pos_message_all";

    //定義MQTT的ID,可以在MQTT服務配置中指定

    private static final String clientid = "server11";


    private MqttClient client;

    private MqttTopic topic11;

    private String userName = "mosquitto";  //非必須

    private String passWord = "";  //非必須

    private MqttMessage message;

    /**

     * 構造函數

     * @throws MqttException

     */

    public ServerMQTT() throws MqttException {

        // MemoryPersistence設置clientid的保存形式,默認為以內存保存

        client = new MqttClient(HOST, clientid, new MemoryPersistence());

        connect();

    }

    /**

     *  用來連接服務器

     */

    private void connect() {

        MqttConnectOptions options = new MqttConnectOptions();

        options.setCleanSession(false);

        options.setUserName(userName);

        options.setPassword(passWord.toCharArray());

        // 設置超時時間

        options.setConnectionTimeout(10);

        // 設置會話心跳時間

        options.setKeepAliveInterval(20);

        try {

            client.setCallback(new PushCallBack());

            client.connect(options);

            topic11 = client.getTopic(TOPIC);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    /**

     * @param topic

     * @param message

     * @throws MqttPersistenceException

     * @throws MqttException

     */

    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,

            MqttException {

        MqttDeliveryToken token = topic.publish(message);

        token.waitForCompletion();

        System.out.println("message is published completely! "

                + token.isComplete());

    }

    /**

     *  啟動入口

     * @param args

     * @throws MqttException

     */

    public static void main(String[] args) throws MqttException {

        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();

        server.message.setQos(1);  //保證消息能到達一次

        server.message.setRetained(true);

        server.message.setPayload("I love this Summer 8888".getBytes());

        server.publish(server.topic11 , server.message);

        System.out.println(server.message.isRetained() + "------ratained狀態");

    }

}

 

publish –> pushCallBack.deliveryComplete()

2.6.3 訂閱消息

/**

 * 模擬一個客戶端接收消息

 */

public class ClientMQTT {

    public static final String HOST = "tcp://111.9.116.136:1883";

    public static final String TOPIC1 = "pos_message_all";

    private static final String clientid = "client11";

    private MqttClient client;

    private MqttConnectOptions options;

    private String userName = "admin";    //非必須

    private String passWord = "password";  //非必須

    @SuppressWarnings("unused")

    private ScheduledExecutorService scheduler;

    /**

     * $SYS中各主題說明如下:

     $SYS/broker/load/connections/+     不同時間段內服務器接收到的connections包的平均數。最后的“+”可是1min,5min,15min。分別表示1分鍾,5分鍾,15分鍾的平均數。

     $SYS/broker/load/bytes/received/+     不同時間段內服務器接收數據的平均字節數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/bytes/sent/+     不同時間段內服務器發送數據的平均字節數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/messages/received/+     不同時間段內服務器接收到的所有類型消息的平均數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/messages/sent/+     不同時間段內服務器發送的所有類型的消息的平均數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/publish/dropped/+     不同時間段內服務器丟棄的消息的平均數,這表明了那些持久連接但與服務器斷開的客戶端失去消息的速率。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/publish/received/+     不同時間段內服務器接收的發布消息的平均數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/publish/sent/+     不同時間段內服務器發送的發布消息的平均數。最后的“+”可是1min,5min,15min。

     $SYS/broker/load/sockets/+     不同時間段內服務器打開的socket連接的平均數。最后的“+”可是1min,5min,15min。

     $SYS/broker/messages/inflight     等待確認的Qos>0的消息的數量。

     $SYS/broker/messages/received     自服務器啟動以來接收的所有類型的消息總數。

     $SYS/broker/messages/sent     自服務器啟動以來發送的所有類型的消息總數。

     $SYS/broker/messages/stored     服務器存儲的消息的總數,包括保留消息和持久連接客戶端的消息隊列中的消息數。

     $SYS/broker/publish/messages/dropped     由於inflight/queuing限制而直接丟棄的消息的總數,相關設置請查看mosquitto.conf中max_inflight_messages 和max_queued_messages參數。

     $SYS/broker/publish/messages/received     自服務器啟動以來接收的發布消息的總數。

     $SYS/broker/publish/messages/sent     自服務器啟動以來發送的發布消息的總數。

     $SYS/broker/retained messages/count     服務器保留的消息總數。

     $SYS/broker/subscriptions/count     服務器訂閱主題總數。

     $SYS/broker/timestamp     Mosquitto軟件build的詳細時間(Static)。

     $SYS/broker/uptime     Mosquitto啟動時長(單位:秒)。

     $SYS/broker/version     Mosquitto軟件版本號(Static)。

     */

    public static final String TOPIC2 = "$SYS/broker/bytes/received";  //自服務器啟動以來共接收的字節數

    public static final String TOPIC3 = "$SYS/broker/bytes/sent";  //自服務器啟動以來共發送的字節數

    public static final String TOPIC4 = "$SYS/broker/clients/expired";  //超過有效期被斷開連接的客戶端數量,有效期通過persistent_client_expiration參數設置。

    public static final String TOPIC5 = "$SYS/broker/clients/disconnected";  //自服務器啟動以來斷開的連接數

    public static final String TOPIC6 = "$SYS/broker/clients/maximum";  //服務器同一時間連接的最大客戶端數量

    public static final String TOPIC7 = "$SYS/broker/clients/total";  //有效和無效連接、注冊到服務器上的總數。

    public static final String TOPIC8 = "$SYS/broker/connection/#";  //如果服務器設置了橋接,系統會提供一個主題來標識連接狀態,默認使用$SYS/broker/connection/,如果主題值為1表示連接激活,如果為0表示連接沒有激活。

    public static final String TOPIC9 = "$SYS/broker/heap/current size";  //Mosquitto正在使用的堆內存大小。注意這個主題是否可以使用取決於系統編譯時的相關參數設置。

    public static final String TOPIC10 = "$SYS/broker/heap/maximum size";  //Mosquitto使用的最大堆內存。這個參數是否有效也取決於系統編譯時的相關參數設置。

    private void start() {

        try {

            // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存

            client = new MqttClient(HOST, clientid, new MemoryPersistence());

            // MQTT的連接設置

            options = new MqttConnectOptions();

            // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,設置為true表示每次連接到服務器都以新的身份連接

            options.setCleanSession(false);

            // 設置連接的用戶名

            options.setUserName(userName);

            // 設置連接的密碼

            options.setPassword(passWord.toCharArray());

            // 設置超時時間 單位為秒

            options.setConnectionTimeout(10);

            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制

            options.setKeepAliveInterval(20);

            // 設置回調

            client.setCallback(new PushCallBack());

            MqttTopic topic = client.getTopic(TOPIC1);

            //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息

            options.setWill(topic, "close".getBytes(), 2, true);//遺囑

            client.connect(options);

            //訂閱消息

            int[] Qos  = {1,0,2,1,0,2,1,0,2};

            String[] topic1 = {TOPIC2,TOPIC3,TOPIC4,TOPIC5,TOPIC6,TOPIC7,TOPIC8,TOPIC9,TOPIC10};

//            int[] Qos  = {1};

//            String[] topic1 = {TOPIC1};

            client.subscribe(topic1, Qos);

        } catch (Exception e) {

            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws MqttException {

        ClientMQTT client = new ClientMQTT();

        client.start();

    }

}

Client.start()->pushCallBack.messageArrived()

2.6.4回調函數實現mqttCallBack接口

PushCallBack 必須實現 MqttCallback 接口
有三個方法:
public void connectionLost(Throwable cause) {

    // 連接丟失后,一般在這里面進行重連

    System.out.println("連接斷開,可以做重連");

}

public void deliveryComplete(IMqttDeliveryToken token) {

    System.out.println("deliveryComplete---------" + token.isComplete() +token.getMessageId());

}


public void messageArrived(String topic, MqttMessage message) throws Exception {

    // subscribe后得到的消息會執行到這里面

    System.out.println("接收消息主題 : " + topic);

    System.out.println("接收消息Qos : " + message.getQos());

    System.out.println("接收消息內容 : " + new String(message.getPayload()));

}

2.6.5  服務質量等級說明(Qos)

MQTT提供三種Qos的消息傳遞質量:

最多一次(Atmost once delivery):QoS=0,協議對此等級應用信息不要求回應確認,也沒有重發機制,這類信息可能會發生消息丟失或重復,取決於TCP/IP提供的盡最大努力交互的數據包服務。

(0:消息最多被傳遞一次,比如一般類廣告,通知)

最少一次(Atleast once delivery):QoS=1,確保信息到達,但消息重復可能發生,發送者如果在指定時間內沒有收到PUBACK控制報文,應用信息會被重新發送,且控制報文中DUP標志位置1。

(1 :消息會被傳遞但可能會重復傳遞,比如賬戶余額通知)

僅僅一次(Exactlyonce delivery):QoS=2,最高級別的服務質量,消息丟失和重復都是不可接受的。

(2 :消息保證傳遞且僅有一次傳遞,比如交易支付批復通知)

2.6.6 斷開鏈接

pushCallBack.connectionLost()

3    EMQ

3.1 簡介

EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 語言平台開發,支持大規模連接和分布式集群,發布訂閱模式的開源 MQTT 消息服務器。

3.1.1官網

http://www.emqtt.com/docs/v2/index.html

3.1.2優點

3.3.1 商用比較普及,有完備的運營團隊支撐

3.3.2 可視化的管理后台

開放18083端口訪問管理后台

3.3.3 能抗高並發

官方的回復是8核心32G的配置能夠承載160W台設備的鏈接

3.1.3 缺點

3.1.4安裝及發現的問題

nzip emqttd-macosx-v2.0.zip && cd emqttd
# 啟動emqttd  ./bin/emqttd start
# 檢查運行狀態./bin/emqttd_ctl status
# 停止emqttd ./bin/emqttd stop
默認配置文件 /bin/emqenv
[ "x" = "x$EMQ_NODE_NAME" ] && EMQ_NODE_NAME=emqttd@127.0.0.1
[ "x" = "x$EMQ_NODE_COOKIE" ] && EMQ_NODE_COOKIE=emqsecretcookie
[ "x" = "x$EMQ_MAX_PACKET_SIZE" ] && EMQ_MAX_PACKET_SIZE=64KB
[ "x" = "x$EMQ_MAX_PORTS" ] && EMQ_MAX_PORTS=65536
[ "x" = "x$EMQ_TCP_PORT" ] && EMQ_TCP_PORT=1883
[ "x" = "x$EMQ_SSL_PORT" ] && EMQ_SSL_PORT=8883
[ "x" = "x$EMQ_WS_PORT" ] && EMQ_WS_PORT=8083
[ "x" = "x$EMQ_WSS_PORT" ] && EMQ_WSS_PORT=8084
對外暴露的tcp端口依然是1883  和mosquitto一樣

 

3.1.5 操作

在客戶端分別訂閱和發布消息,在管理后台列表可以看到消息的狀態,管理后台默認端口為18083

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM