MQTT的學習研究(十三) IBM MQTTV3 簡單發布訂閱實例


使用IBM MQTTv3實現相關的發布訂閱功能

MQTTv3的發布消息的實現:

Java代碼   收藏代碼
  1. package com.etrip.mqttv3;  
  2.   
  3. import com.ibm.micro.client.mqttv3.MqttClient;  
  4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
  5. import com.ibm.micro.client.mqttv3.MqttMessage;  
  6. import com.ibm.micro.client.mqttv3.MqttTopic;  
  7. /** 
  8.  * MQTTV3的發布消息類 
  9.  *  
  10.  * @author longgangbai 
  11.  */  
  12. public class MQTTPub {   
  13.     public static void doTest(){   
  14.         try {   
  15.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");   
  16.             MqttTopic topic = client.getTopic("tokudu/china");   
  17.             MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());   
  18.             message.setQos(1);   
  19.             client.connect();  
  20.             while(true){  
  21.                 MqttDeliveryToken token = topic.publish(message);   
  22.                 while (!token.isComplete()){   
  23.                     token.waitForCompletion(1000);   
  24.                 }   
  25.             }  
  26.         } catch (Exception e) {   
  27.             e.printStackTrace();   
  28.         }   
  29.     }   
  30. }   

 MQTTV3的訂閱消息類

Java代碼   收藏代碼
  1. package com.etrip.mqttv3;  
  2. import com.ibm.micro.client.mqttv3.MqttClient;  
  3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;  
  4. /** 
  5.  * MQTTV3的訂閱消息類 
  6.  *  
  7.  * @author longgangbai 
  8.  */  
  9. public class MQTTSubsribe {   
  10.     public static String doTest() {   
  11.         try {   
  12.             //創建MqttClient  
  13.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");   
  14.             //回調處理類  
  15.             CallBack callback = new CallBack();   
  16.             client.setCallback(callback);   
  17.             //創建連接可選項信息  
  18.             MqttConnectOptions conOptions = new MqttConnectOptions();   
  19.             //  
  20.             conOptions.setCleanSession(false);   
  21.             //連接broker  
  22.             client.connect(conOptions);   
  23.             //發布相關的訂閱  
  24.             client.subscribe("tokudu/china", 1);   
  25.             //client.disconnect();   
  26.         } catch (Exception e) {   
  27.             e.printStackTrace();   
  28.             return "failed";   
  29.         }   
  30.         return "success";   
  31.     }   
  32. }   

 回調處理類處理訂閱的消息類

 

Java代碼   收藏代碼
  1. package com.etrip.mqttv3;  
  2.   
  3. import com.ibm.micro.client.mqttv3.MqttCallback;  
  4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
  5. import com.ibm.micro.client.mqttv3.MqttMessage;  
  6. import com.ibm.micro.client.mqttv3.MqttTopic;  
  7. /** 
  8.  * 回調處理類 
  9.  * 處理訂閱的消息類 
  10.  *  
  11.  * @author longgangbai 
  12.  */  
  13. public class CallBack implements MqttCallback {   
  14.       
  15.     public CallBack() {   
  16.     }   
  17.     /** 
  18.      * 接收到信息的處理 
  19.      */  
  20.     public void messageArrived(MqttTopic topic, MqttMessage message) {   
  21.         try {   
  22.             System.out.println(" MQTTSubsribe  message.toString()"+message.toString());  
  23.         } catch (Exception e) {   
  24.             e.printStackTrace();   
  25.         }   
  26.     }   
  27.     public void connectionLost(Throwable cause) {  
  28.           
  29.     }   
  30.     public void deliveryComplete(MqttDeliveryToken token) {  
  31.           
  32.     }   
  33. }   

 

 

測試類:

Java代碼   收藏代碼
  1. package com.etrip.mqttv3;  
  2. /** 
  3.  * MQTTV3的測試類 
  4.  *  
  5.  * @author longgangbai 
  6.  */  
  7. public class MQTTMain {  
  8.     public static void main(String[] args) {  
  9.         //訂閱消息的方法  
  10.         MQTTSubsribe.doTest();  
  11.         //發布消息的類  
  12.         MQTTPub.doTest();  
  13.           
  14.     }  
  15. }  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM