使用IBM MQTTv3實現相關的發布訂閱功能
MQTTv3的發布消息的實現:
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- /**
- * MQTTV3的發布消息類
- *
- * @author longgangbai
- */
- public class MQTTPub {
- public static void doTest(){
- try {
- MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");
- MqttTopic topic = client.getTopic("tokudu/china");
- MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());
- message.setQos(1);
- client.connect();
- while(true){
- MqttDeliveryToken token = topic.publish(message);
- while (!token.isComplete()){
- token.waitForCompletion(1000);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
MQTTv3的訂閱消息類:
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttConnectOptions;
- /**
- * MQTTV3的訂閱消息類
- *
- * @author longgangbai
- */
- public class MQTTSubsribe {
- public static String doTest() {
- try {
- //創建MqttClient
- MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
- //回調處理類
- CallBack callback = new CallBack();
- client.setCallback(callback);
- //創建連接可選項信息
- MqttConnectOptions conOptions = new MqttConnectOptions();
- //
- conOptions.setCleanSession(false);
- //連接broker
- client.connect(conOptions);
- //發布相關的訂閱
- client.subscribe("tokudu/china", 1);
- //client.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- return "failed";
- }
- return "success";
- }
- }
回調處理類（處理訂閱到的消息）:
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttCallback;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- /**
-  * MQTT callback used by the subscriber; prints every message that arrives.
-  *
-  * @author longgangbai
-  */
- public class CallBack implements MqttCallback {
-     public CallBack() {
-     }
- 
-     /**
-      * Prints the received message to standard output.
-      *
-      * @param topic   topic the message arrived on (not used)
-      * @param message the received message; its {@code toString()} value is printed
-      */
-     @Override
-     public void messageArrived(MqttTopic topic, MqttMessage message) {
-         try {
-             System.out.println(" MQTTSubsribe message.toString()"+message.toString());
-         } catch (Exception e) {
-             // Keep failures (e.g. a null message) from propagating to the caller.
-             e.printStackTrace();
-         }
-     }
- 
-     /**
-      * Reports a lost broker connection on standard error instead of silently
-      * ignoring it.
-      *
-      * @param cause the reason reported for the loss; may be {@code null}
-      */
-     @Override
-     public void connectionLost(Throwable cause) {
-         System.err.println("MQTT connection lost: " + cause);
-         if (cause != null) {
-             cause.printStackTrace();
-         }
-     }
- 
-     /**
-      * Intentionally a no-op.
-      *
-      * @param token the delivery token that completed (not used)
-      */
-     @Override
-     public void deliveryComplete(MqttDeliveryToken token) {
-     }
- }
測試類:
- package com.etrip.mqttv3;
- /**
- * MQTTV3的測試類
- *
- * @author longgangbai
- */
- public class MQTTMain {
- public static void main(String[] args) {
- //訂閱消息的方法
- MQTTSubsribe.doTest();
- //發布消息的類
- MQTTPub.doTest();
- }
- }

