轉載:https://www.jianshu.com/p/de88edf8e023
什么是MQTT
MQTT是基於二進制消息的發布/訂閱編程模式的消息協議,最早由IBM提出的,如今已經成為OASIS規范。由於規范很簡單,非常適合需要低功耗和網絡帶寬有限的IoT場景。
與XMPP相比有什么特點
同MQTT類似的是XMPP協議,他們的特點點見下表:
| MQTT | XMPP | |
|---|---|---|
| 基於協議層 | TCP | TCP,也可以基於HTTP |
| 體積 | 小巧 | 龐大 |
| 適用場景 | 物聯網 | 聊天 |
| 省流 | 省流量 | 費流量 |
| 省電 | 省電 | 費電 |
| 成熟度 | 不成熟 | 成熟 |
發布/訂閱模式
與請求/回答這種同步模式不同,發布/訂閱模式解耦了發布消息的客戶(發布者)與訂閱消息的客戶(訂閱者)之間的關系,這意味着發布者和訂閱者之間並不需要直接建立聯系。打個比方,你打電話給朋友,一直要等到朋友接電話了才能夠開始交流,是一個典型的同步請求/回答的場景;而給一個好友郵件列表發電子郵件就不一樣,你發好電子郵件該干嘛干嘛,好友們到有空了去查看郵件就是了,是一個典型的異步發布/訂閱的場景。
熟悉編程的同學一定非常熟悉這種設計模式了,因為它帶來了這些好處:
-
發布者與訂閱者不必了解彼此,只要認識同一個消息代理即可。
-
發布者和訂閱者不需要交互,發布者無需等待訂閱者確認而導致鎖定。
-
發布者和訂閱者不需要同時在線,可以自由選擇時間來消費消息。
主題
MQTT是通過主題對消息進行分類的,本質上就是一個UTF-8的字符串,不過可以通過反斜杠表示多個層級關系。主題並不需要創建,直接使用就是了。
主題還可以通過通配符進行過濾。其中,+可以過濾一個層級,而#只能出現在主題最后表示過濾任意級別的層級。
舉個例子:
-
building-b/floor-5:代表B樓5層的設備。
-
+/floor-5:代表任何一個樓的5層的設備。
-
building-b/#:代表B樓所有的設備。
注意,MQTT允許使用通配符訂閱主題,但是並不允許使用通配符廣播。
協議介紹
MQTT的通信協議並不復雜,最核心的部分,我認為是他的16種控制類型,如下表:
| 名字 | 值 | 報文流動方向 | 描述 |
|---|---|---|---|
| Reserved | 0 | 禁止 | 保留 |
| CONNECT | 1 | 客戶端到服務端 | 客戶端請求連接服務端 |
| CONNACK | 2 | 服務端到客戶端 | 連接報文確認 |
| PUBLISH | 3 | 兩個方向都允許 | 發布消息 |
| PUBACK | 4 | 兩個方向都允許 | QoS 1消息發布收到確認 |
| PUBREC | 5 | 兩個方向都允許 | 發布收到(保證交付第一步) |
| PUBREL | 6 | 兩個方向都允許 | 發布釋放(保證交付第二步) |
| PUBCOMP | 7 | 兩個方向都允許 | QoS 2消息發布完成(保證交互第三步) |
| SUBSCRIBE | 8 | 客戶端到服務端 | 客戶端訂閱請求 |
| SUBACK | 9 | 服務端到客戶端 | 訂閱請求報文確認 |
| UNSUBSCRIBE | 10 | 客戶端到服務端 | 客戶端取消訂閱請求 |
| UNSUBACK | 11 | 服務端到客戶端 | 取消訂閱報文確認 |
| PINGREQ | 12 | 客戶端到服務端 | 心跳請求 |
| PINGRESP | 13 | 服務端到客戶端 | 心跳響應 |
| DISCONNECT | 14 | 客戶端到服務端 | 客戶端斷開連接 |
| Reserved | 15 | 禁止 | 保留 |
我看MQTT協議內容的時候,發現有趣的一點就是他的字符長度是可變的,可以用ASCII,也可以用UTF-8,這個在我接觸的其他協議里面是沒有的,這樣子的好處,顯而易見的就是能減少傳輸流量。
什么是QoS
Qos的全稱是服務質量(Quality of Service)。MQTT支持三種QoS,分別是0、1、2。級別越高,交互越復雜,越能保證正確性和到達率,但是開銷也更大。他們的交互流程圖如下:
- QoS 0: 盡力而為。消息發送者會想盡辦法發送消息,但是遇到意外並不會重試。
- QoS 1: 至少一次。消息接收者如果沒有知會或者知會本身丟失,消息發送者會再次發送以保證消息接收者至少會收到一次,當然可能造成重復消息。
-
QoS 2: 恰好一次。保證這種語義肯定會減少並發或者增加延時,不過丟失或者重復消息是不可接受的時候,級別2是最合適的。
image
服務質量是個老話題了。級別2所提供的不重不丟很多情況下是最理想的,不過往返多次的確認一定對並發和延遲帶來影響。級別1提供的至少一次語義在日志處理這種場景下是完全OK的,所以像Kafka這類的系統利用這一特點減少確認從而大大提高了並發。級別0適合雞肋數據場景,食之無味棄之可惜,就這么着吧。
相關閱讀
Paho UML
說明:
-
上面三個類(MqttAndroidClient, MqttService, MqttConnection)是存在於android.paho庫中,也就是說,這三個類是paho基於android的封裝,而其余的類都是封裝在java.paho中,處於底層庫。
-
中間的縱軸線從上往下,MqttService,MqttConnection, MqttAsyncClient, ClientComms,這四個類都具備connect, publish, subscribe等開放給上層的基礎方法,因為上層的調用都是通過這四個類傳遞到底層。
-
Paho運行的核心在於CommsCallback, CommsSender和CommsReceiver三個線程,分別用來做觸發回調、發送消息和接收消息三個動作。
-
ClientState和CommsTokenStore維護着一套復雜的隊列,三個線程通過使用這兩個類,來實現線程間的同步和消息的排隊。
-
ClientComms的功能除了基礎方法接口之外,還維護底層三個線程的狀態,這些狀態主要包括:
-
connected
-
connecting
-
disconnected
-
disconnecting
-
closed
-
resting
-
-
MqttAsyncClient的功能除了基礎方法接口之外,主要負責連接的重連,通過下面介紹的流程分析可以看出來,mqtt的重連機制就是在MqttAsynClient發起的。
-
MqttConnection是一個獨立的mqtt連接,用於維護當前連接的所有狀態。
流程分析
因為整個庫的代碼量比較大,我這里就不把代碼貼出來了,這是我通過閱讀代碼整理出來的所有關鍵方法的流程,包括connect, publish 和reconnect,希望能對讀者您有幫助。
- connect()
-
MqttAndroidClient.bindService()
-
MqttService.connect()
- MqttService.createConnection is not exists
-
MqttConnection.connect()
-
if option.isCleanSession(), then discard old data (we set cleanSession true)
-
callback action connect to activity when start connect fail, result fail or success
-
if MqttAsyncClient created
-
if isConnecting, return
-
if is connected, doAfterConnectSuccess
-
else MqttAsyncClient.connect
-
-
new MqttAsyncClient and connect
-
-
MqttAsyncClient.connect()
-
judge if need connect by ClientComms
-
if connected, throw exception
-
if connecting, throw exception
-
if disconnected, throw exception
-
if closed, throw exception
-
-
ClientComms.setReconnectCallback if options set automaticReconnect
-
new MqttToken as userToken
-
new ConnectActionListener and set comms, userToken
-
-
ConnectActionListener.connect()
-
persistence.open
-
comms.connect
-
-
ClientComms.connect()
-
if not disconnected or is closing, throw exception
-
tokenStore.open(), just set closeResponse null
-
ConnectBG.run
-
-
ConnectBG.run()
-
CommsReceiver.start()
-
runningSemaphore.acquire(), ensure just one receiver thread is running
-
while in.available && message instance of MqttAck
-
tokenStore.getToken
-
if token != null, synchronized token and clientState.notifyReceivedAck
-
else if instance of MqttPubRec MqttPubComp MqttPubAck, log itthese signal only receive after send
-
else throw exception
-
-
-
CommsSender.start()
-
runningSemaphore.acquire(), ensure just one sender thread is running
-
await clientState.pendingMessages.removeElementAt(0)
-
out.write(message)
-
-
CommsCallback.start()
-
runningSemaphore.acquire(), ensure just one callback thread is running
-
workAvailable.wait()
-
check if exists complete token, if true then handleActionComplete
-
if token.isComplete, notifyComplete
-
token.internalTok.notifyComplete
-
if token.isNotified
-
if mqttCallback != null and is DeliveryToken, notify deliveryComplete
-
if token.getActionCallback != null, notify onSuccess or onFail
-
-
token.setNotifiyed(true)
-
-
check if exists MqttPublish, if true then handleMessage(MqttPublish)this happens when register topic, not exists in DataCollector
-
-
internalSend
- clientState.send()
-
-
clientState.send()
-
token.setMessageId()
-
if MqttPublish
-
tokenStore.saveToken
-
pedingMessages.addElement
-
queueLock.notifyAll()
-
-
if MqttConnect
-
tokenStore.saveToken
-
pedingFlows.insertElementAt(0)
-
queueLock.notifyAll()
-
-
- publish()
-
MqttAndroidClient.publish()
-
MqttService.publish()
-
MqttConnection.publish()
-
if MqttAsyncClient.isConnected
-
MqttAsyncClient.publish
-
storeSendDetails
-
-
else, callbackToActivity error
-
-
MqttAsyncClient.publish()
-
new MqttPublish
-
comms.sendNoWait
-
-
ClientComms.sendNoWait()
-
if connected or trying to connect or trying to disconnect
-
if disconnectedBuffer is not empty, put message to thatafter reconnect, it will send disconnectBuffer message first, before send real-time message
-
internalSend()
-
token.setClient
-
clientState.send
-
-
-
else if disconnectedBuffer is not null, put message to that
-
else throw exception
-
-
ClientState.send()go to connect
- Reconnect automatic
-
ClientComms.shutdownConnection()
-
CommsCallback.connectionLost()
-
MqttAsyncClient.MqttReconnectCallback.connectinLost()
-
startReconnectCycle
-
run ReconnectTask after 1s
-
attempReconnect()
-
-
MqttAsyncClient.connect()go to connect .4
