Mqtt(paho)重連機制


本文是使用Java語言,eclipse paho的實現方式,去調用MQTT服務器端,編寫的MqttClient代碼中針對MQTT服務器重啟定制重連機制所遇到的問題進行匯總。

 

1.1編寫MqttConnection類,創建MQTT連接

 1 public synchronized boolean connect() {
 2         try {
 3             if(null == client) {
 4                 //host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,
 5                 // MemoryPersistence設置clientid的保存形式,默認為以內存保存
 6                 client = new MqttClient(host, client_id, new MemoryPersistence());
 7                 //設置回調
 8                 client.setCallback(new PushCallBack(MqttConnection.this, devDao));
 9             }
10             //獲取連接配置
11             getOption();
12             client.connect(option);
13             log.info("[MQTT] connect to Mqtt Server success...");
14             return isConnected();
15         } catch (Exception e) {
16             e.printStackTrace();
17             return false;
18         }
19     }
 1 private void getOption() {
 2         //MQTT連接設置
 3         option = new MqttConnectOptions();
 4         //設置是否清空session,false表示服務器會保留客戶端的連接記錄,true表示每次連接到服務器都以新的身份連接
 5         option.setCleanSession(true);
 6         //設置連接的用戶名
 7         option.setUserName(userName);
 8         //設置連接的密碼
 9         option.setPassword(password.toCharArray());
10         //設置超時時間 單位為秒
11         option.setConnectionTimeout(outTime);
12         //設置會話心跳時間 單位為秒 服務器會每隔(1.5*keepTime)秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
13         option.setKeepAliveInterval(keepTime);
14         option.setAutomaticReconnect(true);
15         //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
16 //            option.setWill(topic, "close".getBytes(), 2, true);
17     }

1.2編寫PushCallBack回調類,實現重連

 1 public void connectionLost(Throwable throwable) {
 2         // 連接丟失后,一般在這里面進行重連
 3         log.info("[MQTT] 連接斷開,30S之后嘗試重連...");
 4         while(true) {
 5             try {
 6                 Thread.sleep(30000);
 7                 mqttConn.reConnect();
 8                 break;
 9             } catch (Exception e) {
10                 e.printStackTrace();
11                 continue;
12             }
13         }
14     }

異常出現:

若在connectionLost()方法中直接循環調用MqttConnection類中connect()方法,

去實現重連機制的話,會出現在第一次重連成功后,一直斷開連接再重連再斷開連接再重連的死循環中。

異常定位:

原因在connect()方法中的這句:

 

重新new上一個client_id相同的MqttClient,client_id是MQTT client的唯一標識,client_id不能重復。

這樣就會出現重連時創建的MqttClient,使程序中初始化時創建的MqttClient斷開連接,斷開連接后就會回滾到connectionLost方法中,

然后此方法中又會繼續重連,就出現上述的斷開連接再重連再斷開連接再重連的死循環。

異常解決:

在MqttConnection連接類中定義一個重連的方法:

1 //斷線重連
2     public void reConnect() throws Exception {
3         if(null != client) {
4             client.connect(option);
5         }
6     }

不需要重新new一個MqttClient,只需要調用connect方法就OK了,eclipse paho就會把之前的連接重新創建起來。

 

另外分享setRetained()方法:

 1 /**
 2      * 發布消息
 3      * @param topic     消息主題
 4      * @param qos       消息傳輸質量
 5      * @param message   消息內容
 6      */
 7     public void publish(String topic, int qos, String message) throws Exception {
 8         MqttTopic mqttTopic = client.getTopic(topic);
 9         MqttMessage mqttMessage = new MqttMessage();
10         mqttMessage.setQos(qos);
11         //是否設置保留消息,若為true,后來的訂閱者訂閱該主題時仍可接收到該消息
12         mqttMessage.setRetained(false);
13         mqttMessage.setPayload(message.getBytes());
14         MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
15         token.waitForCompletion();
16         log.info("[MQTT] publish message : " + token.isComplete() +
17                 ",{topic : " + topic + ", message : " + message + "}");
18     }

setRetained():消息保留機制,若設置為true,mqtt服務器會保留每次發布的消息,

若訂閱某主題的客戶端重啟,則會把此主題之前發布的消息重新推送到客戶端。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM