參考資料:
https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/02-ControlPacketFormat.html
https://blog.csdn.net/anxianfeng55555/article/details/80908795
MQTT簡介
MQTT是一種基於發布/訂閱模式的輕量級通訊協議,該協議構建在TCP/IP協議上。 MQTT最大的有點在於可以以極少的代碼和有限的帶寬,為遠程設備提供實時可靠的消息服務。做為一種低開銷、低帶寬占用的即時通訊協議,MQTT在物聯網、小型設備、移動應用等方面有廣泛應用。
特點
- 開放消息協議,簡單易實現
- 發布訂閱模式,一對多消息發布
- 基於TCP/IP網絡連接,提供有序,無損,雙向連接
- 2字節固定報頭,2字節心跳報文,最小化傳輸開銷和協議交換,有效減少網絡流量
- 消息QoS支持,可靠傳輸保證
應用
- 物聯網M2M通信,物聯網大數據采集
- Android消息推送,WEB消息推送
- 智能硬件、智能家具、智能電器
- 車聯網通信,電動車站樁采集
- 智慧城市、遠程醫療、遠程教育
- 電力、石油與能源等行業市場
MQTT控制報文的結構
MQTT通過交換一些預定義的MQTT控制報文來工作,每條MQTT命令消息的消息頭都包含一個固定的報頭,有些消息會攜帶一個可變報文頭和一個負荷。消息格式如下:
|固定包頭,存在於所有MQTT控制包
|可變包頭,存在於某些MQTT控制包
|載荷,存在於某些MQTT控制包
固定報文頭(Fixed Header)
MQTT固定報文頭最少有兩個字節,第一個字節包含消息類型(Message Type)和QoS級別等標志位。第二個字節開始是剩余長度字段,該長度是后面的可變報文頭加消息負載的總長度,該字段最多允許四個字節。
剩余長度使用了一種可變長度的結構來編碼,這種結構使用單一字節表示0-127的值。大於127的值如下處理。每個字節的低7位用來編碼數據,最高位用來表示是否還有后續字節。因此每個字節可以編碼128個值,再加上一個標識位。剩余長度最多可以用四個字節來表示。
例如十進制的數字64可以被編碼成一個單獨的字節,十進制為64,八進制為0x40。十進制數字321(=65+2×128)被編碼為兩個字節,低位在前。第一個字節是65+128 = 193。注意最高位的128表示后面至少還有一個字節。第二個字節是2,表示2*127。(翻譯注:321 = 11000001 00000010,第一個字節是“標識符后面還有一個字節”+65,第二個字節是“標識符后面沒有字節了”+256)。
可變報文頭(Variable Header)
可變報文頭主要包含協議名、協議版本、連接標志(Connect Flags)、心跳間隔時間(Keep Alive timer)、連接返回碼(Connect Return Code)、主題名(Topic Name)等
有效負荷(Payload)
可以理解為消息主題(body)
當MQTT發送的消息類型是CONNECT(連接)、PUBLISH(發布)、SUBSCRIBE(訂閱)、SUBACK(訂閱確認)、則會帶有負荷。
各種類型消息的控制報文參考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT的消息類型(Message Type)(控制報文類型)
名字 | 值 | 報文流動方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客戶端到服務端 | 客戶端請求連接服務端 |
CONNACK | 2 | 服務端到客戶端 | 連接報文確認 |
PUBLISH | 3 | 兩個方向都允許 | 發布消息 |
PUBACK | 4 | 兩個方向都允許 | QoS 1消息發布收到確認 |
PUBREC | 5 | 兩個方向都允許 | 發布收到(保證交付第一步) |
PUBREL | 6 | 兩個方向都允許 | 發布釋放(保證交付第二步) |
PUBCOMP | 7 | 兩個方向都允許 | QoS 2消息發布完成(保證交互第三步) |
SUBSCRIBE | 8 | 客戶端到服務端 | 客戶端訂閱請求 |
SUBACK | 9 | 服務端到客戶端 | 訂閱請求報文確認 |
UNSUBSCRIBE | 10 | 客戶端到服務端 | 客戶端取消訂閱請求 |
UNSUBACK | 11 | 服務端到客戶端 | 取消訂閱報文確認 |
PINGREQ | 12 | 客戶端到服務端 | 心跳請求 |
PINGRESP | 13 | 服務端到客戶端 | 心跳響應 |
DISCONNECT | 14 | 客戶端到服務端 | 客戶端斷開連接 |
Reserved | 15 | 禁止 | 保留 |
消息質量(QoS)
-
QoS 0:最多分發一次。消息的傳遞完全依賴於底層的TCP/IP協議,協議里沒有定義應答和重試,消息要么只會到達服務端一次,要么根本沒有到達。
-
QoS 1:至少分發一次。服務器的消息接收由PUBACK消息進行確認,如果通信鏈路或發送設備異常,或者指定時間內沒有收到確認消息,發送端會重發這條在消息頭中設置了DUP位的消息。QoS 2:只分發一次。這是最高級別的消息傳遞,消息丟失和重復都是不可接受的,使用這個服務質量等級會有額外的開銷。
通過下面的例子可以更深刻的理解上面三個傳輸質量等級。
比如目前流行的共享單車智能鎖,智能鎖可以定時使用QoS level 0質量消息請求服務器,發送單車的當前位置,如果服務器沒收到也沒關系,反正過一段時間又會再發送一次。之后用戶可以通過App查詢周圍單車位置,找到單車后需要進行解鎖,這時候可以使用QoS level 1質量消息,手機App不斷的發送解鎖消息給單車鎖,確保有一次消息能達到以解鎖單車。最后用戶用完單車后,需要提交付款表單,可以使用QoS level 2質量消息,這樣確保只傳遞一次數據,否則用戶就會多付錢了。
Springboot整合MQTT實現消息發布和訂閱
一、在Linux上搭建MQTT服務
1.1、打開EMQ官網:https://www.emqx.io/cn/products/broker
1.2、點擊開始試用
1.3、選擇服務器對應版本
1.4、復制下載命令到ssh工具中執行
下載完成
1.5、下載完成后執行安裝命令
1.6、安裝成功后執行命令:
sudo emqx start
出現以下信息表示啟動成功
1.7、測試
瀏覽器訪問ip:18083進入管理界面,默認賬號為admin,密碼為public
二、MQTT服務搭建完成后使用Springboot整合MQTT協議
2.1、創建一個maven項目
2.2、在父工程下創建一個Springboot項目作為消息提供者,導入以下依賴
<!--mqtt相關依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.3、修改配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服務端地址,端口默認為1883,如果有多個,用逗號隔開,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復)
client:
id: provider-id
#MQTT默認的消息推送主題,實際可在調用接口時指定
default:
topic: topic
server:
port: 8081
2.4、消息發布者客戶端配置
package com.xct.mqttprovider.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* @Author: xct
* @Date: 2021/7/30 15:32
* @Description:
*/
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 客戶端連接服務端
* @author xct
* @param
* @return void
* @date 2021/7/30 16:01
*/
public void connect(){
try {
//創建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主題目的地,用於發布/訂閱消息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進度。
//用於在以非阻塞方式(在后台運行)執行發布時跟蹤消息的傳遞進度
MqttDeliveryToken token;
try {
//將指定消息發布到主題,但不等待消息傳遞完成。返回的token可用於跟蹤消息的傳遞狀態。
//一旦此方法干凈地返回,消息就已被客戶端接受發布。當連接可用時,將在后台完成消息傳遞。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
2.5、消息發布客戶端回調
package com.xct.mqttprovider.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* @Author: xct
* @Date: 2021/7/30 16:00
* @Description:
*/
@Configuration
public class MqttProviderCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 與服務器斷開連接的回調
* @author xct
* @param throwable
* @return void
* @date 2021/7/30 16:19
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println(clientId + "與服務器斷開連接");
}
/**
* 消息到達的回調
* @author xct
* @param s
* @param mqttMessage
* @return void
* @date 2021/7/30 16:19
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
/**
* 消息發布成功的回調
* @author xct
* @param iMqttDeliveryToken
* @return void
* @date 2021/7/30 16:20
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId() + "發布消息成功!");
}
}
2.6、創建控制器測試發布消息
package com.xct.mqttprovider.controller;
import com.xct.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Author: xct
* @Date: 2021/7/30 16:26
* @Description:
*/
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos,retained,topic,message);
return "發送成功";
}catch (Exception e){
e.printStackTrace();
return "發送失敗";
}
}
}
2.7、在父工程下創建一個Springboot項目作為消息消費者,導入以下依賴
<!--mqtt相關依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.8、配置文件
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服務端地址,端口默認為1883,如果有多個,用逗號隔開,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復)
client:
id: consumer-id
#MQTT默認的消息推送主題,實際可在調用接口時指定
default:
topic: topic
server:
port: 8082
2.9、消費者客戶端配置
package com.xct.mqttconsumer.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @Author: xct
* @Date: 2021/7/30 17:06
* @Description:
*/
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
* @author xct
* @param
* @return void
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
* @author xct
* @param
* @return void
* @date 2021/7/30 16:01
*/
public void connect(){
try {
//創建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄,客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//訂閱主題
//消息等級,和主題數組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1,1};
//主題
String[] topics = {"topic1","topic2"};
//訂閱主題
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
* @author xct
* @param
* @return void
* @date 2021/8/2 09:30
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
* @author xct
* @param topic
* @param qos
* @return void
* @date 2021/7/30 17:12
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
3.0、 消息消費者客戶端回調
package com.xct.mqttconsumer.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @Author: xct
* @Date: 2021/7/30 17:06
* @Description:
*/
public class MqttConsumerCallBack implements MqttCallback {
/**
* 客戶端斷開連接的回調
* @author xct
* @param throwable
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務器斷開連接,可重連");
}
/**
* 消息到達的回調
* @author xct
* @param topic
* @param message
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發布成功的回調
* @author xct
* @param iMqttDeliveryToken
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
3.1、控制器提供手動建立連接和斷開連接方法
package com.xct.mqttconsumer.controller;
import com.xct.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Author: xct
* @Date: 2021/7/30 17:20
* @Description:
*/
@Controller
public class TestController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "連接到服務器";
}
@RequestMapping("disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "與服務器斷開連接";
}
}
3.2、測試
分別啟動兩個項目,可以在管理界面看到創建的兩個客戶端
調用發布消息接口發布消息
消費者控制台打印
3.3、客戶端斷線消息恢復
把消費者與服務端斷開連接
再調用發布消息接口發送兩條消息到topic1,然后再把消費者連接到服務端
控制台沒有東西打印
修改消費者客戶端配置,把setCleanSession改為false
重啟項目,把消費者客戶端斷開連接,調用發布消息接口發布兩條消息,再把消費者和服務端連接上