springboot整合mqtt實現消息發送和消費,以及客戶端斷線重連之后的消息恢復


參考資料:

http://mqtt.p2hp.com/mqtt311

https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/02-ControlPacketFormat.html

https://blog.csdn.net/anxianfeng55555/article/details/80908795

MQTT簡介

​ MQTT是一種基於發布/訂閱模式的輕量級通訊協議,該協議構建在TCP/IP協議上。 MQTT最大的有點在於可以以極少的代碼和有限的帶寬,為遠程設備提供實時可靠的消息服務。做為一種低開銷、低帶寬占用的即時通訊協議,MQTT在物聯網、小型設備、移動應用等方面有廣泛應用。

特點

  • 開放消息協議,簡單易實現
  • 發布訂閱模式,一對多消息發布
  • 基於TCP/IP網絡連接,提供有序,無損,雙向連接
  • 2字節固定報頭,2字節心跳報文,最小化傳輸開銷和協議交換,有效減少網絡流量
  • 消息QoS支持,可靠傳輸保證

應用

  • 物聯網M2M通信,物聯網大數據采集
  • Android消息推送,WEB消息推送
  • 智能硬件、智能家具、智能電器
  • 車聯網通信,電動車站樁采集
  • 智慧城市、遠程醫療、遠程教育
  • 電力、石油與能源等行業市場

MQTT控制報文的結構

​ MQTT通過交換一些預定義的MQTT控制報文來工作,每條MQTT命令消息的消息頭都包含一個固定的報頭,有些消息會攜帶一個可變報文頭和一個負荷。消息格式如下:

|固定包頭,存在於所有MQTT控制包
|可變包頭,存在於某些MQTT控制包
|載荷,存在於某些MQTT控制包

固定報文頭(Fixed Header)

​ MQTT固定報文頭最少有兩個字節,第一個字節包含消息類型(Message Type)和QoS級別等標志位。第二個字節開始是剩余長度字段,該長度是后面的可變報文頭加消息負載的總長度,該字段最多允許四個字節。

​ 剩余長度使用了一種可變長度的結構來編碼,這種結構使用單一字節表示0-127的值。大於127的值如下處理。每個字節的低7位用來編碼數據,最高位用來表示是否還有后續字節。因此每個字節可以編碼128個值,再加上一個標識位。剩余長度最多可以用四個字節來表示。

​ 例如十進制的數字64可以被編碼成一個單獨的字節,十進制為64,八進制為0x40。十進制數字321(=65+2×128)被編碼為兩個字節,低位在前。第一個字節是65+128 = 193。注意最高位的128表示后面至少還有一個字節。第二個字節是2,表示2*127。(翻譯注:321 = 11000001 00000010,第一個字節是“標識符后面還有一個字節”+65,第二個字節是“標識符后面沒有字節了”+256)。

可變報文頭(Variable Header)

​ 可變報文頭主要包含協議名、協議版本、連接標志(Connect Flags)、心跳間隔時間(Keep Alive timer)、連接返回碼(Connect Return Code)、主題名(Topic Name)等

有效負荷(Payload)

​ 可以理解為消息主題(body)

​ 當MQTT發送的消息類型是CONNECT(連接)、PUBLISH(發布)、SUBSCRIBE(訂閱)、SUBACK(訂閱確認)、則會帶有負荷。

​ 各種類型消息的控制報文參考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html

MQTT的消息類型(Message Type)(控制報文類型)

名字 報文流動方向 描述
Reserved 0 禁止 保留
CONNECT 1 客戶端到服務端 客戶端請求連接服務端
CONNACK 2 服務端到客戶端 連接報文確認
PUBLISH 3 兩個方向都允許 發布消息
PUBACK 4 兩個方向都允許 QoS 1消息發布收到確認
PUBREC 5 兩個方向都允許 發布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務端 客戶端訂閱請求
SUBACK 9 服務端到客戶端 訂閱請求報文確認
UNSUBSCRIBE 10 客戶端到服務端 客戶端取消訂閱請求
UNSUBACK 11 服務端到客戶端 取消訂閱報文確認
PINGREQ 12 客戶端到服務端 心跳請求
PINGRESP 13 服務端到客戶端 心跳響應
DISCONNECT 14 客戶端到服務端 客戶端斷開連接
Reserved 15 禁止 保留

