文中提到的MQTT服務器Apache-Apollo,現在已經不維護。但是客戶端的寫法是通用的。用其它mq服務器寫法一樣,比如RabbitMQ+MQTT。
MQTT 介紹
- (1) MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協議,它是一種輕量級的、基於代理的“發布/訂閱”模式的消息傳輸協議。
- (2) MQTT 具有協議簡潔、小巧、可擴展性強、省流量、省電等優點,比較適合於在低帶寬、不可靠的網絡的進行遠程傳感器和控制設備通訊等,正在日益成為物聯網通信協議的重要組成部分。
- (3) MQTT 是ISO標准(ISO/IEC PRF 20922)下基於發布/訂閱范式的消息協議。它工作在 TCP/IP協議族上,是為硬件性能低下的遠程設備以及網絡狀況糟糕的情況下而設計的發布/訂閱型消息協議。
MQTT協議運行在TCP/IP或其他網絡協議,提供有序、無損、雙向連接。其特點包括:
- 1.使用的發布/訂閱消息模式,它提供了一對多消息分發,以實現與應用程序的解耦。
- 2.對負載內容屏蔽的消息傳輸機制。
- 3.對傳輸消息有三種服務質量(QoS):
- 最多一次,這一級別會發生消息丟失或重復,消息發布依賴於底層TCP/IP網絡。即:<=1
- 至多一次,這一級別會確保消息到達,但消息可能會重復。即:>=1
- 只有一次,確保消息只有一次到達。即:=1。在一些要求比較嚴格的計費系統中,可以使用此級別
- 4.數據傳輸和協議交換的最小化(協議頭部只有2字節),以減少網絡流量
- 5.通知機制,異常中斷時通知傳輸雙方
Apache-Apollo
Apache Apollo是一個代理服務器,其是在ActiveMQ基礎上發展而來的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多種協議。
原理:服務器端創建一個唯一訂閱號,發送者可以向這個訂閱號中發東西,然后接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的消息。以此來完成消息的推送。服務器其實是一個消息中轉站。
下載
下載地址:http://archive.apache.org/dist/activemq/activemq-apollo/
配置與啟動
- 1.需要安裝JDK環境
- 2.在命令行模式下進入bin,執行apollo create broker E:\MQTT\apache_apollo\broker,創建一個名為broker虛擬主機(Virtual Host)。需要特別注意的是,生成的目錄就是以后真正啟動程序的位置。
- 3.在命令行模式下進入E:\MQTT\apache_apollo\broker\bin,執行apollo-broker run,也可以用apollo-broker-service.exe配置成后台服務。
- 4.訪問http://127.0.0.1:61680打開web管理界面。(密碼查看broker/etc/users.properties)
- 5.啟動端口,看cmd輸出。
SpringBoot的開發
公共部分
添加依賴 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>mqtt</artifactId>
<groupId>com.easy</groupId>
<version>1.0</version>
</parent>
<groupId>com.easy</groupId>
<artifactId>mqtt-subscriber</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>mqtt-subscriber</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--mqtt相關依賴 start-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mqtt相關依賴 end-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
項目配置 application.yml
# Mqtt配置
mqtt:
serverURIs: tcp://127.0.0.1:61613
username: admin
password: password
client:
id: ${random.value}
topic: topic_default
發布者 mqtt-publisher 項目(端口:8081)
創建配置代碼 MqttPublisherConfig.java
package com.easy.mqtt.publisher.config;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
public class MqttPublisherConfig {
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
// 客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@PostConstruct
public void init() {
log.debug("username:{} password:{} hostUrl:{} clientId :{} ",
this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
}
/**
* MQTT連接器選項
*
* @return
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 設置連接的用戶名
if (!username.trim().equals("")) {
options.setUserName(username);
}
// 設置連接的密碼
options.setPassword(password.toCharArray());
// 設置連接的地址
options.setServerURIs(new String[]{hostUrl});
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
// 但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客戶端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生產者)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(生產者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
clientId,
senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
}
消息發布器 MqttGateway.java
package com.easy.mqtt.publisher.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = MqttPublisherConfig.CHANNEL_NAME_OUT)
public interface MqttGateway {
/**
* 發送信息到MQTT服務器
*
* @param payload 發送的文本
*/
void sendToMqtt(String payload);
/**
* 發送信息到MQTT服務器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 發送信息到MQTT服務器
*
* @param topic 主題
* @param qos 對消息處理的幾種機制。
* 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。
* 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。
* 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
測試代碼 MqttTestController.java
package com.easy.mqtt.publisher.controller;
import com.easy.mqtt.publisher.config.MqttGateway;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* MQTT消息發送
*/
@RestController
@Slf4j
public class MqttTestController {
/**
* 注入發送MQTT的Bean
*/
@Resource
private MqttGateway mqttGateway;
/**
* 發送自定義消息內容(使用默認主題)
*
* @param msg 消息內容
* @return 返回
*/
@ResponseBody
@PostMapping(value = "/sendMqtt", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String msg) {
log.info("================生產默認主題的MQTT消息===={}============", msg);
mqttGateway.sendToMqtt(msg);
return new ResponseEntity<>("發送成功", HttpStatus.OK);
}
/**
* 發送自定義消息內容,且指定主題
*
* @param msg 消息內容
* @return 返回
*/
@ResponseBody
@PostMapping(value = "/sendMqtt2", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt2(@RequestParam("topic") String topic, @RequestParam(value = "msg") String msg) {
log.info("================生產自定義主題的MQTT消息===={}============", msg);
mqttGateway.sendToMqtt(topic, msg);
return new ResponseEntity<>("發送成功", HttpStatus.OK);
}
}
訂閱者 mqtt-subscriber 項目(端口:8082)
創建配置代碼 MqttSubscriberConfig.java
package com.easy.mqtt.subscriber.config;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
public class MqttSubscriberConfig {
/**
* 訂閱的bean名稱
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
/**
* MQTT連接器選項
*/
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 設置連接的用戶名
if (!username.trim().equals("")) {
options.setUserName(username);
}
// 設置連接的密碼
options.setPassword(password.toCharArray());
// 設置連接的地址
options.setServerURIs(new String[]{hostUrl});
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
// 但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
return options;
}
/**
* MQTT客戶端
*/
@Bean
public MqttPahoClientFactory receiverMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(消費者)
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息訂閱綁定(消費者)
*/
@Bean
public MessageProducer inbound() {
// 可以同時消費(訂閱)多個Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, receiverMqttClientFactory(),
new String[]{defaultTopic,"yuntian"});
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 設置訂閱通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息處理器(消費者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
log.info("\n--------------------START-------------------\n" +
"接收到訂閱消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n---------------------END--------------------");
}
};
}
}
運行示例
分別運行創建的生產者和訂閱者服務
- 1.默認主題發布消息測試
打開postman,post提交url請求(http://localhost:8081/sendMqtt?msg=哈哈哈)或用其它工具提交以下代碼
var settings = {
"url": "http://localhost:8081/sendMqtt?msg=哈哈哈",
"method": "POST",
"timeout": 0,
};
$.ajax(settings).done(function (response) {
console.log(response);
});
觀察訂閱者項目控制台,會有以下消息輸出:
--------------------START-------------------
接收到訂閱消息:
topic:topic_default
message:哈哈哈
---------------------END--------------------
表示訂閱者成功收到了發布者發布的消息
- 2.自定義主題發布消息測試
打開postman,post提交url請求(http://localhost:8081/sendMqtt2?topic=yuntian&msg=哈哈哈)或用其它工具提交以下代碼
var settings = {
"url": "http://localhost:8081/sendMqtt2?topic=yuntian&msg=哈哈哈",
"method": "POST",
"timeout": 0,
};
$.ajax(settings).done(function (response) {
console.log(response);
});
觀察訂閱者項目控制台,會有以下消息輸出:
--------------------START-------------------
接收到訂閱消息:
topic:yuntian
message:哈哈哈
---------------------END--------------------
表示訂閱者成功收到了發布者發布的yuntian主題消息