MQTT版本升級過程及源碼解析


MQTT版本升級過程及源碼解析


  首先說一下為什么要寫這篇文章呢,在我發現網上對MQTT的文章介紹實在太少了,可能也是使用這個的頻率比較低吧!還有對問題的定位以及解決的方式和辦法也太少了,所以特意寫這篇文章希望能作出一些貢獻,幫助到一些需要的人。

  主要記錄一下MQTT在原先1.2.0版本使用過程中出現的問題,排查問題到升級1.2.1版本過程中出現的問題,通過源碼一步步排查出最后的問題點,直到符合預期目標。

 

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

  MQTT的搭建及SSL認證可以參考這個博客:https://www.cnblogs.com/yueli/p/7490453.html 在這不仔細闡述


  Qos介紹 

  在開頭先了解一下Qos的一些含義,也是這個問題為導火線

  • Qos = 0  最多一次的傳輸

       發布者PUBLISH消息到服務器(broker),發送即丟棄。沒有確認消息,也不知道對方是否收到。網絡層面,傳輸壓力小

  • Qos = 1  至少一次的傳輸

   發布者發布消息保存消息,服務器(broker)接收到消息,服務器(broker)PUBLISH到訂閱者,服務器(broker)回一個PUBACK信息到發布者讓刪除消息,然后訂閱者接收消息后PUBACK給服務器讓刪除消息。如果失敗了,在一段時間確認信息沒有收到,發送方都會將消息頭的DUP設置為1,然后再次發送消息,消息最少一次到達服務。例如網絡延遲等問題,發布者重復發送消息,訂閱者多次訂閱重復消息 

  • Qos = 2  只有一次的傳輸

  其實Qos = 2 只是在 1 的基礎上做了改掉的趕腳,在發布者PUBLISH到服務器之后多了消息的確認以及多了消息msgID的緩存,重復信息的去重。在服務器PUBLISH到訂閱者之后也多了消息的確認。

  三種情況的區別

   0 沒有保存message,沒有重發機制,啥事也不知道,1和2 的發布者和服務器有保存message,發布者有重發機制,服務器都有PUBLISH之后的PUBACK的確認機制,但是2的服務器多了緩存msgID的一項功能,提供了去重功能,防止了消息的重復發送,以及消息的接收的確認機制。訂閱者這邊不過多的介紹,感興趣再去了解。


 MQTT1.2.1版本出現的問題 

  問題的描述:因為項目中使用MQTT通信的地方比較多,一般都是以Qos = 0 的形式,這段時間發現會時常提醒報錯Too many publishes in progress (32202),看了一下源碼報錯地方

   從這里可以判定actualInFlight超出自己設置的maxInflight最大值導致的,嘗試加大maxInflight也無用,只是延遲報錯的時間而已


 問題排查過程

  首先上網搜一下是否有類似問題出現,果不其然有個哥們碰到了,博客地址:https://blog.csdn.net/lblblblblzdx/article/details/81159478 此文章給我很大幫助,感謝博主,但是最后的解決方案不是很好。

  跟蹤發布過程的源碼邏輯

  第一步:publish的過程

//1
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext,IMqttActionListener callback)
//2
public void sendNoWait(MqttWireMessage message, MqttToken token)
//3
void internalSend(MqttWireMessage message, MqttToken token)
//4
public void send(MqttWireMessage message, MqttToken token) 

  第二步:所有Qos類型,在publish消息的頂級父類中的構造器默認設置msgId = 0  

  第三步:消息在send方法中做了主要處理

   第四步:根據上面的Qos的介紹,說明我們publish的消息在這個地方全部都緩存在tokens這里,其實就是放入Hashtable中,不管什么等級的Qos

   在這幾個步驟中,已經緩存好信息,准備異步發送,其中的lock機制就不多去解釋

  第五步:異步發送,其實主要是在客戶端鏈接完成的時候就已經開始監聽了,connect流程

 1 //1
 2 public IMqttToken connect(MqttConnectOptions options)
 3 
 4 //2 異步鏈接
 5 ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken,userContext, callback, reconnecting);
 6 connectActionListener.connect();
 7 
 8 //3 客戶端的通信鏈接,包括發送和接收
 9 ClientComms.connect(options, token);
10 
11 //4 個人理解是 將通信信息塞進線程池中,分別開啟發送和接收的線程處理
12 ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
13 conbg.start();

  第六步:在ConnectBG的run中 new CommsSender. start線程run中while循環發送的信息流程,一直發送消息中

 

   第七步:在notifySent的方法中判斷Qos = 0 的作出了判斷及操作

   這些差不多就是發布過程的主題流程結構,了解這些才會知道讓你解決問題更加的容易


  問題點

  在上面的第六步的圖中黃色箭頭指出了問題出現的點,主要是在大數據量高並發的時候,因為在Qos = 0 的時候,在tokens(Hashtable)中的key一直是0,默認初始化。后面的流程中並沒有改變過,在黃色那塊tokenStore.getToken 在發送之后才remove數據,但是多條數據高並發的時候,在remove數據之后,后一條在get的時候會出現空的狀況,不發送信息,導致actualInFlight沒有減,一直增加,一定時間后就會超出最大值。Qos =1 、2 是不會發送這樣的情況,因為他們的messageId是唯一的。


  解決辦法

  1. 將Qos設置為1 (這是網上主流的解決辦法,但是這個太耗費資源問題,在我看來只是規避問題點而已)
  2. 升級MQTT版本1.2.1    (這個版本解決了剛才說的bug)      
  3. 也可以想想在不升級版本的情況下如何去改善這個問題,重寫那些類可以實現···

  


 

   解決過程

  既然說了第一種解決辦法不是很贊同,那就直接進去第二種辦法吧。升級到1.2.1版,時間:Feb, 2019,但是在升級版本的時候又出現了一些問題,因為改動還是有點多的。

  接下來說說1.2.1版本的改動了什么呢


  首先主要改動的是在Qos = 0 的不放入tokens中了,首先想到的是不會get到了,也不用擔心重復了,直接從集合pendingMessages中拿

   然后從信息的自身獲取數據的token,獲取不到再去tokens中拿

    以上兩個就是主要解決這個高並發沖突的原因


    跳入另一個坑中···

   為什么這么說呢,在我們開發的意識中,升級版本怎么也要向下兼容吧,那就順其自然的換個版本就完事了,結果一跑起來,一堆紅色的出現,那心情···我太南了。以我的性子,就是不能慣着她,繼續深挖為什么,既然坑已經有了,就不怕有多深。

   另一個坑的過程

    第一時間也是上網搜一些為啥,大家的解決也差不多,都是SSL驗證出現問題,但是解決方案也是五花八門


 

  解決方案

       第一種:設置系統屬性  類似這樣的文章 https://blog.csdn.net/hxpjava1/article/details/77937026

   第二種:有看了一些代碼了,就是重寫X509TrustManagerImpl,繞過SSL的驗證,試過有用,類似這樣的文章 https://blog.csdn.net/iverson_AL/article/details/100669777

   第三種:再深入看一些源碼,你會看到會什么會報錯,為什么會驗證,主要是你的屬性沒有設置好,采用了默認驗證導致


   解決過程

  這里主要說一下解決的過程,如何從這些網絡文章種找出適合自己的出路。

  首先 第一種我就不咋喜歡,動不動就設置了系統全局屬性,第二種方案,有兩種可能性,一是這個api真的有問題或是不符合自己的項目需要重寫代碼邏輯結構,二是在不了解的情況下直接繞過驗證。我在測試第二個的時候就是如此只是稍微看了一下源碼,沒有深入看進去,試了一下,果真可以實現我想要目的。

  但是過后又想了一下,不該如此,既然開源出來的東西,不可能如此**,應該會有什么地方可以簡單設置一下的。既然有這個想法就一直深入探究下去,果不其然,真相出來了。

  在1.2.1版本中MqttConnectOptions 的httpsHostnameVerificationEnabled屬性默認true,導致不是Https的被驗證不通過導致的,也可能MQTT開發人員安全意識很強,在1.2.0版本中沒有這個概念存在,所以在版本升級的時候需要加上MqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

  以下源碼查看的過程

    若是有Https證書是不會有問題的。


  總結一下

   其實很簡單的問題,居然整的時候那么復雜,原因是我們不夠強,面對源碼的時候還是比較害怕的,還有比較懶吧。

   整個過程比較繁瑣,啰嗦吧,耐心看下來,應該有收獲。

   操作總結:

    1. 升級MQTT版本1.2.1
    2. 若不是https的需要設置為false
    3. 不要走,跑起來

   擴展點及疑惑地點可以供參考

  

 

              轉載請注明出處  https://www.cnblogs.com/zhouguanglin/p/11986446.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM