springboot集成MQTT實現消息收發,斷線重連


開始研究JAVA及MQTT,寫下該文章方便使用參考

1、添加依賴  pom.xml

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.35</version>
        </dependency>

2、mqtt配置  application.yml

# Spring配置
spring:
  # MQTT
  mqtt:
    # 服務器連接地址,如果有多個,用逗號隔開
    host: tcp://192.168.1.1:1883
    # 連接服務器默認客戶端ID
    clientId: mqtt_jlsId_
    # 默認的消息推送主題,實際可在調用接口時指定
    topic: topic_a,topic_b,topic_c
    # 用戶名
    username:
    # 密碼
    password:
    # 連接超時
    timeout: 30
    # 心跳
    keepalive: 30

3、獲取配置信息工具類

當然可以使用注解 @Value獲取配置信息
這里是網上查的資料直接使用以下類來獲取

package com.projuct.junlaishun.mqtt;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * 獲取配置信息
 **/
public class PropertiesUtil {

    public static String MQTT_HOST;
    public static String MQTT_CLIENT_ID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static String MQTT_TOPIC;
    public static Integer MQTT_TIMEOUT;
    public static Integer MQTT_KEEP_ALIVE;

    /**
     *  mqtt配置
     */
    static {
        Properties properties = loadMqttProperties();
        MQTT_HOST = properties.getProperty("host");
        MQTT_CLIENT_ID = properties.getProperty("clientId")+Math.random();
        MQTT_USER_NAME = properties.getProperty("username");
        MQTT_PASSWORD = properties.getProperty("password");
        MQTT_TOPIC = properties.getProperty("topic");
        MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));
        MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));
    }

    private static Properties loadMqttProperties() {
        InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");
        Properties properties = new Properties();
        try {
            properties.load(inputstream);
            return properties;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (inputstream != null) {
                    inputstream.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

4、MQTT客戶端  MqttConsumer類

package com.projuct.junlaishun.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;


@Component
public class MqttConsumer implements ApplicationRunner {

    private static MqttClient client;

    @Override
    public void run(ApplicationArguments args) {
        System.out.println("初始化並啟動mqtt......");
        this.connect();
    }

    /**
     * 連接mqtt服務器
     */
    private void connect() {
        try {
            // 1 創建客戶端
            getClient();
            // 2 設置配置
            MqttConnectOptions options = getOptions();
            String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
            // 3 消息發布質量
            int[] qos = getQos(topic.length);
            // 4 最后設置
            create(options, topic, qos);
        } catch (Exception e) {
            System.out.println("mqtt連接異常:" + e);
        }
    }

    /**
     *  創建客戶端  --- 1 ---
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
            }

            System.out.println("創建mqtt客戶端:" );
        } catch (Exception e) {
            System.out.println("創建mqtt客戶端異常:\" + e:" );
        }
    }

    /**
     *  生成配置對象,用戶名,密碼等  --- 2 ---
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(PropertiesUtil.MQTT_USER_NAME);
        options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
        // 設置超時時間
        options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
        // 設置會話心跳時間
        options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
        // 是否清除session
        options.setCleanSession(false);
        System.out.println("--生成mqtt配置對象");
        return options;
    }

    /**
     *  qos   --- 3 ---
     */
    public int[] getQos(int length) {

        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT協議中有三種消息發布服務質量:
             *
             * QOS0: “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
             * QOS1: “至少一次”,確保消息到達,但消息重復可能會發生。
             * QOS2: “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果,資源開銷大
             */
            qos[i] = 1;
        }
        System.out.println("--設置消息發布質量");
        return qos;
    }

    /**
     *  裝載各種實例和訂閱主題  --- 4 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            System.out.println("--添加回調處理類");
            client.connect(options);
        } catch (Exception e) {
            System.out.println("裝載實例或訂閱主題異常:" + e);
        }
    }
    /**
     * 訂閱某個主題
     *
     * @param topic
     * @param qos
     */
    public static void subscribe(String topic, int qos) {
        try {
            System.out.println("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 發布,非持久化
     *
     *  qos根據文檔設置為1
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        publish(1, false, topic, msg);
    }

    /**
     * 發布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            System.out.println("topic:" + topic + " 不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();

            if (!token.isComplete()) {
                System.out.println("消息發送成功");
            }
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

5、mqtt回調處理類

package com.projuct.junlaishun.mqtt;

import org.eclipse.paho.client.mqttv3.*;

import java.util.Arrays;

/**
 * mqtt回調處理類
 */

public class MqttConsumerCallback implements MqttCallbackExtended {

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topic;
    private int[] qos;

    public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * 斷開重連
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("MQTT連接斷開,發起重連......");
        try {
            if (null != client && !client.isConnected()) {
                client.reconnect();
                System.out.println("嘗試重新連接");
            } else {
                client.connect(options);
                System.out.println("嘗試建立新連接");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 接收到消息調用令牌中調用
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + Arrays.toString(topic));
    }

    /**
     * 消息處理
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String msg = new String(message.getPayload());
            System.out.println("收到topic:" + topic + " 消息:" + msg);
            System.out.println("收到消息后執行具體的業務邏輯操作,比如將消息存儲進數據庫");
        } catch (Exception e) {
            System.out.println("處理mqtt消息異常:" + e);
        }
    }

    /**
     * mqtt連接后訂閱主題
     */
    @Override
    public void connectComplete(boolean b, String s) {
        try {
            if (null != topic && null != qos) {
                if (client.isConnected()) {
                    client.subscribe(topic, qos);
                    System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
                    System.out.println("--訂閱主題::" + Arrays.toString(topic));
                } else {
                    System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
                }
            }
        } catch (Exception e) {
            System.out.println("mqtt訂閱主題異常:" + e);
        }
    }
}  

 

6、編寫測試方法MqttController

package com.projuct.junlaishun.mqtt;


import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mqtt")
public class MqttController {
    /**
     * 推送消息
     */
    @ResponseBody
    //@GetMapping(value = "/push")
    @RequestMapping("sendMqttMessage")
    public JSONObject push(String topic,
                       String message) {
        MqttConsumer.publish(topic, message);
        JSONObject res = new JSONObject();
        res.put("code",0);
        res.put("msg","測試發布主題成功");
        return res;
    }
}

 測試記錄截圖

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM