MQTT 測試DEMO


MQTT 消息客戶端

package com.cjcx.inter.apimall.beijing.aibee;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Minimal MQTT client built on the Eclipse Paho blocking {@link MqttClient}.
 *
 * <p>The underlying client, its persistence store and its connect options are held in static
 * fields, so every instance of this class shares the same connection.
 *
 * <p><b>Topic naming examples</b>
 *
 * <p>Valid single-topic subscriptions:
 * <ul>
 *   <li>{@code /}</li>
 *   <li>{@code /house}</li>
 *   <li>{@code house/room/main-light}</li>
 *   <li>{@code house/room/side-light}</li>
 * </ul>
 *
 * <p>Using wildcards. Subscribing to {@code house/#} covers:
 * <ul>
 *   <li>{@code house/room1/main-light}</li>
 *   <li>{@code house/room1/alarm}</li>
 *   <li>{@code house/garage/main-light}</li>
 *   <li>{@code house/main-door}</li>
 *   <li>etc.</li>
 * </ul>
 *
 * <p>Subscribing to {@code house/+/main-light} covers:
 * <ul>
 *   <li>{@code house/room1/main-light}</li>
 *   <li>{@code house/room2/main-light}</li>
 *   <li>{@code house/garage/main-light}</li>
 * </ul>
 * but does not cover:
 * <ul>
 *   <li>{@code house/room1/side-light}</li>
 *   <li>{@code house/room2/side-light}</li>
 * </ul>
 *
 * <p>Invalid topic subscriptions:
 * <ul>
 *   <li>{@code house+} - reason: no topic level</li>
 *   <li>{@code house#} - reason: no topic level</li>
 * </ul>
 *
 * <p>Publishing to topics: a client can only publish to an individual topic, i.e. wildcards are
 * not allowed when publishing. To publish a message to two topics, publish the message twice.
 */
@Component
public class AibeeMqttClient {
    private static final Logger logger = LoggerFactory.getLogger(AibeeMqttClient.class);

    /** Pause between losing the connection and the reconnect attempt. */
    private static final long RECONNECT_DELAY_MS = 60 * 1000L;

    /** Longest time {@link #publishMessage} waits for a delivery to be confirmed. */
    private static final long PUBLISH_TIMEOUT_MS = 30 * 1000L;

    private String url;
    private String clientId = null;

    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;

    public AibeeMqttClient() {
        logger.info("AibeeMqttClient come int");
    }

    /**
     * Creates the shared MQTT client and connects it to the broker.
     *
     * <p>Nothing happens (apart from a log line) when {@code clientId} or {@code url} is empty,
     * or when the shared client is already connected. A previously created client that is no
     * longer connected is closed before it is replaced.
     *
     * @param url      broker URI, e.g. {@code tcp://127.0.0.1:1883}
     * @param clientId MQTT client identifier
     */
    public void init(String url, String clientId) {
        if (!StringUtils.hasLength(clientId)) {
            logger.warn("MQTT clientId is null");
            return;
        }
        if (!StringUtils.hasLength(url)) {
            logger.warn("MQTT url is null");
            return;
        }
        this.url = url;
        this.clientId = clientId;

        // The client is shared through a static field: reuse a live connection instead of
        // silently replacing (and leaking) it.
        if (null != mqttClient && mqttClient.isConnected()) {
            logger.info("MQTT 連接狀態已經連接..");
            return;
        }
        closeStaleClient();

        // Connection settings.
        mqttConnectOptions = new MqttConnectOptions();
        // A clean session is what makes in-memory persistence safe: all state is discarded
        // when the client disconnects.
        mqttConnectOptions.setCleanSession(true);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(30);
        // Persistence strategy.
        memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(url, clientId, memoryPersistence);
        } catch (MqttException e) {
            logger.warn("MQTT new MqttClient() 異常", e);
            return;
        }

        mqttClient.setCallback(createCallback());
        try {
            mqttClient.connect(mqttConnectOptions);
            logger.info("MQTT 連接狀態: {}", (mqttClient.isConnected() ? "已連接" : "未連接"));
        } catch (MqttException e) {
            logger.warn("MQTT 連接異常", e);
        }
    }

    /** Closes a leftover, disconnected client so that replacing it does not leak resources. */
    private static void closeStaleClient() {
        if (null == mqttClient) {
            return;
        }
        try {
            mqttClient.close();
        } catch (MqttException e) {
            logger.warn("MQTT closing previous client failed", e);
        }
    }

    /** Builds the callback that logs client events and reconnects after a lost connection. */
    private MqttCallbackExtended createCallback() {
        return new MqttCallbackExtended() {
            @Override
            public void connectionLost(Throwable throwable) {
                logger.info("MQTT 連接已斷開, 60秒后重新連接", throwable);
                try {
                    Thread.sleep(RECONNECT_DELAY_MS);
                } catch (InterruptedException e) {
                    // Someone asked this thread to stop: keep the interrupt flag and give up
                    // on the reconnect instead of swallowing the request.
                    Thread.currentThread().interrupt();
                    logger.warn("MQTT reconnect wait interrupted, skipping reconnect");
                    return;
                }
                reConnect();
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("Client 接收消息主題 : " + topic);
                System.out.println("Client 接收消息Qos : " + message.getQos());
                // Decode with the same explicit charset that publishMessage encodes with.
                System.out.println("Client 接收消息內容 : "
                        + new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                logger.info("deliveryComplete");
            }

            @Override
            public void connectComplete(boolean b, String s) {
                logger.info("connectComplete:{}, s:{}", b, s);
            }
        };
    }

    /**
     * Disconnects and closes the shared client, then closes the persistence store.
     *
     * <p>The client is closed even when it is not currently connected, and the store is closed
     * last so that it is still available while the client disconnects.
     */
    public void closeConnect() {
        if (null != mqttClient) {
            try {
                if (mqttClient.isConnected()) {
                    mqttClient.disconnect();
                } else {
                    logger.info("mqttClient is not connect");
                }
            } catch (MqttException e) {
                logger.warn("MQTT disconnect failed", e);
            }
            try {
                mqttClient.close();
            } catch (MqttException e) {
                logger.warn("MQTT close failed", e);
            }
        } else {
            logger.info("mqttClient is null");
        }

        if (null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                logger.warn("MQTT persistence close failed", e);
            }
        } else {
            logger.info("memoryPersistence is null");
        }
    }

    /**
     * Publishes a UTF-8 encoded text message and waits (bounded) for the delivery to complete.
     *
     * <p>When the client is missing or disconnected the message is NOT sent; a reconnect is
     * attempted instead and the caller has to publish again.
     *
     * <p>NOTE(review): this call blocks for up to {@code PUBLISH_TIMEOUT_MS}; presumably it must
     * not be invoked from inside an MQTT callback - confirm against the Paho documentation.
     *
     * @param pubTopic topic to publish to (no wildcards)
     * @param message  message text
     * @param qos      MQTT quality of service (0, 1 or 2)
     */
    public void publishMessage(String pubTopic, String message, int qos) {
        if (null == pubTopic || null == message) {
            logger.warn("MQTT publish skipped: topic or message is null");
            return;
        }
        if (null == mqttClient || !mqttClient.isConnected()) {
            logger.warn("MQTT not connected, message to {} dropped; reconnecting", pubTopic);
            reConnect();
            return;
        }

        logger.info("發布消息   {}", mqttClient.isConnected());
        logger.info("id:{}", mqttClient.getClientId());
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttTopic topic = mqttClient.getTopic(pubTopic);
        if (null == topic) {
            return;
        }
        try {
            MqttDeliveryToken token = topic.publish(mqttMessage);
            // publish() only queues the message; the outcome is known once the token completes.
            token.waitForCompletion(PUBLISH_TIMEOUT_MS);
            logger.info("消息發布成功");
        } catch (MqttException e) {
            logger.warn("消息發布失敗", e);
        }
    }

    /**
     * Reconnects the shared client, or initialises it when it has never been created.
     */
    public void reConnect() {
        if (null == mqttClient) {
            init(url, clientId);
            return;
        }
        if (mqttClient.isConnected()) {
            logger.info("MQTT 重連 mqttClient is null or connect");
            return;
        }
        if (null == mqttConnectOptions) {
            logger.info("MQTT 重連 mqttConnectOptions is null");
            return;
        }
        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (MqttException e) {
            logger.warn("MQTT reconnect failed", e);
        }
    }

    /**
     * Subscribes to a topic filter; only logs when the client is not connected.
     *
     * @param topic topic filter, wildcards allowed
     * @param qos   maximum quality of service for the subscription
     */
    public void subTopic(String topic, int qos) {
        if (null != mqttClient && mqttClient.isConnected()) {
            try {
                mqttClient.subscribe(topic, qos);
            } catch (MqttException e) {
                logger.warn("MQTT subscribe to {} failed", topic, e);
            }
        } else {
            logger.info("mqttClient is error");
        }
    }

    /**
     * Unsubscribes from a topic filter; only logs when the client is not connected.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void cleanTopic(String topic) {
        // Unsubscribing needs a live connection (the original guard was inverted).
        if (null != mqttClient && mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                logger.warn("MQTT unsubscribe from {} failed", topic, e);
            }
        } else {
            logger.info("mqttClient is error");
        }
    }
}
View Code

 

測試demo

package com.cjcx.inter.apimall.beijing.aibee;

import org.apache.commons.lang3.StringUtils;

/**
 * Interactive console demo for {@link AibeeMqttClient}: connects to a local broker, subscribes
 * to a few topics and publishes messages typed on standard input.
 */
public class ClientTest {

    /**
     * Runs the console loop until the user types {@code exit} or standard input ends.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AibeeMqttClient aibeeMqttClient = new AibeeMqttClient();
        aibeeMqttClient.init("tcp://127.0.0.1:1883", "123");

        aibeeMqttClient.subTopic("/topic", 0);
        aibeeMqttClient.subTopic("house/room/a", 1);
        aibeeMqttClient.subTopic("house/room/b", 1);

        int i = 1;
        StringBuilder sb = new StringBuilder();
        sb.append("使用指南:\r\n");
        sb.append(i++ + "、發送消息,輸入 'send topic message'.\r\n");
        sb.append(i++ + "、退出程序,輸入 'exit'.\r\n");

        System.out.println(sb);

        java.util.Scanner sc = new java.util.Scanner(System.in);
        try {
            // hasNextLine() ends the loop cleanly on end-of-input instead of throwing.
            while (sc.hasNextLine()) {
                String line = sc.nextLine(); // the text the user typed
                String trimmed = line.trim();
                if ("exit".equalsIgnoreCase(trimmed)) {
                    System.out.println("Thanks for using! bye bye.");
                    break;
                }
                if ("?".equals(trimmed)) {
                    // Help request: show the guide and do not treat "?" as a command.
                    System.out.println(sb);
                    continue;
                }
                System.out.println("line:" + line);

                processCommand(aibeeMqttClient, line);
            }
        } finally {
            // Always release the console reader and the MQTT connection, even on failure.
            sc.close();
            aibeeMqttClient.closeConnect();
        }
    }

    /**
     * Parses one input line and executes it.
     *
     * <p>Supported command: {@code send <topic> <message>}. Everything after the topic is the
     * message, so the message may contain spaces. Blank lines and unknown commands are ignored.
     *
     * @param aibeeMqttClient client used to publish
     * @param line            raw input line
     */
    private static void processCommand(AibeeMqttClient aibeeMqttClient, String line) {
        if (StringUtils.isBlank(line)) {
            return;
        }

        // Limit of 3 keeps the remainder of the line intact as the message.
        String[] parts = line.trim().split("\\s+", 3);
        String command = parts[0];
        System.out.println("command:" + command);
        if (!"send".equalsIgnoreCase(command)) {
            return;
        }
        if (parts.length < 3) {
            System.out.println("Usage: send <topic> <message>");
            return;
        }

        try {
            aibeeMqttClient.publishMessage(parts[1], parts[2], 1);
        } catch (Exception e) {
            // Console boundary: report the failure and keep the prompt alive.
            System.err.println("send failed: " + e);
        }
    }
}
View Code


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM