SpringBoot 整合 Mqtt (發布、訂閱)
文中部分內容借鑒了其他作者,在此感謝提供方法的各位作者。
1、MQTT 協議介紹(簡單介紹)
1.1、Mqtt協議中的三個角色:
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。

1.2、mqtt傳輸的消息內容
Topic(主題),可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload)
payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
1.3、mqtt客戶端
一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:
A、發布其他客戶端可能會訂閱的信息;
B、訂閱其它客戶端發布的消息;
C、退訂或刪除應用程序的消息;
D、斷開與服務器連接
1.4、mqtt服務端器
MQTT服務器以稱為"消息代理"(Broker),可以是一個應用程序或一台設備。它是位於消息發布者和訂閱者之間,它可以:
A、接受來自客戶的網絡連接;
B、接受客戶發布的應用信息;
C、處理來自客戶端的訂閱和退訂請求;
D、向訂閱的客戶轉發應用程序消息。
1.5、mqtt的訂閱、主題、會話
1.5.1、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
1.5.2、會話(Session)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在於一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。
1.5.3、主題名(Topic Name)
連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。
1.5.4、主題篩選器(Topic Filter)
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
1.5.5、負載(Payload)
消息訂閱者所具體接收的內容。
2、工具介紹(介紹本文中使用的)
2.1、代理服務器:Emqx
EMQ X 是基於 Erlang/OTP 語言平台開發,支持大規模連接和分布式集群,發布訂閱模式的開源 MQTT 消息服務器。
Linux docker 安裝啟動命令:
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
安轉后訪問http://ip:18083/
默認用戶名/密碼: admin/public
訪問如下表示安裝成功:


2.2、MQTT.fx
MQTT.fx 是目前主流的mqtt客戶端,可以快速驗證是否可以與IoT Hub 服務交流發布或訂閱消息。設備將當前所處的狀態作為MQTT主題發送給IoT Hub,每個MQTT主題topic具有不同等級的名稱,” MQTT代理服務器將接收到的主題topic發送給給所有訂閱的客戶端。
下載連接:http://www.jensd.de/apps/mqttfx/1.7.1/
參考:https://blog.csdn.net/tiantang_1986/article/details/85101366
3、環境簡介:
3.1、SpringBoot-version:2.1.2
3.2、Maven-version:3.6.1
3.3、開發工具: IDEA 2019.3
3.4、JDK-version:1.8
4、MQTT 發布
4.1、新建工程:MQTT-SpringBoot
4.2、pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!--MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>4.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
</dependencies>
4.3、配置文件:
server:
port: 8090
spring:
application:
name: MQTT-SpringBoot
mqtt:
username: admin
password: public
# 推送信息的連接地址,如果有多個,用逗號隔開,如:tcp://ip:1883,tcp://ip:1883
url: tcp://ip:1883,tcp://ip:1883
sender:
# 默認發送的主題
defaultTopic: goods
# clientid
clientId: mqtttest
4.4、代碼
4.4.1、modal
package com.riest.modal;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* ClassName:Send
* Describe:
* Author:DGJ
* Data:2020/10/29 10:06
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Send {
private String topic;
private String key;
private String value;
}
4.4.2、config
package com.riest.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
/**
* ClassName:MqttConfig
* Describe:
* Author:DGJ
* Data:2020/10/29 10:08
*/
@Configuration
public class MqttConfig {
/**
* 發布的bean名稱
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
/**
* 客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的"遺囑"消息
*/
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.sender.clientId}")
private String clientId;
@Value("${mqtt.sender.defaultTopic}")
private String defaultTopic;
/**
* MQTT連接器選項
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions(){
MqttConnectOptions options=new MqttConnectOptions();
// 設置連接的用戶名
if(!username.trim().equals("")){
options.setUserName(username);
}
// 設置連接的密碼
options.setPassword(password.toCharArray());
// 設置連接的地址
options.setServerURIs(StringUtils.split(url, ","));
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
// 但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 設置 "遺囑" 消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的"遺囑"消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客戶端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生產者)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(生產者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
}
4.4.3、service
package com.riest.service;
import com.riest.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* ClassName:SendInterface
* Describe:
* Author:DGJ
* Data:2020/10/29 10:08
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface ISend {
/**
* 發送信息到MQTT服務器
*
* @param data 發送的文本
*/
void sendToMqtt(String data);
/**
* 發送信息到MQTT服務器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/**
* 發送信息到MQTT服務器
*
* @param topic 主題
* @param qos 對消息處理的幾種機制。
* 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。
* 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。
* 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
4.4.4、controller
package com.riest.controller;
import com.alibaba.fastjson.JSONObject;
import com.riest.modal.Send;
import com.riest.service.ISend;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* ClassName:SendController
* Describe:
* Author:DGJ
* Data:2020/10/29 10:11
*/
@RestController
@Slf4j
public class SendController {
@Autowired
private ISend iMqttSender;
/**
* 發送自定義消息內容(使用默認主題)
* @param data
*/
@GetMapping(value = "/send")
public void test1(Send data) {
JSONObject json = (JSONObject) JSONObject.toJSON(data);
log.error("----->{}","mqtt 消息發布,使用默認主題發布:"+json.toJSONString());
iMqttSender.sendToMqtt(json.toJSONString());
}
/**
* 發送自定義消息內容,且指定主題
* @param data
*/
@RequestMapping("/send/topic")
public void test2(Send data) {
JSONObject json = (JSONObject) JSONObject.toJSON(data);
log.error("----->{}","mqtt 消息發布,指定主題:"+json.toJSONString());
iMqttSender.sendToMqtt(data.getTopic(), json.toJSONString());
}
}
4.4.5、主啟動
package com.riest;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* ClassName:MqttApplication
* Describe:
* Author:DGJ
* Data:2020/10/29 10:05
*/
@SpringBootApplication
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class,args);
}
}
4.5、工具設置
4.5.1、mqtt.fx


