關於MQTT
做一個無人船項目,使用MQTT通信。
MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。
發布訂閱

文檔總結
🟣 MQTT 協議
服務端
服務端使用 mosquitto(版本2.0.14)
下載頁面:https://mosquitto.org/download/
客戶端
MQTTX
下載頁面:https://mqttx.app/zh#download
MQTT.fx
下載鏈接:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe
paho
https://github.com/eclipse/paho.mqtt.java
paho是eclipse提供MQTT客戶端開源庫,Java代碼集成這個客戶端用來收發消息。
springboot 集成 MQTT
代碼:https://gitee.com/ioufev/mqtt-springboot-demo
依賴
pom.xml
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
類MqttConfig
spring中集成框架,有消息入站通道(用來接收消息)和出站通道(用來發送消息)
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttConfig {
// 消費消息
/**
* 創建MqttPahoClientFactory,設置MQTT Broker連接屬性,如果使用SSL驗證,也在這里設置。
* @return factory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 設置代理端的URL地址,可以是多個
options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
factory.setConnectionOptions(options);
return factory;
}
/**
* 入站通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 入站
*/
@Bean
public MessageProducer inbound() {
// Paho客戶端消息驅動通道適配器,主要用來訂閱主題
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
adapter.setCompletionTimeout(5000);
// Paho消息轉換器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字節接收消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
adapter.setConverter(defaultPahoMessageConverter);
adapter.setQos(1); // 設置QoS
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
// ServiceActivator注解表明:當前方法用於處理MQTT消息,inputChannel參數指定了用於消費消息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String payload = message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字節格式
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 根據主題分別進行消息處理。
if (topic.matches(".+/sensor")) { // 匹配:1/sensor
String sensorSn = topic.split("/")[0];
System.out.println("傳感器" + sensorSn + ": 的消息: " + payload);
} else if (topic.equals("collector")) {
System.out.println("采集器的消息:" + payload);
} else {
System.out.println("丟棄消息:主題[" + topic + "],負載:" + payload);
}
};
}
// 發送消息
/**
* 出站通道
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 出站
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {
// 發送消息和消費消息Channel可以使用相同MqttPahoClientFactory
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
messageHandler.setAsync(true); // 如果設置成true,即異步,發送消息時將不會阻塞。
messageHandler.setDefaultTopic("command");
messageHandler.setDefaultQos(1); // 設置默認QoS
// Paho消息轉換器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// defaultPahoMessageConverter.setPayloadAsBytes(true); // 發送默認按字節類型發送消息
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}
接口MqttGateway
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定義重載方法,用於消息發送
void sendToMqtt(String payload);
// 指定topic進行消息發送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
測試
測試方式:使用接口工具,給接口發送消息,從而調用MQTT客戶端發布消息
類MqttController
import com.ioufev.mqtt.domain.MyMessage;
import com.ioufev.mqtt.mqtt.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class MqttController {
@Resource
private MqttGateway mqttGateway;
@PostMapping("/send")
public String send(@RequestBody MyMessage myMessage) {
// 發送消息到指定主題
mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent();
}
}
類MyMessage
public class MyMessage {
private String topic;
private String content;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