消息質量(QoS)

  • QoS 0:最多分發一次。消息的傳遞完全依賴於底層的TCP/IP協議,協議里沒有定義應答和重試,消息要么只會到達服務端一次,要么根本沒有到達。

  • QoS 1:至少分發一次。服務器的消息接收由PUBACK消息進行確認,如果通信鏈路或發送設備異常,或者指定時間內沒有收到確認消息,發送端會重發這條在消息頭中設置了DUP位的消息。QoS 2:只分發一次。這是最高級別的消息傳遞,消息丟失和重復都是不可接受的,使用這個服務質量等級會有額外的開銷。

    通過下面的例子可以更深刻的理解上面三個傳輸質量等級。
    比如目前流行的共享單車智能鎖,智能鎖可以定時使用QoS level 0質量消息請求服務器,發送單車的當前位置,如果服務器沒收到也沒關系,反正過一段時間又會再發送一次。之后用戶可以通過App查詢周圍單車位置,找到單車后需要進行解鎖,這時候可以使用QoS level 1質量消息,手機App不斷的發送解鎖消息給單車鎖,確保有一次消息能達到以解鎖單車。最后用戶用完單車后,需要提交付款表單,可以使用QoS level 2質量消息,這樣確保只傳遞一次數據,否則用戶就會多付錢了。

Springboot整合MQTT實現消息發布和訂閱

一、在Linux上搭建MQTT服務

1.1、打開EMQ官網:https://www.emqx.io/cn/products/broker

1.2、點擊開始試用

1.3、選擇服務器對應版本

1.4、復制下載命令到ssh工具中執行

​ 下載完成

1.5、下載完成后執行安裝命令

1.6、安裝成功后執行命令:

sudo emqx start

出現以下信息表示啟動成功

1.7、測試

​ 瀏覽器訪問ip:18083進入管理界面,默認賬號為admin,密碼為public

二、MQTT服務搭建完成后使用Springboot整合MQTT協議

2.1、創建一個maven項目

2.2、在父工程下創建一個Springboot項目作為消息提供者,導入以下依賴

<!--mqtt相關依賴-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.3、修改配置文件

spring:
  application:
    name: provider
  #MQTT配置信息
  mqtt:
    #MQTT服務端地址,端口默認為1883,如果有多個,用逗號隔開,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復)
    client:
      id: provider-id
    #MQTT默認的消息推送主題,實際可在調用接口時指定
    default:
      topic: topic
server:
  port: 8081

2.4、消息發布者客戶端配置

package com.xct.mqttprovider.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: xct
 * @Date: 2021/7/30 15:32
 * @Description:
 */
@Configuration
@Slf4j
public class MqttProviderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對象
     */
    private MqttClient client;

    /**
     * 客戶端連接服務端
     * @author xct
     * @param
     * @return void
     * @date 2021/7/30 16:01
     */

    public void connect(){
        try {
            //創建MQTT客戶端對象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
            //設置為true表示每次連接到服務端都是以新的身份
            options.setCleanSession(true);
            //設置連接用戶名
            options.setUserName(username);
            //設置連接密碼
            options.setPassword(password.toCharArray());
            //設置超時時間,單位為秒
            options.setConnectionTimeout(100);
            //設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
            //設置回調
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主題目的地,用於發布/訂閱消息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一種機制來跟蹤消息的傳遞進度。
        //用於在以非阻塞方式(在后台運行)執行發布時跟蹤消息的傳遞進度
        MqttDeliveryToken token;
        try {
            //將指定消息發布到主題,但不等待消息傳遞完成。返回的token可用於跟蹤消息的傳遞狀態。
            //一旦此方法干凈地返回,消息就已被客戶端接受發布。當連接可用時,將在后台完成消息傳遞。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

2.5、消息發布客戶端回調

package com.xct.mqttprovider.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @Author: xct
 * @Date: 2021/7/30 16:00
 * @Description:
 */
@Configuration
public class MqttProviderCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 與服務器斷開連接的回調
     * @author xct
     * @param throwable
     * @return void
     * @date 2021/7/30 16:19
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println(clientId + "與服務器斷開連接");
    }

    /**
     * 消息到達的回調
     * @author xct
     * @param s
     * @param mqttMessage
     * @return void
     * @date 2021/7/30 16:19
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    /**
     * 消息發布成功的回調
     * @author xct
     * @param iMqttDeliveryToken
     * @return void
     * @date 2021/7/30 16:20
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        IMqttAsyncClient client = iMqttDeliveryToken.getClient();
        System.out.println(client.getClientId() + "發布消息成功!");
    }
}

2.6、創建控制器測試發布消息

package com.xct.mqttprovider.controller;

import com.xct.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @Author: xct
 * @Date: 2021/7/30 16:26
 * @Description:
 */
@Controller
public class SendController {

    @Autowired
    private MqttProviderConfig providerClient;

    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            providerClient.publish(qos,retained,topic,message);
            return "發送成功";
        }catch (Exception e){
            e.printStackTrace();
            return "發送失敗";
        }
    }
}

2.7、在父工程下創建一個Springboot項目作為消息消費者,導入以下依賴

<!--mqtt相關依賴-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.8、配置文件

spring:
  application:
    name: consumer
  #MQTT配置信息
  mqtt:
    #MQTT服務端地址,端口默認為1883,如果有多個,用逗號隔開,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復)
    client:
      id: consumer-id
    #MQTT默認的消息推送主題,實際可在調用接口時指定
    default:
      topic: topic
server:
  port: 8082

2.9、消費者客戶端配置

package com.xct.mqttconsumer.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @Author: xct
 * @Date: 2021/7/30 17:06
 * @Description:
 */
@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對象
     */
    private MqttClient client;

    /**
     * 在bean初始化后連接到服務器
     * @author xct
     * @param
     * @return void
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務端
     * @author xct
     * @param
     * @return void
     * @date 2021/7/30 16:01
     */
    public void connect(){
        try {
            //創建MQTT客戶端對象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設置為false表示服務器會保留客戶端的連接記錄,客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
            //設置為true表示每次連接到服務端都是以新的身份
            options.setCleanSession(true);
            //設置連接用戶名
            options.setUserName(username);
            //設置連接密碼
            options.setPassword(password.toCharArray());
            //設置超時時間,單位為秒
            options.setConnectionTimeout(100);
            //設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
            //設置回調
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //訂閱主題
            //消息等級,和主題數組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
            int[] qos = {1,1};
            //主題
            String[] topics = {"topic1","topic2"};
          	//訂閱主題
            client.subscribe(topics,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 斷開連接
     * @author xct
     * @param
     * @return void
     * @date 2021/8/2 09:30
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 訂閱主題
     * @author xct
     * @param topic
     * @param qos
     * @return void
     * @date 2021/7/30 17:12
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

3.0、 消息消費者客戶端回調

package com.xct.mqttconsumer.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @Author: xct
 * @Date: 2021/7/30 17:06
 * @Description:
 */
public class MqttConsumerCallBack implements MqttCallback {
    /**
     * 客戶端斷開連接的回調
     * @author xct
     * @param throwable
     * @return void
     * @date 2021/7/30 17:14
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("與服務器斷開連接,可重連");
    }

    /**
     * 消息到達的回調
     * @author xct
     * @param topic
     * @param message
     * @return void
     * @date 2021/7/30 17:14
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    /**
     * 消息發布成功的回調
     * @author xct
     * @param iMqttDeliveryToken
     * @return void
     * @date 2021/7/30 17:14
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

3.1、控制器提供手動建立連接和斷開連接方法

package com.xct.mqttconsumer.controller;

import com.xct.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @Author: xct
 * @Date: 2021/7/30 17:20
 * @Description:
 */
@Controller
public class TestController {
    @Autowired
    private MqttConsumerConfig client;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @RequestMapping("connect")
    @ResponseBody
    public String connect(){
        client.connect();
        return clientId + "連接到服務器";
    }

    @RequestMapping("disConnect")
    @ResponseBody
    public String disConnect(){
        client.disConnect();
        return clientId + "與服務器斷開連接";
    }
}

3.2、測試

分別啟動兩個項目,可以在管理界面看到創建的兩個客戶端

調用發布消息接口發布消息

消費者控制台打印

3.3、客戶端斷線消息恢復

把消費者與服務端斷開連接

再調用發布消息接口發送兩條消息到topic1,然后再把消費者連接到服務端

控制台沒有東西打印

修改消費者客戶端配置,把setCleanSession改為false

重啟項目,把消費者客戶端斷開連接,調用發布消息接口發布兩條消息,再把消費者和服務端連接上


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM