最近在使用MQTT來實現消息的傳輸,網上demo很多,這里就不在重復介紹了,直接上代碼,百度就能出現一大堆
下面是MQTT實現訂閱的主要代碼部分
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
//設置斷開后重新連接
options.setAutomaticReconnect(true);
try {
client.setCallback(new PushCallback());//設置各種情況的回調函數
client.connect(options);
//訂閱消息
int[] Qos = {0};
String[] topic1 = {TOPIC};
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
回到方法實現代碼如下
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 連接丟失后,觸發這個方法
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// publish后會執行到這里
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
System.out.println("接收消息主題:"+arg0);
System.out.println("接收消息Qos:"+arg1.getQos());
System.out.println("接收消息內容:"+new String(arg1.getPayload()));
}
}
如果MQTT連接斷開,會調用connectionLost 函數,我一開始覺得直接在這里使用client.connect(options),就可以直接實現重連了,結果報錯mqtt的狀態為

這個好解決,如果這樣設置,可以實現斷開自動重連
//設置斷開后重新連接
options.setAutomaticReconnect(true);
但這樣重連是實現了,但是之前訂閱的主題卻接收不到消息了,需要重新訂閱主題才能正常接收消息,那我這個重新訂閱的代碼要怎么再放進去呢,反正不是再connectionLost里就是了,那是后連接還沒有重連連上!
繼續看MQTT的connec的源碼發現了一段代碼使我找到了解決方案

MqttReconnectCallback 是實現MqttCallbackExtended接口的
發現comms中有設置重連的回調對象,但是怎么把這個回調由我們來主動放進去呢?繼續往下看源碼可以發現

也就是如果我們在之前放入client的回調對象是實現的 MqttCallbackExtended 接口,則MQTT會將我們的回調對象放入 connectActionListener 中 然后由 connectActionListener實現具體的connect
接下來我們不callback 對象改為實現 MqttCallbackExtended這個接口,然后實現下面方法,
@Override
public void connectComplete(boolean reconnect, String serverURI) {
//連接成功后調用
client.subscribe(topics,Qos);//具體訂閱代碼
}
就可以解決MQTT重連后無法訂閱的問題