4.5.2、EMQX

4.6、測試
4.6.1 、調用(該請求時使用默認主題) http://localhost:8090/send?key=send&value=1234567
4.6.2、mqtt.fx收到訂閱的消息

4.6.3 、emqx查看

4.6.4、調用(指定主題發布)http://localhost:8090/send/topic?key=send&value=1234567&topic=dgj-test
4.6.5、mqtt.fx收到訂閱的消息

4.6.6 、emqx查看

5、MQTT 訂閱
5.1 、config
package com.riest.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
/**
* ClassName:MQTTConnect
* Describe:
* Author:DGJ
* Data:2020/10/29 10:46
*/
public class MQTTConnect {
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
/**
* 把配置里的 cleanSession 設為false,客戶端掉線后 服務器端不會清除session,
* 當重連后可以接收之前訂閱主題的消息。當客戶端上線后會接受到它離線的這段時間的消息,
* 如果短線需要刪除之前的消息則可以設置為true
*
* @return
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName("admin");
options.setPassword("public".toCharArray());
options.setConnectionTimeout(10);
//設置心跳
options.setKeepAliveInterval(20);
return options;
}
public MqttConnectOptions getOptions(MqttConnectOptions options) {
options.setCleanSession(false);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
return options;
}
}
5.2、sub端
package com.riest.service.sub;
import com.riest.config.MQTTConnect;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* ClassName:MqttSub
* Describe:
* Author:DGJ
* Data:2020/10/29 10:45
*/
@Component
public class MqttSub {
public static final String HOST = "tcp://122.51.240.115:1883";
private static final String clientid = "testrece";
private String topic = "goods";
public MqttClient client;
private MQTTConnect mqttConnect = new MQTTConnect();
/**true為非持久訂閱
*
* 方法實現說明 斷線重連方法,如果是持久訂閱,重連是不需要再次訂閱,如果是非持久訂閱,重連是需要重新訂閱主題 取決於options.setCleanSession(true);
*
* 就是這里的clientId,服務器用來區分用戶的,不能重復,clientId不能和發布的clientId一樣,否則會出現頻繁斷開連接和重連的問題
* 不僅不能和發布的clientId一樣,而且也不能和其他訂閱的clientId一樣,如果想要接收之前的離線數據,這就需要將client的 setCleanSession
* 設置為false,這樣服務器才能保留它的session,再次建立連接的時候,它就會繼續使用這個session了。 這時此連接clientId 是不能更改的。
* 但是其實還有一個問題,就是使用熱部署的時候還是會出現頻繁斷開連接和重連的問題,可能是因為剛啟動時的連接沒斷開,然后熱部署的時候又進行了重連,重啟一 * 下就可以了
* System.currentTimeMillis()
* @throws MqttException
*/
public void connect() throws MqttException {
//防止重復創建MQTTClient實例
if (client==null) {
// MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
//如果是訂閱者則添加回調類,發布不需要
client.setCallback(new PubCallBack(MqttSub.this));
}
MqttConnectOptions options = mqttConnect.getOptions();
//判斷攔截狀態,這里注意一下,如果沒有這個判斷,是非常坑的
if (!client.isConnected()) {
client.connect(options);
System.out.println("連接成功");
}else {
client.disconnect();
client.connect(mqttConnect.getOptions(options));
System.out.println("連接成功");
}
}
public void init() {
try {
connect();
subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱某個主題,qos默認為0
*
* @param topic .
*/
public void subscribe(String topic) {
subscribe(topic,0);
}
/**
* 訂閱某個主題
*
* @param topic .
* @param qos .
*/
public void subscribe(String topic, int qos) {
try {
client.subscribe(topic,0);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
package com.riest.service.sub;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
/**
* ClassName:PubCallBack
* Describe:
* Author:DGJ
* Data:2020/10/29 10:48
*/
@Slf4j
public class PubCallBack implements MqttCallback {
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.receiver.clientId}")
private String clientId;
@Value("${mqtt.receiver.defaultTopic}")
private String defaultTopic;
private MqttSub mqttSub;
public PubCallBack(MqttSub subsribe) throws MqttException {
this.mqttSub = subsribe;
}
@Override
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
log.error("---------------------連接斷開,可以做重連");
while (true){
try {
//如果沒有發生異常說明連接成功,如果發生異常,則死循環
Thread.sleep(1000);
mqttSub.init();
break;
}catch (Exception e){
// e.printStackTrace();
continue;
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執行到這里面
String result = new String(message.getPayload(),"UTF-8");
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + result);
//這里可以針對收到的消息做處理
}
}
5.3、主啟動
package com.riest;
import com.riest.service.sub.MqttSub;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.PostConstruct;
/**
* ClassName:MqttApplication
* Describe:
* Author:DGJ
* Data:2020/10/29 10:05
*/
@SpringBootApplication
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class,args);
}
@Autowired
private MqttSub mqttSub;
@PostConstruct
public void consumeMqttClient() throws MqttException {
mqttSub.init(); // 訂閱 消息
}
}
6、測試訂閱
6.1、mqtt.fx

6.2、emqx

6.3、程序日志

7、最終項目結構

