本文是使用Java語言,eclipse paho的實現方式,去調用MQTT服務器端,編寫的MqttClient代碼中針對MQTT服務器重啟定制重連機制所遇到的問題進行匯總。
1.1編寫MqttConnection類,創建MQTT連接
1 public synchronized boolean connect() { 2 try { 3 if(null == client) { 4 //host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示, 5 // MemoryPersistence設置clientid的保存形式,默認為以內存保存 6 client = new MqttClient(host, client_id, new MemoryPersistence()); 7 //設置回調 8 client.setCallback(new PushCallBack(MqttConnection.this, devDao)); 9 } 10 //獲取連接配置 11 getOption(); 12 client.connect(option); 13 log.info("[MQTT] connect to Mqtt Server success..."); 14 return isConnected(); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 return false; 18 } 19 }
1 private void getOption() { 2 //MQTT連接設置 3 option = new MqttConnectOptions(); 4 //設置是否清空session,false表示服務器會保留客戶端的連接記錄,true表示每次連接到服務器都以新的身份連接 5 option.setCleanSession(true); 6 //設置連接的用戶名 7 option.setUserName(userName); 8 //設置連接的密碼 9 option.setPassword(password.toCharArray()); 10 //設置超時時間 單位為秒 11 option.setConnectionTimeout(outTime); 12 //設置會話心跳時間 單位為秒 服務器會每隔(1.5*keepTime)秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 13 option.setKeepAliveInterval(keepTime); 14 option.setAutomaticReconnect(true); 15 //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 16 // option.setWill(topic, "close".getBytes(), 2, true); 17 }
1.2編寫PushCallBack回調類,實現重連
1 public void connectionLost(Throwable throwable) { 2 // 連接丟失后,一般在這里面進行重連 3 log.info("[MQTT] 連接斷開,30S之后嘗試重連..."); 4 while(true) { 5 try { 6 Thread.sleep(30000); 7 mqttConn.reConnect(); 8 break; 9 } catch (Exception e) { 10 e.printStackTrace(); 11 continue; 12 } 13 } 14 }
異常出現:
若在connectionLost()方法中直接循環調用MqttConnection類中connect()方法,
去實現重連機制的話,會出現在第一次重連成功后,一直斷開連接再重連再斷開連接再重連的死循環中。
異常定位:
原因在connect()方法中的這句:
重新new上一個client_id相同的MqttClient,client_id是MQTT client的唯一標識,client_id不能重復。
這樣就會出現重連時創建的MqttClient,使程序中初始化時創建的MqttClient斷開連接,斷開連接后就會回滾到connectionLost方法中,
然后此方法中又會繼續重連,就出現上述的斷開連接再重連再斷開連接再重連的死循環。
異常解決:
在MqttConnection連接類中定義一個重連的方法:
1 //斷線重連 2 public void reConnect() throws Exception { 3 if(null != client) { 4 client.connect(option); 5 } 6 }
不需要重新new一個MqttClient,只需要調用connect方法就OK了,eclipse paho就會把之前的連接重新創建起來。
另外分享setRetained()方法:
1 /** 2 * 發布消息 3 * @param topic 消息主題 4 * @param qos 消息傳輸質量 5 * @param message 消息內容 6 */ 7 public void publish(String topic, int qos, String message) throws Exception { 8 MqttTopic mqttTopic = client.getTopic(topic); 9 MqttMessage mqttMessage = new MqttMessage(); 10 mqttMessage.setQos(qos); 11 //是否設置保留消息,若為true,后來的訂閱者訂閱該主題時仍可接收到該消息 12 mqttMessage.setRetained(false); 13 mqttMessage.setPayload(message.getBytes()); 14 MqttDeliveryToken token = mqttTopic.publish(mqttMessage); 15 token.waitForCompletion(); 16 log.info("[MQTT] publish message : " + token.isComplete() + 17 ",{topic : " + topic + ", message : " + message + "}"); 18 }
setRetained():消息保留機制,若設置為true,mqtt服務器會保留每次發布的消息,
若訂閱某主題的客戶端重啟,則會把此主題之前發布的消息重新推送到客戶端。