一篇文章讓您了解MQTT


轉載:https://www.jianshu.com/p/de88edf8e023

什么是MQTT

​ MQTT是基於二進制消息的發布/訂閱編程模式的消息協議,最早由IBM提出的,如今已經成為OASIS規范。由於規范很簡單,非常適合需要低功耗和網絡帶寬有限的IoT場景。

 
MQTT使用場景

與XMPP相比有什么特點

​ 同MQTT類似的是XMPP協議,他們的特點點見下表:

  MQTT XMPP
基於協議層 TCP TCP,也可以基於HTTP
體積 小巧 龐大
適用場景 物聯網 聊天
省流 省流量 費流量
省電 省電 費電
成熟度 不成熟 成熟

發布/訂閱模式

與請求/回答這種同步模式不同,發布/訂閱模式解耦了發布消息的客戶(發布者)與訂閱消息的客戶(訂閱者)之間的關系,這意味着發布者和訂閱者之間並不需要直接建立聯系。打個比方,你打電話給朋友,一直要等到朋友接電話了才能夠開始交流,是一個典型的同步請求/回答的場景;而給一個好友郵件列表發電子郵件就不一樣,你發好電子郵件該干嘛干嘛,好友們到有空了去查看郵件就是了,是一個典型的異步發布/訂閱的場景。

熟悉編程的同學一定非常熟悉這種設計模式了,因為它帶來了這些好處:

  • 發布者與訂閱者不必了解彼此,只要認識同一個消息代理即可。

  • 發布者和訂閱者不需要交互,發布者無需等待訂閱者確認而導致鎖定。

  • 發布者和訂閱者不需要同時在線,可以自由選擇時間來消費消息。

主題

MQTT是通過主題對消息進行分類的,本質上就是一個UTF-8的字符串,不過可以通過反斜杠表示多個層級關系。主題並不需要創建,直接使用就是了。

主題還可以通過通配符進行過濾。其中,+可以過濾一個層級,而#只能出現在主題最后表示過濾任意級別的層級。

舉個例子:

  • building-b/floor-5:代表B樓5層的設備。

  • +/floor-5:代表任何一個樓的5層的設備。

  • building-b/#:代表B樓所有的設備。

注意,MQTT允許使用通配符訂閱主題,但是並不允許使用通配符廣播。

協議介紹

​ MQTT的通信協議並不復雜,最核心的部分,我認為是他的16種控制類型,如下表:

名字 報文流動方向 描述
Reserved 0 禁止 保留
CONNECT 1 客戶端到服務端 客戶端請求連接服務端
CONNACK 2 服務端到客戶端 連接報文確認
PUBLISH 3 兩個方向都允許 發布消息
PUBACK 4 兩個方向都允許 QoS 1消息發布收到確認
PUBREC 5 兩個方向都允許 發布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務端 客戶端訂閱請求
SUBACK 9 服務端到客戶端 訂閱請求報文確認
UNSUBSCRIBE 10 客戶端到服務端 客戶端取消訂閱請求
UNSUBACK 11 服務端到客戶端 取消訂閱報文確認
PINGREQ 12 客戶端到服務端 心跳請求
PINGRESP 13 服務端到客戶端 心跳響應
DISCONNECT 14 客戶端到服務端 客戶端斷開連接
Reserved 15 禁止 保留

​ 我看MQTT協議內容的時候,發現有趣的一點就是他的字符長度是可變的,可以用ASCII,也可以用UTF-8,這個在我接觸的其他協議里面是沒有的,這樣子的好處,顯而易見的就是能減少傳輸流量。

什么是QoS

​ Qos的全稱是服務質量(Quality of Service)。MQTT支持三種QoS,分別是0、1、2。級別越高,交互越復雜,越能保證正確性和到達率,但是開銷也更大。他們的交互流程圖如下:

  • QoS 0: 盡力而為。消息發送者會想盡辦法發送消息,但是遇到意外並不會重試。
 
image
  • QoS 1: 至少一次。消息接收者如果沒有知會或者知會本身丟失,消息發送者會再次發送以保證消息接收者至少會收到一次,當然可能造成重復消息。
 
image
  • QoS 2: 恰好一次。保證這種語義肯定會減少並發或者增加延時,不過丟失或者重復消息是不可接受的時候,級別2是最合適的。


     
    image

​ 服務質量是個老話題了。級別2所提供的不重不丟很多情況下是最理想的,不過往返多次的確認一定對並發和延遲帶來影響。級別1提供的至少一次語義在日志處理這種場景下是完全OK的,所以像Kafka這類的系統利用這一特點減少確認從而大大提高了並發。級別0適合雞肋數據場景,食之無味棄之可惜,就這么着吧。

相關閱讀

Paho UML

 
paho uml.png

說明:

  1. 上面三個類(MqttAndroidClient, MqttService, MqttConnection)是存在於android.paho庫中,也就是說,這三個類是paho基於android的封裝,而其余的類都是封裝在java.paho中,處於底層庫。

  2. 中間的縱軸線從上往下,MqttService,MqttConnection, MqttAsyncClient, ClientComms,這四個類都具備connect, publish, subscribe等開放給上層的基礎方法,因為上層的調用都是通過這四個類傳遞到底層。

  3. Paho運行的核心在於CommsCallback, CommsSender和CommsReceiver三個線程,分別用來做觸發回調、發送消息和接收消息三個動作。

  4. ClientState和CommsTokenStore維護着一套復雜的隊列,三個線程通過使用這兩個類,來實現線程間的同步和消息的排隊。

  5. ClientComms的功能除了基礎方法接口之外,還維護底層三個線程的狀態,這些狀態主要包括:

    1. connected

    2. connecting

    3. disconnected

    4. disconnecting

    5. closed

    6. resting

  6. MqttAsyncClient的功能除了基礎方法接口之外,主要負責連接的重連,通過下面介紹的流程分析可以看出來,mqtt的重連機制就是在MqttAsynClient發起的。

  7. MqttConnection是一個獨立的mqtt連接,用於維護當前連接的所有狀態。

流程分析

因為整個庫的代碼量比較大,我這里就不把代碼貼出來了,這是我通過閱讀代碼整理出來的所有關鍵方法的流程,包括connect, publish 和reconnect,希望能對讀者您有幫助。

  • connect()
  1. MqttAndroidClient.bindService()

  2. MqttService.connect()

    1. MqttService.createConnection is not exists
  3. MqttConnection.connect()

    1. if option.isCleanSession(), then discard old data (we set cleanSession true)

    2. callback action connect to activity when start connect fail, result fail or success

    3. if MqttAsyncClient created

      1. if isConnecting, return

      2. if is connected, doAfterConnectSuccess

      3. else MqttAsyncClient.connect

    4. new MqttAsyncClient and connect

  4. MqttAsyncClient.connect()

    1. judge if need connect by ClientComms

      1. if connected, throw exception

      2. if connecting, throw exception

      3. if disconnected, throw exception

      4. if closed, throw exception

    2. ClientComms.setReconnectCallback if options set automaticReconnect

    3. new MqttToken as userToken

    4. new ConnectActionListener and set comms, userToken

  5. ConnectActionListener.connect()

    1. persistence.open

    2. comms.connect

  6. ClientComms.connect()

    1. if not disconnected or is closing, throw exception

    2. tokenStore.open(), just set closeResponse null

    3. ConnectBG.run

  7. ConnectBG.run()

    1. CommsReceiver.start()

      1. runningSemaphore.acquire(), ensure just one receiver thread is running

      2. while in.available && message instance of MqttAck

      3. tokenStore.getToken

        1. if token != null, synchronized token and clientState.notifyReceivedAck

        2. else if instance of MqttPubRec MqttPubComp MqttPubAck, log itthese signal only receive after send

        3. else throw exception

    2. CommsSender.start()

      1. runningSemaphore.acquire(), ensure just one sender thread is running

      2. await clientState.pendingMessages.removeElementAt(0)

      3. out.write(message)

    3. CommsCallback.start()

      1. runningSemaphore.acquire(), ensure just one callback thread is running

      2. workAvailable.wait()

      3. check if exists complete token, if true then handleActionComplete

        1. if token.isComplete, notifyComplete

        2. token.internalTok.notifyComplete

        3. if token.isNotified

          1. if mqttCallback != null and is DeliveryToken, notify deliveryComplete

          2. if token.getActionCallback != null, notify onSuccess or onFail

        4. token.setNotifiyed(true)

      4. check if exists MqttPublish, if true then handleMessage(MqttPublish)this happens when register topic, not exists in DataCollector

    4. internalSend

      1. clientState.send()
  8. clientState.send()

    1. token.setMessageId()

    2. if MqttPublish

      1. tokenStore.saveToken

      2. pedingMessages.addElement

      3. queueLock.notifyAll()

    3. if MqttConnect

      1. tokenStore.saveToken

      2. pedingFlows.insertElementAt(0)

      3. queueLock.notifyAll()

  • publish()
  1. MqttAndroidClient.publish()

  2. MqttService.publish()

  3. MqttConnection.publish()

    1. if MqttAsyncClient.isConnected

      1. MqttAsyncClient.publish

      2. storeSendDetails

    2. else, callbackToActivity error

  4. MqttAsyncClient.publish()

    1. new MqttPublish

    2. comms.sendNoWait

  5. ClientComms.sendNoWait()

    1. if connected or trying to connect or trying to disconnect

      1. if disconnectedBuffer is not empty, put message to thatafter reconnect, it will send disconnectBuffer message first, before send real-time message

      2. internalSend()

        1. token.setClient

        2. clientState.send

    2. else if disconnectedBuffer is not null, put message to that

    3. else throw exception

  6. ClientState.send()go to connect

  • Reconnect automatic
  1. ClientComms.shutdownConnection()

  2. CommsCallback.connectionLost()

  3. MqttAsyncClient.MqttReconnectCallback.connectinLost()

    1. startReconnectCycle

    2. run ReconnectTask after 1s

    3. attempReconnect()

  4. MqttAsyncClient.connect()go to connect .4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM