MQTT、SpringBoot


SpringBoot 整合 Mqtt (發布、訂閱)

文中部分內容借鑒了其他作者,在此感謝提供方法的各位作者。

1、MQTT 協議介紹(簡單介紹)

1.1、Mqtt協議中的三個角色:

實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。

1.2、mqtt傳輸的消息內容

Topic(主題),可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload)

payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。

1.3、mqtt客戶端

一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:

A、發布其他客戶端可能會訂閱的信息;

B、訂閱其它客戶端發布的消息;

C、退訂或刪除應用程序的消息;

D、斷開與服務器連接

1.4、mqtt服務端器

MQTT服務器以稱為"消息代理"(Broker),可以是一個應用程序或一台設備。它是位於消息發布者和訂閱者之間,它可以:

A、接受來自客戶的網絡連接;

B、接受客戶發布的應用信息;

C、處理來自客戶端的訂閱和退訂請求;

D、向訂閱的客戶轉發應用程序消息。

1.5、mqtt的訂閱、主題、會話

1.5.1、訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。

1.5.2、會話(Session)

每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在於一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。

1.5.3、主題名(Topic Name)

連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。

1.5.4、主題篩選器(Topic Filter)

一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。

1.5.5、負載(Payload)

消息訂閱者所具體接收的內容。

2、工具介紹(介紹本文中使用的)

2.1、代理服務器:Emqx

EMQ X 是基於 Erlang/OTP 語言平台開發,支持大規模連接和分布式集群,發布訂閱模式的開源 MQTT 消息服務器。

Linux docker 安裝啟動命令:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

安轉后訪問http://ip:18083/

默認用戶名/密碼: admin/public

訪問如下表示安裝成功:

2.2、MQTT.fx

MQTT.fx 是目前主流的mqtt客戶端,可以快速驗證是否可以與IoT Hub 服務交流發布或訂閱消息。設備將當前所處的狀態作為MQTT主題發送給IoT Hub,每個MQTT主題topic具有不同等級的名稱,” MQTT代理服務器將接收到的主題topic發送給給所有訂閱的客戶端。
下載連接:http://www.jensd.de/apps/mqttfx/1.7.1/

參考:https://blog.csdn.net/tiantang_1986/article/details/85101366

3、環境簡介:

  3.1、SpringBoot-version:2.1.2
  3.2、Maven-version:3.6.1
  3.3、開發工具: IDEA 2019.3
  3.4、JDK-version:1.8

4、MQTT 發布

4.1、新建工程:MQTT-SpringBoot

4.2、pom

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>

    <!--MQTT-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>4.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>
    </dependencies>

4.3、配置文件:

server:
  port: 8090

spring:
  application:
    name: MQTT-SpringBoot

mqtt:
  username: admin
  password: public
# 推送信息的連接地址,如果有多個,用逗號隔開,如:tcp://ip:1883,tcp://ip:1883
  url: tcp://ip:1883,tcp://ip:1883
  sender:
#    默認發送的主題
    defaultTopic: goods
#    clientid
    clientId: mqtttest

4.4、代碼

4.4.1、modal

package com.riest.modal;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * ClassName:Send
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:06
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Send {
    private String topic;
    private String key;
    private String value;
}

4.4.2、config

package com.riest.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

/**
 * ClassName:MqttConfig
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:08
 */
@Configuration
public class MqttConfig {

    /**
     * 發布的bean名稱
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    /**
     * 客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的"遺囑"消息
     */
    private static final byte[] WILL_DATA;
    static {
        WILL_DATA = "offline".getBytes();
    }

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.sender.clientId}")
    private String clientId;

    @Value("${mqtt.sender.defaultTopic}")
    private String defaultTopic;

    /**
     * MQTT連接器選項
     */
    @Bean
    public MqttConnectOptions getSenderMqttConnectOptions(){
        MqttConnectOptions options=new MqttConnectOptions();
        // 設置連接的用戶名
        if(!username.trim().equals("")){
            options.setUserName(username);
        }
        // 設置連接的密碼
        options.setPassword(password.toCharArray());
        // 設置連接的地址
        options.setServerURIs(StringUtils.split(url, ","));
        // 設置超時時間 單位為秒
        options.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
        // 但這個方法並沒有重連的機制
        options.setKeepAliveInterval(20);
        // 設置 "遺囑" 消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的"遺囑"消息。
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    /**
     * MQTT客戶端
     */
    @Bean
    public MqttPahoClientFactory senderMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getSenderMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(生產者)
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息處理器(生產者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, senderMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }
}

4.4.3、service

package com.riest.service;

import com.riest.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * ClassName:SendInterface
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:08
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface ISend {
    /**
     * 發送信息到MQTT服務器
     *
     * @param data 發送的文本
     */
    void sendToMqtt(String data);

    /**
     * 發送信息到MQTT服務器
     *
     * @param topic 主題
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * 發送信息到MQTT服務器
     *
     * @param topic 主題
     * @param qos 對消息處理的幾種機制。
     * 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。
     * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。
     * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}

4.4.4、controller

package com.riest.controller;

import com.alibaba.fastjson.JSONObject;
import com.riest.modal.Send;
import com.riest.service.ISend;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * ClassName:SendController
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:11
 */
@RestController
@Slf4j
public class SendController {

    @Autowired
    private ISend iMqttSender;

    /**
     *  發送自定義消息內容(使用默認主題)
     * @param data
     */
    @GetMapping(value = "/send")
    public void test1(Send data) {
        JSONObject json = (JSONObject) JSONObject.toJSON(data);
        log.error("----->{}","mqtt 消息發布,使用默認主題發布:"+json.toJSONString());
        iMqttSender.sendToMqtt(json.toJSONString());
    }

    /**
     *  發送自定義消息內容,且指定主題
     * @param data
     */
    @RequestMapping("/send/topic")
    public void test2(Send data) {
        JSONObject json = (JSONObject) JSONObject.toJSON(data);
        log.error("----->{}","mqtt 消息發布,指定主題:"+json.toJSONString());
        iMqttSender.sendToMqtt(data.getTopic(), json.toJSONString());

    }
}

4.4.5、主啟動

package com.riest;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * ClassName:MqttApplication
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:05
 */
@SpringBootApplication
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class,args);
    }
}

4.5、工具設置

4.5.1、mqtt.fx


4.5.2、EMQX

4.6、測試

4.6.1 、調用(該請求時使用默認主題) http://localhost:8090/send?key=send&value=1234567

4.6.2、mqtt.fx收到訂閱的消息

4.6.3 、emqx查看

4.6.4、調用(指定主題發布)http://localhost:8090/send/topic?key=send&value=1234567&topic=dgj-test

4.6.5、mqtt.fx收到訂閱的消息

4.6.6 、emqx查看

5、MQTT 訂閱

5.1 、config

package com.riest.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;

/**
 * ClassName:MQTTConnect
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:46
 */
public class MQTTConnect {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /**
     *  把配置里的 cleanSession 設為false,客戶端掉線后 服務器端不會清除session,
     *  當重連后可以接收之前訂閱主題的消息。當客戶端上線后會接受到它離線的這段時間的消息,
     *  如果短線需要刪除之前的消息則可以設置為true
     *
     * @return
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setConnectionTimeout(10);
        //設置心跳
        options.setKeepAliveInterval(20);
        return options;
    }

    public MqttConnectOptions getOptions(MqttConnectOptions options) {

        options.setCleanSession(false);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        return options;
    }
}

5.2、sub端

package com.riest.service.sub;

import com.riest.config.MQTTConnect;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * ClassName:MqttSub
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:45
 */
@Component
public class MqttSub {
    public static final String HOST = "tcp://122.51.240.115:1883";
    private static final String clientid = "testrece";
    private String topic = "goods";
    public MqttClient client;
    private MQTTConnect mqttConnect = new MQTTConnect();	

    /**true為非持久訂閱
     *
     *  方法實現說明 斷線重連方法,如果是持久訂閱,重連是不需要再次訂閱,如果是非持久訂閱,重連是需要重新訂閱主題 取決於options.setCleanSession(true);
     *
     *  就是這里的clientId,服務器用來區分用戶的,不能重復,clientId不能和發布的clientId一樣,否則會出現頻繁斷開連接和重連的問題
     *  不僅不能和發布的clientId一樣,而且也不能和其他訂閱的clientId一樣,如果想要接收之前的離線數據,這就需要將client的 setCleanSession
     *  設置為false,這樣服務器才能保留它的session,再次建立連接的時候,它就會繼續使用這個session了。 這時此連接clientId 是不能更改的。
     *  但是其實還有一個問題,就是使用熱部署的時候還是會出現頻繁斷開連接和重連的問題,可能是因為剛啟動時的連接沒斷開,然后熱部署的時候又進行了重連,重啟一	  *   下就可以了
     *  System.currentTimeMillis()
     * @throws MqttException
     */
    public void connect() throws MqttException {
        //防止重復創建MQTTClient實例
        if (client==null) {
            // MemoryPersistence設置clientid的保存形式,默認為以內存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            //如果是訂閱者則添加回調類,發布不需要
            client.setCallback(new PubCallBack(MqttSub.this));
        }
        MqttConnectOptions options = mqttConnect.getOptions();
        //判斷攔截狀態,這里注意一下,如果沒有這個判斷,是非常坑的
        if (!client.isConnected()) {
            client.connect(options);
            System.out.println("連接成功");
        }else {
            client.disconnect();
            client.connect(mqttConnect.getOptions(options));
            System.out.println("連接成功");
        }
    }

    public void init() {
        try {
            connect();
            subscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題,qos默認為0
     *
     * @param topic .
     */
    public void subscribe(String topic) {
        subscribe(topic,0);
    }

    /**
     * 訂閱某個主題
     *
     * @param topic .
     * @param qos .
     */
    public void subscribe(String topic, int qos) {

        try {
            client.subscribe(topic,0);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

package com.riest.service.sub;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;

/**
 * ClassName:PubCallBack
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:48
 */
@Slf4j
public class PubCallBack implements MqttCallback {
    @Value("${mqtt.username}")
    private  String username;

    @Value("${mqtt.password}")
    private  String password;

    @Value("${mqtt.url}")
    private  String url;

    @Value("${mqtt.receiver.clientId}")
    private  String clientId;

    @Value("${mqtt.receiver.defaultTopic}")
    private  String defaultTopic;


    private MqttSub mqttSub;


    public  PubCallBack(MqttSub subsribe) throws MqttException {
        this.mqttSub = subsribe;
    }


    @Override
    public void connectionLost(Throwable cause) {
        // 連接丟失后,一般在這里面進行重連
        log.error("---------------------連接斷開,可以做重連");

        while (true){
            try {
                //如果沒有發生異常說明連接成功,如果發生異常,則死循環
                Thread.sleep(1000);
                mqttSub.init();
                break;
            }catch (Exception e){
//                e.printStackTrace();
                continue;
            }
        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }



    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息會執行到這里面
        String result = new String(message.getPayload(),"UTF-8");
        System.out.println("接收消息主題 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息內容 : " + result);
        //這里可以針對收到的消息做處理
    }
}

5.3、主啟動

package com.riest;

import com.riest.service.sub.MqttSub;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

/**
 * ClassName:MqttApplication
 * Describe:
 * Author:DGJ
 * Data:2020/10/29 10:05
 */
@SpringBootApplication
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class,args);
    }

    @Autowired
    private MqttSub mqttSub;

    @PostConstruct
    public void consumeMqttClient() throws MqttException {
        mqttSub.init();           // 訂閱 消息
    }
}

6、測試訂閱

6.1、mqtt.fx

6.2、emqx

6.3、程序日志

7、最終項目結構


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM