先講一下paho的mqtt連接的java實現情況
1、paho的mqtt底層是采用三個線程進行異步的消息發送、處理和接收的【debug的時候可以看到三個線程】,然后比較坑的是,在處理消息的時候,如果有運行是異常拋出但是沒有處理的話,整個mqtt客戶端直接斷開連接。
2、然后就是底層paho提供了兩個客戶端連接實現——MqttClient和MqttAsyncClient。前者是同步的,后者是異步的,主要是把連接建立等耗時操作進行異步處理,一般使用方式為
IMqttToken conToken; conToken = asyncClient.client.connect(conToken); ... do some work... conToken.waitForCompletion();
注:其實MqttClient底層也是采用的異步形式,主要是為了同之前的api兼容
3、最后mqtt的對於消息的處理是采用回調的方式,同時,對於收發消息可以采用注冊監聽器的方式進行進度的監聽,具體使用可以參看paho項目的GitHub上的示例,上面有三個比較全的示例
4、關於MqttClientPersistence底下的兩個類MemoryPersistence和MqttDefaultFilePersistence,主要是為了消息傳送過程中的一個臨時緩存,如Qos為1,2的消息
重連的思路
針對mqtt協議的原本用途——低網絡質量環境,重連是必須的。目前的話重連有幾種思路
1、在回調函數里面設置當mqtt客戶端連接丟失時重新連接
2、在連接參數里面設置重連方法org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean),個人推薦使用第二種方式
下面附上自己的采用第一種方式設置的mqtt客戶端以及回調類
import java.io.UnsupportedEncodingException; import java.util.List; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttSecurityException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MyMqttClient { String clientId; private MemoryPersistence persistence = new MemoryPersistence(); // Private instance variables private MqttClient client; private String brokerUrl; private MqttConnectOptions conOpt; private boolean clean; private String password; private String userName; //需要重新訂閱的主題 private List<String> topicList; public MyMqttClient() { super(); } public MyMqttClient(String brokerUrl, String clientId, boolean cleanSession, String userName, String password) throws MqttException { super(); this.brokerUrl = brokerUrl; this.clientId = clientId; this.clean = cleanSession; this.password = password; this.userName = userName; // 建立mqtt連接屬性 this.conOpt = new MqttConnectOptions(); this.conOpt.setConnectionTimeout(60); // this.conOpt.setKeepAliveInterval(60); this.conOpt.setCleanSession(true); // 初始化客戶端 this.client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); this.client.setCallback(new MyMqttCallback(this)); } public List<String> getTopicList() { return topicList; } public void setTopicList(List<String> topicList) { this.topicList = topicList; } public void connect() { try { if (!this.client.isConnected()) { this.client.connect(this.conOpt); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void subscribe(String topicName, int qos) { try { this.client.subscribe(topicName, qos); } catch (MqttException e) { e.printStackTrace(); } } public void publish(String topicName, String message, int qos) { try { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes("utf-8")); this.client.publish(topicName, mqttMessage); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void close() { try { this.client.disconnect(); this.client.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void reConnect() throws MqttSecurityException, MqttException { if (null != this.client) { if(!this.client.isConnected()) { client.connect(this.conOpt); }else { this.client.disconnect(); this.client.connect(this.conOpt); } } } }
import java.sql.CallableStatement; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.example.util.Config; import com.example.util.HikariCPUtil; public class MyMqttCallback implements MqttCallback { private static final Logger logger = LoggerFactory.getLogger(MyMqttCallback.class); private static final ExecutorService pool = Executors.newFixedThreadPool(5); /** * 私有化的mqtt客戶端,用以重連 */ private MyMqttClient myClient; public MyMqttCallback(MyMqttClient myClient) { super(); this.myClient = myClient; } /** * 設置重連機制 */ @Override public void connectionLost(Throwable cause) { logger.error("連接丟失,原因{}",cause); // 連接丟失后,一般在這里面進行重連 while (true) { try { Thread.sleep(30000); myClient.reConnect(); List<String> topicList = this.myClient.getTopicList(); for (String topic : topicList) { this.myClient.subscribe(topic, Config.QOS); } logger.info("mqtt重新連接,重新訂閱!"); break; } catch (Exception e) { e.printStackTrace(); continue; } } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { //消息處理 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // TODO Auto-generated method stub } }
關於重連之后的主題重新
主題重新訂閱這個目前主要有兩種實現方式,具體看需求
1、設置連接屬性的MqttConnectOptions.setCleanSession(false),然后設置mqtt客戶端的主題固定,重連上之后之前的主題保留,這個和mqtt的broker關系比較大
2、采用MqttCallbackExtended這個回調類,在org.eclipse.paho.client.mqttv3.MqttCallbackExtended.connectComplete(boolean, String)這個方法里面實現主題的重新訂閱,這個一般結合org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean)使用
3、像我上面的例子一樣,在包裝類里面緩存之前的topic,在短信重連成功的代碼里面進行重新訂閱即可
最后
代碼都是一步步晚上,不要想着拿着我的代碼就去用,能用,不保證會不會出什么bug的