本篇記錄一下MQTT客戶端與Spring Mvc整合
網絡上已經有很多的MQTT客戶端與SpringBoot整合的技術文檔,但是與Spring Mvc框架的整合文檔似乎並不太多,可能是因為SpringMvc框架已經逐漸被淘汰了。但很幸運,我這次JAVA后端項目就是用的SpringMvc,所以在整理了網上很多資料后,也為了方便其他后來人,把我這次的整合開發過程用文字記錄一下。
POM引入依賴
千萬注意引入依賴的版本,一定要保持3個依賴的版本一致,不同的版本可能造成各種問題。
我在開發過程中碰到因為3個版本不一致,導致Class找不到的情況,所以對版本間區別不清楚的朋友們,就不要改版本。
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>4.1.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.1.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>4.1.0.RELEASE</version> </dependency>
resource目錄下添加mqtt.properties配置文件
#用戶名 mqtt.username=mqttPubClient #密碼 mqtt.password=123456 #是否清除會話 mqtt.cleanSession=false #服務端url mqtt.serverURI1=tcp://127.0.0.1:1883 mqtt.async=true #超時時間 mqtt.completionTimeout=20000 #心跳 mqtt.keepAliveInterval=30 #客戶端id mqtt.clientId=mqttPubClient #默認的消息服務質量 mqtt.defaultQos=1 #MQTT-監聽的主題 mqtt.topic=hello
寫Spring Mqtt 的配置XML
在resource目錄下添加spring-mqtt.xml,要確保這個XML能被Spring啟動時加載進去。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:int="http://www.springframework.org/schema/integration" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt" 7 xsi:schemaLocation=" 8 http://www.springframework.org/schema/integration 9 http://www.springframework.org/schema/integration/spring-integration-4.1.xsd 10 http://www.springframework.org/schema/beans 11 http://www.springframework.org/schema/beans/spring-beans-3.1.xsd 12 http://www.springframework.org/schema/integration/mqtt 13 http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd 14 http://www.springframework.org/schema/context 15 http://www.springframework.org/schema/context/spring-context-3.1.xsd "> 16 17 <context:property-placeholder location="classpath:mqtt.properties" ignore-unresolvable="true"/> 18 19 <!--MQTT配置--> 20 <bean id="clientFactory" 21 class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> 22 <property name="userName" value="${mqtt.username}"/> 23 <property name="password" value="${mqtt.password}"/> 24 <property name="cleanSession" value="${mqtt.cleanSession}"/> 25 <property name="keepAliveInterval" value="${mqtt.keepAliveInterval}"/> 26 <property name="serverURIs"> 27 <array> 28 <value>${mqtt.serverURI1}</value> 29 </array> 30 </property> 31 </bean> 32 33 <bean id="mqttHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler"> 34 <constructor-arg name="clientId" value="${mqtt.clientId}"/> 35 <constructor-arg name="clientFactory" ref="clientFactory"/> 36 <property name="async" value="${mqtt.async}"/> 37 <property name="defaultQos" value="${mqtt.defaultQos}"/> 38 <property name="completionTimeout" value="${mqtt.completionTimeout}"/> 39 </bean> 40 41 <!-- 消息適配器 --> 42 <int-mqtt:message-driven-channel-adapter 43 id="mqttInbound" client-id="${mqtt.clientId}" url="${mqtt.serverURI1}" 44 topics="${mqtt.topic}" qos="${mqtt.defaultQos}" client-factory="clientFactory" auto-startup="true" 45 send-timeout="${mqtt.completionTimeout}" channel="startCase" /> 46 <int:channel id="startCase" /> 47 <!-- 消息處理類 --> 48 <int:service-activator id="handlerService" 49 input-channel="startCase" ref="mqttCaseService" method="handler" /> 50 51 <!-- 消息處理 --> 52 <bean id="mqttCaseService" class="com.loong.mqtt.service.impl.MqttServiceImpl" /> 53 54 </beans>
MQTT Service 接口
1 public interface MqttService { 2 3 public void send(String topic,String content) throws Exception; 4 5 public void handler(String message) throws Exception; 6 }
MQTT Service 實現
1 import com.mqtt.service.MqttService; 2 import lombok.extern.slf4j.Slf4j; 3 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; 4 import org.springframework.integration.mqtt.support.MqttHeaders; 5 import org.springframework.integration.support.MessageBuilder; 6 import org.springframework.messaging.Message; 7 import org.springframework.stereotype.Service; 8 9 import javax.annotation.Resource; 10 11 @Service("MqttService") 12 @Slf4j 13 public class MqttServiceImpl implements MqttService { 14 15 @Resource 16 private MqttPahoMessageHandler mqttHandler; 17 18 @Override 19 public void send(String topic, String content) throws Exception { 20 // 構建消息 21 Message<String> messages = 22 MessageBuilder.withPayload(content).setHeader(MqttHeaders.TOPIC, topic).build(); 23 // 發送消息 24 mqttHandler.handleMessage(messages); 25 } 26 27 28 @Override 29 public void handler(String message) throws Exception { 30 log.info("收到消息:"+message); 31 } 32 } 33
MQTT Controller
1 package com.loong.mqtt.controller; 2 3 import com.loong.api.model.ApiQueryOrderModel; 4 import com.loong.kuaizhi.service.KuaizhiService; 5 import com.loong.mqtt.service.MqttService; 6 import com.loong.ysf.dto.Protocol; 7 import com.loong.ysf.model.OrderInfo; 8 import lombok.extern.slf4j.Slf4j; 9 import org.jeecgframework.core.common.service.impl.RedisService; 10 import org.jeecgframework.core.util.ResourceUtil; 11 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.web.bind.annotation.*; 13 14 import javax.annotation.Resource; 15 import javax.servlet.http.HttpServletRequest; 16 import javax.servlet.http.HttpServletResponse; 17 18 /** 19 */ 20 @RestController 21 @RequestMapping("/mqttController") 22 @Slf4j 23 public class MqttController { 24 25 @Resource(name = "MqttService") 26 private MqttService mqttService; 27 28 @RequestMapping(params = "testSend", method = RequestMethod.POST) 29 @ResponseBody 30 public String testSend(HttpServletRequest request, HttpServletResponse response) { 31 try { 32 String topic = ResourceUtil.getParameter("topic"); 33 String content = ResourceUtil.getParameter("content"); 34 this.mqttService.send(topic,content); 35 } catch (Exception ex) { 36 ex.printStackTrace(); 37 log.error("發送失敗", ex); 38 } 39 40 return "發送成功"; 41 } 42 }
發送消息測試
- MQTTBOX先連上MQTT服務器,並訂閱topic為hello的消息
- POSTMAN向mqttController.testSend接口發送請求,topic為hello,內容為{"hello":"200315"}
- MQTTBOX里面的訂閱者收到消息
接收消息測試
- MQTTBOX創建一個消息發布者,發MQTT服務器發送topic為hello,內容為{"hello":"513002"}
- 項目后台日志輸出訂閱到的內容
作者:admin
原文地址:www.jiansword.com