mqtt client api: 阻塞API


fusesource版本:mqtt-client-1.11.jar
下載地址:https://github.com/fusesource/mqtt-client

fusesource提供三種mqtt client api: 阻塞API,基於Futur的API和回調API。其中,回調API是最復雜的也是性能最好的,另外兩種均是對回調API的封裝。 我們下面就簡單介紹一下回調API的使用方法。

  1 import org.fusesource.hawtbuf.Buffer;
  2 import org.fusesource.hawtbuf.UTF8Buffer;
  3 import org.fusesource.hawtdispatch.Dispatch;
  4 import org.fusesource.hawtdispatch.DispatchQueue;
  5 import org.fusesource.mqtt.client.BlockingConnection;
  6 import org.fusesource.mqtt.client.Callback;
  7 import org.fusesource.mqtt.client.CallbackConnection;
  8 import org.fusesource.mqtt.client.FutureConnection;
  9 import org.fusesource.mqtt.client.Listener;
 10 import org.fusesource.mqtt.client.MQTT;
 11 import org.fusesource.mqtt.client.Message;
 12 import org.fusesource.mqtt.client.QoS;
 13 import org.fusesource.mqtt.client.Topic;
 14 import org.fusesource.mqtt.client.Tracer;
 15 import org.fusesource.mqtt.codec.MQTTFrame; 
 16 public class MqttClient {
 17 public static void main(String[] args)
 18 {
 19  try {
 20    MQTT mqtt=new MQTT();
 21    
 22    //MQTT設置說明
 23    mqtt.setHost("tcp://10.1.58.191:1883");
 24    mqtt.setClientId("876543210"); //用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成
 25    mqtt.setCleanSession(false); //若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true
 26    mqtt.setKeepAlive((short) 60);//定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待
 27    mqtt.setUserName("admin");//服務器認證用戶名
 28    mqtt.setPassword("admin");//服務器認證密碼
 29    
 30    mqtt.setWillTopic("willTopic");//設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息
 31    mqtt.setWillMessage("willMessage");//設置“遺囑”消息的內容,默認是長度為零的消息
 32    mqtt.setWillQos(QoS.AT_LEAST_ONCE);//設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE
 33    mqtt.setWillRetain(true);//若想要在發布“遺囑”消息時擁有retain選項,則為true
 34    mqtt.setVersion("3.1.1");
 35    
 36    //失敗重連接設置說明
 37    mqtt.setConnectAttemptsMax(10L);//客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
 38    mqtt.setReconnectAttemptsMax(3L);//客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
 39    mqtt.setReconnectDelay(10L);//首次重連接間隔毫秒數,默認為10ms
 40    mqtt.setReconnectDelayMax(30000L);//重連接間隔毫秒數,默認為30000ms
 41    mqtt.setReconnectBackOffMultiplier(2);//設置重連接指數回歸。設置為1則停用指數回歸,默認為2
 42    
 43    //Socket設置說明
 44          mqtt.setReceiveBufferSize(65536);//設置socket接收緩沖區大小,默認為65536(64k)
 45          mqtt.setSendBufferSize(65536);//設置socket發送緩沖區大小,默認為65536(64k)
 46          mqtt.setTrafficClass(8);//設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸
 47          
 48          //帶寬限制設置說明
 49          mqtt.setMaxReadRate(0);//設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制
 50          mqtt.setMaxWriteRate(0);//設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制
 51          
 52          //選擇消息分發隊列
 53          mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法
 54    
 55          //設置跟蹤器
 56    mqtt.setTracer(new Tracer(){
 57              @Override
 58              public void onReceive(MQTTFrame frame) {
 59                  System.out.println("recv: "+frame);
 60              } 
 61              @Override
 62              public void onSend(MQTTFrame frame) {
 63                  System.out.println("send: "+frame);
 64              } 
 65              @Override
 66              public void debug(String message, Object... args) {
 67                  System.out.println(String.format("debug: "+message, args));
 68              }
 69          });
 70    
 71    
 72    
 73          //使用回調式API
 74    final CallbackConnection callbackConnection=mqtt.callbackConnection();
 75    
 76    //連接監聽
 77    callbackConnection.listener(new Listener() {
 78    
 79    //接收訂閱話題發布的消息
 80    @Override
 81    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
 82     System.out.println("=============receive msg================"+new String(payload.toByteArray()));
 83      onComplete.run();
 84    }
 85    
 86    //連接失敗
 87    @Override
 88    public void onFailure(Throwable value) {
 89     System.out.println("===========connect failure===========");
 90     callbackConnection.disconnect(null);
 91    }
 92    
 93       //連接斷開
 94    @Override
 95    public void onDisconnected() {
 96     System.out.println("====mqtt disconnected=====");
 97     
 98    }
 99    
100    //連接成功
101    @Override
102    public void onConnected() {
103     System.out.println("====mqtt connected=====");
104     
105    }
106   });
107    
108    
109    
110    //連接
111    callbackConnection.connect(new Callback() {
112     
113      //連接失敗
114        public void onFailure(Throwable value) {
115            System.out.println("============連接失敗:"+value.getLocalizedMessage()+"============");
116        } 
117        // 連接成功
118        public void onSuccess(Void v) { 
119            //訂閱主題
120            Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
121            callbackConnection.subscribe(topics, new Callback<byte[]>() {
122                //訂閱主題成功
123             public void onSuccess(byte[] qoses) {
124                    System.out.println("========訂閱成功=======");
125                }
126             //訂閱主題失敗
127                public void onFailure(Throwable value) {
128                 System.out.println("========訂閱失敗=======");
129                 callbackConnection.disconnect(null);
130                }
131            });
132            
133            
134             //發布消息
135            callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
136                public void onSuccess(Void v) {
137                  System.out.println("===========消息發布成功============");
138                }
139                public void onFailure(Throwable value) {
140                 System.out.println("========消息發布失敗=======");
141                 callbackConnection.disconnect(null);
142                }
143            }); 
144   
145        }
146    });
147    
148    
149    
150    while(true)
151    {
152     
153    }
154  
155   
156  } catch (Exception e) {
157   e.printStackTrace();
158  } 
159   
160 }
161 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM