最近在使用MQTT來實現消息的傳輸,網上demo很多,這里就不在重復介紹了,直接上代碼,百度就能出現一大堆
下面是MQTT實現訂閱的主要代碼部分
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); //設置斷開后重新連接 options.setAutomaticReconnect(true); try { client.setCallback(new PushCallback());//設置各種情況的回調函數 client.connect(options); //訂閱消息 int[] Qos = {0}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); }
回到方法實現代碼如下
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,觸發這個方法 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // publish后會執行到這里 } @Override public void messageArrived(String arg0, MqttMessage arg1) throws Exception { System.out.println("接收消息主題:"+arg0); System.out.println("接收消息Qos:"+arg1.getQos()); System.out.println("接收消息內容:"+new String(arg1.getPayload())); } }
如果MQTT連接斷開,會調用connectionLost 函數,我一開始覺得直接在這里使用client.connect(options),就可以直接實現重連了,結果報錯mqtt的狀態為
這個好解決,如果這樣設置,可以實現斷開自動重連
//設置斷開后重新連接
options.setAutomaticReconnect(true);
但這樣重連是實現了,但是之前訂閱的主題卻接收不到消息了,需要重新訂閱主題才能正常接收消息,那我這個重新訂閱的代碼要怎么再放進去呢,反正不是再connectionLost里就是了,那是后連接還沒有重連連上!
繼續看MQTT的connec的源碼發現了一段代碼使我找到了解決方案
MqttReconnectCallback 是實現MqttCallbackExtended接口的
發現comms中有設置重連的回調對象,但是怎么把這個回調由我們來主動放進去呢?繼續往下看源碼可以發現
也就是如果我們在之前放入client的回調對象是實現的 MqttCallbackExtended 接口,則MQTT會將我們的回調對象放入 connectActionListener 中 然后由 connectActionListener實現具體的connect
接下來我們不callback 對象改為實現 MqttCallbackExtended這個接口,然后實現下面方法,
@Override public void connectComplete(boolean reconnect, String serverURI) { //連接成功后調用
client.subscribe(topics,Qos);//具體訂閱代碼
}
就可以解決MQTT重連后無法訂閱的問題