記錄一下SpringBoot+MQTT的使用


   <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.3</version>
        </dependency>
spring:
  mqtt:
    urls: tcp://127.0.0.1:1883
    clientId: WebKn9wu
    username: king
    password: 12341234
    completionTimeout: 30000
/**
 * MQTT connection settings.
 *
 * <p>Every field except {@link #automaticReconnection} is bound from a
 * {@code spring.mqtt.*} property; accessors are generated by Lombok.
 */
@Getter
@Setter
@Configuration
@Component
public class MqttConfig {

    /** Broker address(es), bound from {@code spring.mqtt.urls}. */
    @Value("${spring.mqtt.urls}")
    private String urls;

    /** Client identifier, bound from {@code spring.mqtt.clientId}. */
    @Value("${spring.mqtt.clientId}")
    private String clientId;

    /** Login name, bound from {@code spring.mqtt.username}. */
    @Value("${spring.mqtt.username}")
    private String username;

    /** Login password, bound from {@code spring.mqtt.password}. */
    @Value("${spring.mqtt.password}")
    private String password;

    /**
     * Connection timeout, bound from {@code spring.mqtt.completionTimeout}.
     * NOTE(review): the unit is not fixed by this class - confirm against the code that consumes it.
     */
    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout;

    /**
     * Whether the client should reconnect automatically. Not bound to any property;
     * it is initialised to {@code true} here.
     */
    private boolean automaticReconnection = true;
}
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Spring-managed wrapper around a Paho {@code MqttClient}.
 *
 * <p>Connects to the broker after the bean has been constructed, offers helpers to
 * publish, subscribe and unsubscribe, and releases the connection when the bean is destroyed.
 */
@Component
public class MyMqttClient {

    private static final Logger logger = LoggerFactory.getLogger(MyMqttClient.class);

    @Autowired
    private MqttConfig config;

    @Autowired
    private PushCallback pushCallback;

    // Not used by this class itself; available for handing to a manually created callback
    // (see the alternative mentioned in connect()).
    @Autowired
    private MqttLogService mqttLogService;

    private org.eclipse.paho.client.mqttv3.MqttClient client;

    /** In-memory message store handed to the Paho client; created in {@link #connect()}. */
    private MemoryPersistence memoryPersistence;

    /** Connection options used for the most recent {@link #connect()} call. */
    private MqttConnectOptions options;

    /**
     * @return the underlying Paho client, or {@code null} if it could not be created
     */
    public org.eclipse.paho.client.mqttv3.MqttClient getClient() {
        return client;
    }

    public void setClient(org.eclipse.paho.client.mqttv3.MqttClient client) {
        this.client = client;
    }

    /**
     * Creates the Paho client from {@link MqttConfig}, registers the callback and connects.
     * Failures are logged rather than rethrown, so the application still starts when the
     * broker is unreachable.
     */
    @PostConstruct
    public void connect() {
        try {
            memoryPersistence = new MemoryPersistence();
            client = new org.eclipse.paho.client.mqttv3.MqttClient(
                    config.getUrls(), config.getClientId(), memoryPersistence);

            options = new MqttConnectOptions();
            // Clean session: the broker does not keep session state across connections.
            options.setCleanSession(true);
            options.setUserName(config.getUsername());
            options.setPassword(config.getPassword().toCharArray());
            // NOTE(review): setConnectionTimeout takes SECONDS. The sample configuration uses
            // completionTimeout: 30000, which looks like milliseconds - confirm the intended unit.
            options.setConnectionTimeout(config.getCompletionTimeout());
            // Keep-alive interval in seconds.
            options.setKeepAliveInterval(30);
            // Driven by the same flag the callbacks consult before reconnecting manually,
            // so the two mechanisms cannot both be active.
            options.setAutomaticReconnect(config.isAutomaticReconnection());

            // To use a manually constructed callback instead, pass the collaborators in:
            // new DefaultCallback(this, mqttLogService)
            client.setCallback(pushCallback);
            client.connect(options);
        } catch (MqttException e) {
            // getCause() may be null, so log the exception itself instead of dereferencing the cause.
            logger.error("MQTT connect failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Publishes a message and blocks until delivery has completed.
     *
     * @param qos         quality of service (0, 1 or 2)
     * @param retained    whether the broker should retain the message
     * @param topic       topic to publish to
     * @param pushMessage message body; encoded as UTF-8
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        if (null == client) {
            logger.error("mqttClient is error");
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset: the platform default differs between hosts.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            logger.info("topic not exist");
            return;
        }
        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass of MqttException and is covered here too.
            logger.error("MQTT publish to topic {} failed: {}", topic, e.getMessage(), e);
        }
    }

    /**
     * Subscribes to a topic. Does nothing (apart from logging) when the client is not connected.
     *
     * @param topic topic filter
     * @param qos   quality of service (0, 1 or 2)
     */
    public void subscribe(String topic, int qos) {
        if (null != client && client.isConnected()) {
            logger.info("開始訂閱主題{}", topic);
            try {
                client.subscribe(topic, qos);
            } catch (MqttException e) {
                logger.error("MQTT subscribe to topic {} failed: {}", topic, e.getMessage(), e);
            }
        } else {
            logger.warn("mqttClient is error");
        }
    }

    /**
     * Unsubscribes from a topic. Does nothing (apart from logging) when the client is not connected.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void cleanTopic(String topic) {
        // Unsubscribing requires a live connection (the original check was inverted).
        if (null != client && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                logger.error("MQTT unsubscribe from topic {} failed: {}", topic, e.getMessage(), e);
            }
        } else {
            logger.warn("mqttClient is error");
        }
    }

    /**
     * Disconnects and closes the client, then closes the persistence store.
     * Each step is attempted independently so one failure does not skip the rest.
     */
    @PreDestroy
    public void closeConnect() {
        if (null != client) {
            if (client.isConnected()) {
                try {
                    client.disconnect();
                } catch (MqttException e) {
                    logger.error("關閉連接異常={}", e.getMessage(), e);
                }
            } else {
                logger.warn("關閉連接異常={}", "mqttClient is not connect");
            }
            // Release client resources even when the connection was already down.
            try {
                client.close();
            } catch (MqttException e) {
                logger.error("關閉連接異常={}", e.getMessage(), e);
            }
        } else {
            logger.warn("關閉連接異常={}", "mqttClient is null");
        }

        // Close the store last: the client may still use it while disconnecting.
        if (null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                logger.error("關閉連接異常={}", e.getMessage(), e);
            }
        } else {
            logger.warn("memoryPersistence is null");
        }
    }
}

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Callback registered on the shared MQTT client: reacts to connection loss,
 * incoming messages, completed deliveries and (re)connects.
 */
@Component
public class PushCallback implements MqttCallbackExtended {
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    /** Number of one-second checks performed after the connection drops. */
    private static final int RECONNECT_CHECKS = 4;

    /** Pause between two checks, in milliseconds. */
    private static final long RECONNECT_PAUSE_MS = 1000L;

    // Placeholder topic filters: both are identical in this sample and are meant to be
    // replaced with the real filters for periodic reports and device information.
    private static final String TOPIC_PERIODIC_REPORT = "$king/devices/xxx/#";
    private static final String TOPIC_DEVICE_INFO = "$king/devices/xxx/#";

    @Autowired
    private MyMqttClient client;

    @Autowired
    private MqttConfig mqttConfig;

    // Service intended for persisting received data; not called yet in this class.
    @Autowired
    private MqttLogService mqttLogService;

    /**
     * Called when the connection to the broker is lost. Starts a background thread that checks
     * once per second (up to {@value #RECONNECT_CHECKS} times) whether the client is connected
     * again and, only when automatic reconnection is disabled in the configuration, triggers
     * a single manual reconnect.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        new Thread(() -> {
            logger.warn("[MQTT] 連接斷開,1s之后嘗試重連...", throwable);
            MqttClient mqttClient = client.getClient();
            if (mqttClient == null) {
                // Nothing to reconnect: the client was never created.
                logger.error("mqttClient is null");
                return;
            }
            boolean reconnecting = false;
            for (int check = 0; check < RECONNECT_CHECKS; check++) {
                try {
                    if (mqttClient.isConnected()) {
                        break;
                    }
                    Thread.sleep(RECONNECT_PAUSE_MS);
                    boolean needReconnect = !mqttConfig.isAutomaticReconnection()
                            && !reconnecting
                            && !mqttClient.isConnected();
                    if (needReconnect) {
                        logger.info("開始重連...");
                        mqttClient.reconnect();
                        reconnecting = true;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of treating the interrupt as a failed attempt.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    logger.warn("mqtt重連失敗,繼續重連,reason:" + e.getMessage(), e);
                }
            }
        }, "mqtt-reconnect").start();
    }

    /**
     * Called when a message arrives on a subscribed topic. Currently only logs the topic and
     * the payload (decoded as UTF-8); persistence via {@code MqttLogService} is still to be added.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage the received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("=====接收消息主題 : " + topic);
        String msg = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("=====接收消息內容 : " + msg);
        // TODO: split the topic on "/" as needed and store the message through MqttLogService.
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Called when a connection (initial or re-established) is complete; subscribes to the topics.
     *
     * @param b whether this connection is the result of a reconnect
     * @param s URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean b, String s) {
        client.subscribe(TOPIC_PERIODIC_REPORT, 0);
        client.subscribe(TOPIC_DEVICE_INFO, 0);
        // The message is labelled "client ID", so log the client id rather than the broker URLs.
        logger.info("mqtt連接成功,客戶端ID:" + mqttConfig.getClientId());
        // TODO: record the successful connection in the log store.
    }

}

第二種Callback

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Alternative MQTT callback that receives its collaborators through the constructor
 * instead of field injection, e.g. {@code new DefaultCallback(myMqttClient, mqttLogService)}.
 *
 * <p>An instance created with the no-argument constructor has no client; its callbacks then
 * only log and return.
 */
@Component
public class DefaultCallback implements MqttCallbackExtended {
    private static final Logger logger = LoggerFactory.getLogger(DefaultCallback.class);

    /** Number of one-second checks performed after the connection drops. */
    private static final int RECONNECT_CHECKS = 4;

    /** Pause between two checks, in milliseconds. */
    private static final long RECONNECT_PAUSE_MS = 1000L;

    // Placeholder topic filters: both are identical in this sample and are meant to be
    // replaced with the real filters for periodic reports and device information.
    private static final String TOPIC_PERIODIC_REPORT = "$king/devices/xxx/#";
    private static final String TOPIC_DEVICE_INFO = "$king/devices/xxx/#";

    // Service intended for persisting received data; not called yet in this class.
    private MqttLogService mqttLogService;

    private MyMqttClient client;

    /**
     * Optional settings. When absent, automatic reconnection is assumed to be enabled and
     * no manual reconnect is attempted.
     */
    private MqttConfig config;

    public DefaultCallback() {
    }

    /**
     * @param pushClient client wrapper used for subscribing and reconnecting
     * @param service    service for persisting received data
     */
    public DefaultCallback(MyMqttClient pushClient, MqttLogService service) {
        this(pushClient, service, null);
    }

    /**
     * @param pushClient client wrapper used for subscribing and reconnecting
     * @param service    service for persisting received data
     * @param config     connection settings; may be {@code null}
     */
    public DefaultCallback(MyMqttClient pushClient, MqttLogService service, MqttConfig config) {
        this.client = pushClient;
        this.mqttLogService = service;
        this.config = config;
    }

    /**
     * Called when the connection to the broker is lost. Starts a background thread that checks
     * once per second (up to {@value #RECONNECT_CHECKS} times) whether the client is connected
     * again and, only when automatic reconnection is known to be disabled, triggers a single
     * manual reconnect.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        new Thread(() -> {
            logger.warn("[MQTT] 連接斷開,1s之后嘗試重連...", throwable);
            MqttClient mqttClient = (client == null) ? null : client.getClient();
            if (mqttClient == null) {
                // Nothing to reconnect: no client was supplied or it was never created.
                logger.error("mqttClient is null");
                return;
            }
            // The settings are private to MyMqttClient, so they are read from this
            // callback's own (optional) copy instead.
            boolean automaticReconnect = (config == null) || config.isAutomaticReconnection();
            boolean reconnecting = false;
            for (int check = 0; check < RECONNECT_CHECKS; check++) {
                try {
                    if (mqttClient.isConnected()) {
                        break;
                    }
                    Thread.sleep(RECONNECT_PAUSE_MS);
                    boolean needReconnect = !automaticReconnect && !reconnecting && !mqttClient.isConnected();
                    if (needReconnect) {
                        logger.info("開始重連...");
                        mqttClient.reconnect();
                        reconnecting = true;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of treating the interrupt as a failed attempt.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    logger.warn("mqtt重連失敗,繼續重連,reason:" + e.getMessage(), e);
                }
            }
        }, "mqtt-reconnect").start();
    }


    /**
     * Called when a message arrives on a subscribed topic. Currently only logs the topic and
     * the payload (decoded as UTF-8); persistence via {@code MqttLogService} is still to be added.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage the received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("=====接收消息主題 : " + topic);
        String msg = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("=====接收消息內容 : " + msg);

        // TODO: split the topic on "/" as needed and call mqttLogService.saveMqtt(mqttLog).
        logger.info("-----對數據進行持久化");
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Called when a connection (initial or re-established) is complete; subscribes to the topics.
     *
     * @param b whether this connection is the result of a reconnect
     * @param s URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean b, String s) {
        if (client == null) {
            logger.error("mqttClient is null");
            return;
        }
        client.subscribe(TOPIC_PERIODIC_REPORT, 0);
        client.subscribe(TOPIC_DEVICE_INFO, 0);
        // The message is labelled "client ID", so log the client id; it is read through the
        // public accessor because MyMqttClient's settings field is private.
        MqttClient mqttClient = client.getClient();
        logger.info("mqtt連接成功,客戶端ID:" + (mqttClient == null ? null : mqttClient.getClientId()));
        // TODO: record the successful connection in the log store.
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM