MQTT 3: Integrating MQTT with Spring MVC


This post documents how to integrate an MQTT client with Spring MVC.



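  There are already plenty of articles online about integrating an MQTT client with Spring Boot, but comparatively few cover the Spring MVC framework, perhaps because plain Spring MVC projects (without Spring Boot) are gradually falling out of use. As luck would have it, my current Java back-end project is built on Spring MVC, so after working through a lot of material online I am writing down the integration process here for the benefit of anyone who comes after me.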

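Adding the dependencies to the POM
  Pay close attention to the dependency versions: all three dependencies must use the same version, because mixing versions can cause all sorts of problems.
During development I ran into class-not-found errors caused by mismatched versions among the three, so if you are not sure how the versions differ, leave them unchanged.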

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>4.1.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>4.1.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>4.1.0.RELEASE</version>
</dependency>

 

Add an mqtt.properties configuration file under the resource directory

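# Username
mqtt.username=mqttPubClient
# Password
mqtt.password=123456
# Whether to clear the session on connect
mqtt.cleanSession=false
# Broker URL
mqtt.serverURI1=tcp://127.0.0.1:1883
# Whether to publish asynchronously (without blocking the sender)
mqtt.async=true
# Timeout, in milliseconds
mqtt.completionTimeout=20000
# Keep-alive (heartbeat) interval, in seconds
mqtt.keepAliveInterval=30
# Client ID
mqtt.clientId=mqttPubClient
# Default message quality of service (QoS)
mqtt.defaultQos=1
# MQTT topic to listen on
mqtt.topic=hello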
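
To make the meaning and types of these keys concrete, here is a minimal sketch that parses the same entries with plain `java.util.Properties` and converts them to typed values. The class `MqttSettings` and its fields are hypothetical and exist only for illustration; in the actual integration Spring resolves the `${...}` placeholders itself and no such class is needed.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the values in mqtt.properties (illustration only).
final class MqttSettings {
    final String serverUri;
    final String clientId;
    final String topic;
    final boolean cleanSession;
    final int keepAliveInterval;
    final long completionTimeout;
    final int defaultQos;

    private MqttSettings(Properties p) {
        this.serverUri = p.getProperty("mqtt.serverURI1");
        this.clientId = p.getProperty("mqtt.clientId");
        this.topic = p.getProperty("mqtt.topic");
        // Fallback values below are assumptions made for this sketch only.
        this.cleanSession = Boolean.parseBoolean(p.getProperty("mqtt.cleanSession", "true"));
        this.keepAliveInterval = Integer.parseInt(p.getProperty("mqtt.keepAliveInterval", "60"));
        this.completionTimeout = Long.parseLong(p.getProperty("mqtt.completionTimeout", "30000"));
        this.defaultQos = Integer.parseInt(p.getProperty("mqtt.defaultQos", "0"));
    }

    // Parses text in .properties syntax; lines starting with '#' are comments.
    static MqttSettings parse(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new MqttSettings(p);
    }
}
```

Calling `MqttSettings.parse(...)` with the file contents gives, for example, `cleanSession == false` and `defaultQos == 1` for the values shown above.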

 

Writing the Spring MQTT configuration XML
Add spring-mqtt.xml under the resource directory, and make sure Spring loads this XML file at startup.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/integration/mqtt
        http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <context:property-placeholder location="classpath:mqtt.properties" ignore-unresolvable="true"/>

    <!-- MQTT connection settings -->
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="${mqtt.username}"/>
        <property name="password" value="${mqtt.password}"/>
        <property name="cleanSession" value="${mqtt.cleanSession}"/>
        <property name="keepAliveInterval" value="${mqtt.keepAliveInterval}"/>
        <property name="serverURIs">
            <array>
                <value>${mqtt.serverURI1}</value>
            </array>
        </property>
    </bean>

    <!-- Outbound handler used to publish messages -->
    <bean id="mqttHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="${mqtt.clientId}"/>
        <constructor-arg name="clientFactory" ref="clientFactory"/>
        <property name="async" value="${mqtt.async}"/>
        <property name="defaultQos" value="${mqtt.defaultQos}"/>
        <property name="completionTimeout" value="${mqtt.completionTimeout}"/>
    </bean>

    <!-- Inbound message adapter (subscriber).
         It opens its own connection, separate from mqttHandler. A broker allows only one
         connection per client ID, so the "-inbound" suffix keeps the two connections from
         disconnecting each other (the original listing used the same ID for both). -->
    <int-mqtt:message-driven-channel-adapter
            id="mqttInbound" client-id="${mqtt.clientId}-inbound" url="${mqtt.serverURI1}"
            topics="${mqtt.topic}" qos="${mqtt.defaultQos}" client-factory="clientFactory" auto-startup="true"
            send-timeout="${mqtt.completionTimeout}" channel="startCase"/>
    <int:channel id="startCase"/>

    <!-- Routes each received message to mqttCaseService.handler(...) -->
    <int:service-activator id="handlerService"
                           input-channel="startCase" ref="mqttCaseService" method="handler"/>

    <!-- Message handling bean -->
    <bean id="mqttCaseService" class="com.loong.mqtt.service.impl.MqttServiceImpl"/>

</beans>
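
The wiring in this XML forms a small pipeline: the inbound adapter receives a message from the broker, puts it on the `startCase` channel, and the service activator calls `handler` on the service bean. The plain-Java sketch below models only that flow so the direction of the data is easier to see. `SimpleChannel` and `PipelineSketch` are hypothetical stand-ins, not how Spring Integration implements channels or adapters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the "startCase" channel: it hands every payload
// synchronously to the single handler registered on it.
final class SimpleChannel {
    private Consumer<String> handler = payload -> { };

    void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    void send(String payload) {
        handler.accept(payload);
    }
}

final class PipelineSketch {
    // Pushes the given payloads through the channel and returns what the handler produced.
    static List<String> run(String... payloads) {
        List<String> handled = new ArrayList<>();
        SimpleChannel startCase = new SimpleChannel();

        // Plays the role of the service activator calling handler(String message).
        startCase.subscribe(payload -> handled.add("Received message: " + payload));

        // Plays the role of the inbound adapter delivering messages from the broker.
        for (String payload : payloads) {
            startCase.send(payload);
        }
        return handled;
    }
}
```

In the real configuration none of this is written by hand; the XML elements create and connect the equivalent components.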

 

MQTT Service interface

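package com.loong.mqtt.service;

public interface MqttService {

    // Publishes content to the given topic.
    void send(String topic, String content) throws Exception;

    // Called by the service activator for each message received on the subscribed topic.
    void handler(String message) throws Exception;
}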

 

MQTT Service implementation

package com.loong.mqtt.service.impl;

import com.loong.mqtt.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("MqttService")
@Slf4j
public class MqttServiceImpl implements MqttService {

    // Outbound handler declared as the "mqttHandler" bean in spring-mqtt.xml.
    @Resource
    private MqttPahoMessageHandler mqttHandler;

    @Override
    public void send(String topic, String content) throws Exception {
        // Build the message; the target topic travels in a message header.
        Message<String> message =
                MessageBuilder.withPayload(content).setHeader(MqttHeaders.TOPIC, topic).build();
        // Send the message.
        mqttHandler.handleMessage(message);
    }

    @Override
    public void handler(String message) throws Exception {
        log.info("Received message: {}", message);
    }
}

 

MQTT Controller

package com.loong.mqtt.controller;

import com.loong.mqtt.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Test endpoint that publishes a message to an MQTT topic.
 */
@RestController
@RequestMapping("/mqttController")
@Slf4j
public class MqttController {

    @Resource(name = "MqttService")
    private MqttService mqttService;

    // Matched when the POST request carries a "testSend" parameter.
    @RequestMapping(params = "testSend", method = RequestMethod.POST)
    public String testSend(HttpServletRequest request, HttpServletResponse response) {
        try {
            // The original listing read these through JEECG's ResourceUtil.getParameter;
            // request.getParameter is the plain Servlet API equivalent.
            String topic = request.getParameter("topic");
            String content = request.getParameter("content");
            this.mqttService.send(topic, content);
            return "Sent successfully";
        } catch (Exception ex) {
            // Report the failure instead of returning the success message.
            log.error("Send failed", ex);
            return "Send failed";
        }
    }
}
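
The controller passes `topic` and `content` straight to the service. A small guard before sending can reject obviously invalid input: per the MQTT specification, a topic name used for publishing must be at least one character long and must not contain the wildcard characters `+` or `#` or the null character. The helper below is a hypothetical sketch (`TopicValidator` is not part of the original project) of what such a check could look like.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: checks whether a string is usable as an MQTT topic
// name for publishing (as opposed to a subscription filter).
final class TopicValidator {
    private static final int MAX_TOPIC_BYTES = 65535;

    private TopicValidator() {
    }

    static boolean isValidPublishTopic(String topic) {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        // Wildcards are only allowed in subscription filters, not when publishing.
        if (topic.indexOf('+') >= 0 || topic.indexOf('#') >= 0) {
            return false;
        }
        // The null character is not allowed in topic names.
        if (topic.indexOf((char) 0) >= 0) {
            return false;
        }
        // Topic names are UTF-8 strings limited to 65535 encoded bytes.
        return topic.getBytes(StandardCharsets.UTF_8).length <= MAX_TOPIC_BYTES;
    }
}
```

In `testSend`, a call such as `TopicValidator.isValidPublishTopic(topic)` could be made before `mqttService.send(...)`, returning an error message when it yields `false`.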

 

Testing message sending

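  • In MQTTBox, connect to the MQTT server and subscribe to the topic hello
  • In Postman, send a request to the testSend endpoint of mqttController, with topic set to hello and content set to {"hello":"200315"}
  • The subscriber in MQTTBox receives the message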
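
For readers who would rather script this request than use Postman, the sketch below builds the URL-encoded form body carrying the same `topic` and `content` parameters. It assumes the parameters are sent as form fields and only constructs the body. The endpoint URL depends on how the dispatcher servlet is mapped in your project, so it is left out, and the `params = "testSend"` mapping additionally requires a `testSend` request parameter to be present (for instance in the query string). `FormBody` is a hypothetical helper name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds an application/x-www-form-urlencoded body.
final class FormBody {
    private FormBody() {
    }

    // Joins the parameters as key=value pairs separated by '&', encoding both parts.
    static String encode(Map<String, String> params) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(urlEncode(entry.getKey())).append('=').append(urlEncode(entry.getValue()));
        }
        return body.toString();
    }

    // Body for the test request: topic first, then content.
    static String testSendBody(String topic, String content) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("topic", topic);
        params.put("content", content);
        return encode(params);
    }

    private static String urlEncode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM.
            throw new IllegalStateException(e);
        }
    }
}
```

The resulting string can be written to the request body of any HTTP client with the `Content-Type` header set to `application/x-www-form-urlencoded`.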

Testing message receiving

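  • In MQTTBox, create a publisher and send a message to the MQTT server with topic hello and content {"hello":"513002"}
  • The project's back-end log prints the content received from the subscription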


Author: admin
Original source: www.jiansword.com

