參考spring官方提供的教程:spring集成MQTT官方文檔
以下代碼均由自己封裝,可能存在誤差或遺漏,還望大佬指教。
環境版本
spring-boot 版本 2.4.3
spring-integration的版本為:5.4.3
Spring Integration提供了入站適配器和出站適配器以支持MQTT協議。
你需要在你的項目中加入spring-integration-mqtt依賴:
Maven:
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.4.3</version>
</dependency>
application.yml:
spring:
mqtt:
username:
password:
url: tcp://ip:port
clientId: clientId
topic: default
completionTimeout: 2000
核心代碼
啟動類
@SpringBootApplication @EnableCaching public class MqttApplication { public static void main(String[] args) { SpringApplication.run(MqttApplication.class, args); } }
配置項
@Data @Configuration @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfiguration { private String username; private String password; private String url; private String clientId; private String topic = "TOPIC_DEFAULT"; private Integer completionTimeout = 2000; /** * 注冊MQTT客戶端工廠 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); //如果設置為 false,客戶端和服務器將在客戶端、服務器和連接重新啟動時保持狀態。隨着狀態的保持: // 即使客戶端、服務器或連接重新啟動,消息傳遞也將可靠地滿足指定的 QOS。服務器將訂閱視為持久的。 // 如果設置為 true,客戶端和服務器將不會在客戶端、服務器或連接重新啟動時保持狀態。 options.setCleanSession(true); //該值以秒為單位,必須>0,定義了客戶端等待與 MQTT 服務器建立網絡連接的最大時間間隔。 // 默認超時為 30 秒。值 0 禁用超時處理,這意味着客戶端將等待直到網絡連接成功或失敗。 options.setConnectionTimeout(0); //此值以秒為單位,定義發送或接收消息之間的最大時間間隔,必須>0 options.setKeepAliveInterval(90); //自動重新連接 options.setAutomaticReconnect(true); options.setUserName(this.getUsername()); options.setPassword(this.getPassword().toCharArray()); options.setServerURIs(new String[]{this.getUrl()}); factory.setConnectionOptions(options); return factory; } }
InboundConfiguration
/** * @author Jackpot * @version jdk1.8 * @date 2021/9/7 12:12 下午 * @description */ @Slf4j @AllArgsConstructor @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; private MqttMessageReceiver mqttMessageReceiver; /** * 此處可以使用其他消息通道 * Spring Integration默認的消息通道,它允許將消息發送給一個訂閱者,然后阻礙發送直到消息被接收。 * * @return */ @Bean public MessageChannel mqttInBoundChannel() { return new DirectChannel(); } /** * 適配器, 兩個topic共用一個adapter * 客戶端作為消費者,訂閱主題,消費消息 * * @param * @param * @return */ @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); adapter.setCompletionTimeout(60000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setRecoveryInterval(10000); adapter.setQos(0); adapter.setOutputChannel(mqttInBoundChannel()); return adapter; } /** * mqtt入站消息處理工具,對於指定消息入站通道接收到生產者生產的消息后處理消息的工具。 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInBoundChannel") public MessageHandler mqttMessageHandler() { return this.mqttMessageReceiver; } }
Receiver
@Slf4j @AllArgsConstructor @Component public class MqttMessageReceiver implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { try { MessageHeaders headers = message.getHeaders(); //獲取消息Topic String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); log.info("[獲取到的消息的topic :]{} ", receivedTopic); //獲取消息體 String payload = (String) message.getPayload(); log.info("[獲取到的消息的payload :]{} ", payload); //todo .... } catch (Exception e) { e.printStackTrace(); } } }
OutboundConfiguration
@Slf4j @AllArgsConstructor @Configuration public class MqttOutboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory); messageHandler.setDefaultQos(0); //開啟異步 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getTopic()); return messageHandler; } }
Gateway
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 發送mqtt消息 * @param topic 主題 * @param payload 內容 * @return void * @author Jackpot * @date 2021/9/7 12:20 下午 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 發送包含qos的消息 * @param topic 主題 * @param qos 對消息處理的幾種機制。 * * 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。<br> * * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。<br> * * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。 * @param payload 消息體 * @return void * @author Jackpot * @date 2021/9/7 12:21 下午 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 發送包含qos的消息 * @param topic 主題 * @param qos 對消息處理的幾種機制。 * * 0 表示的是訂閱者沒收到消息不會再次發送,消息會丟失。<br> * * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。<br> * * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。 * @param payload 消息體 * @return void * @author Jackpot * @date 2021/9/7 12:21 下午 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }
sender
@Component @AllArgsConstructor public class MqttMessageSender { private MqttGateway mqttGateway; /** * 發送mqtt消息 * @param topic 主題 * @param message 內容 * @return void * @author Jackpot * @date 2021/9/7 12:20 下午 */ public void send(String topic, String message) { mqttGateway.sendToMqtt(topic, message); } /** * 發送包含qos的消息 * @param topic 主題 * @param qos 質量 * @param messageBody 消息體 * @return void * @author Jackpot * @date 2021/9/7 12:21 下午 */ public void send(String topic, int qos, JSONObject messageBody){ mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); } /** * 發送包含qos的消息 * @param topic 主題 * @param qos 質量 * @param message 消息體 * @return void * @author Jackpot * @date 2021/9/7 12:21 下午 */ public void send(String topic, int qos, byte[] message){ mqttGateway.sendToMqtt(topic, qos, message); } }
