fusesource版本:mqtt-client-1.11.jar
下載地址:https://github.com/fusesource/mqtt-client
fusesource提供三種mqtt client api: 阻塞API,基於Futur的API和回調API。其中,回調API是最復雜的也是性能最好的,另外兩種均是對回調API的封裝。 我們下面就簡單介紹一下回調API的使用方法。
1 import org.fusesource.hawtbuf.Buffer; 2 import org.fusesource.hawtbuf.UTF8Buffer; 3 import org.fusesource.hawtdispatch.Dispatch; 4 import org.fusesource.hawtdispatch.DispatchQueue; 5 import org.fusesource.mqtt.client.BlockingConnection; 6 import org.fusesource.mqtt.client.Callback; 7 import org.fusesource.mqtt.client.CallbackConnection; 8 import org.fusesource.mqtt.client.FutureConnection; 9 import org.fusesource.mqtt.client.Listener; 10 import org.fusesource.mqtt.client.MQTT; 11 import org.fusesource.mqtt.client.Message; 12 import org.fusesource.mqtt.client.QoS; 13 import org.fusesource.mqtt.client.Topic; 14 import org.fusesource.mqtt.client.Tracer; 15 import org.fusesource.mqtt.codec.MQTTFrame; 16 public class MqttClient { 17 public static void main(String[] args) 18 { 19 try { 20 MQTT mqtt=new MQTT(); 21 22 //MQTT設置說明 23 mqtt.setHost("tcp://10.1.58.191:1883"); 24 mqtt.setClientId("876543210"); //用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成 25 mqtt.setCleanSession(false); //若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true 26 mqtt.setKeepAlive((short) 60);//定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待 27 mqtt.setUserName("admin");//服務器認證用戶名 28 mqtt.setPassword("admin");//服務器認證密碼 29 30 mqtt.setWillTopic("willTopic");//設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息 31 mqtt.setWillMessage("willMessage");//設置“遺囑”消息的內容,默認是長度為零的消息 32 mqtt.setWillQos(QoS.AT_LEAST_ONCE);//設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE 33 mqtt.setWillRetain(true);//若想要在發布“遺囑”消息時擁有retain選項,則為true 34 mqtt.setVersion("3.1.1"); 35 36 //失敗重連接設置說明 37 mqtt.setConnectAttemptsMax(10L);//客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1 38 mqtt.setReconnectAttemptsMax(3L);//客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1 39 mqtt.setReconnectDelay(10L);//首次重連接間隔毫秒數,默認為10ms 40 mqtt.setReconnectDelayMax(30000L);//重連接間隔毫秒數,默認為30000ms 41 mqtt.setReconnectBackOffMultiplier(2);//設置重連接指數回歸。設置為1則停用指數回歸,默認為2 42 43 //Socket設置說明 44 mqtt.setReceiveBufferSize(65536);//設置socket接收緩沖區大小,默認為65536(64k) 45 mqtt.setSendBufferSize(65536);//設置socket發送緩沖區大小,默認為65536(64k) 46 mqtt.setTrafficClass(8);//設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸 47 48 //帶寬限制設置說明 49 mqtt.setMaxReadRate(0);//設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制 50 mqtt.setMaxWriteRate(0);//設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制 51 52 //選擇消息分發隊列 53 mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法 54 55 //設置跟蹤器 56 mqtt.setTracer(new Tracer(){ 57 @Override 58 public void onReceive(MQTTFrame frame) { 59 System.out.println("recv: "+frame); 60 } 61 @Override 62 public void onSend(MQTTFrame frame) { 63 System.out.println("send: "+frame); 64 } 65 @Override 66 public void debug(String message, Object... args) { 67 System.out.println(String.format("debug: "+message, args)); 68 } 69 }); 70 71 72 73 //使用回調式API 74 final CallbackConnection callbackConnection=mqtt.callbackConnection(); 75 76 //連接監聽 77 callbackConnection.listener(new Listener() { 78 79 //接收訂閱話題發布的消息 80 @Override 81 public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) { 82 System.out.println("=============receive msg================"+new String(payload.toByteArray())); 83 onComplete.run(); 84 } 85 86 //連接失敗 87 @Override 88 public void onFailure(Throwable value) { 89 System.out.println("===========connect failure==========="); 90 callbackConnection.disconnect(null); 91 } 92 93 //連接斷開 94 @Override 95 public void onDisconnected() { 96 System.out.println("====mqtt disconnected====="); 97 98 } 99 100 //連接成功 101 @Override 102 public void onConnected() { 103 System.out.println("====mqtt connected====="); 104 105 } 106 }); 107 108 109 110 //連接 111 callbackConnection.connect(new Callback() { 112 113 //連接失敗 114 public void onFailure(Throwable value) { 115 System.out.println("============連接失敗:"+value.getLocalizedMessage()+"============"); 116 } 117 // 連接成功 118 public void onSuccess(Void v) { 119 //訂閱主題 120 Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)}; 121 callbackConnection.subscribe(topics, new Callback<byte[]>() { 122 //訂閱主題成功 123 public void onSuccess(byte[] qoses) { 124 System.out.println("========訂閱成功======="); 125 } 126 //訂閱主題失敗 127 public void onFailure(Throwable value) { 128 System.out.println("========訂閱失敗======="); 129 callbackConnection.disconnect(null); 130 } 131 }); 132 133 134 //發布消息 135 callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() { 136 public void onSuccess(Void v) { 137 System.out.println("===========消息發布成功============"); 138 } 139 public void onFailure(Throwable value) { 140 System.out.println("========消息發布失敗======="); 141 callbackConnection.disconnect(null); 142 } 143 }); 144 145 } 146 }); 147 148 149 150 while(true) 151 { 152 153 } 154 155 156 } catch (Exception e) { 157 e.printStackTrace(); 158 } 159 160 } 161 }
