SpringBoot整合MQTT實踐開發


MQTT簡介:

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級協議,該協議構建於TCP/IP協議之上,MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務(這是特點)。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。

MQTT應用范圍:

MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。

分享給大家一套SpringBoot的視頻教程,這套視頻由淺入深,帶你體驗Spring Boot的極速開發過程,內容豐富,涵蓋了SpringBoot開發的方方面面,並且同步更新到Spring Boot 2.x系列的最新版本,讓你一次性拿下Spring Boot開發框架。

視頻教程在線觀看:

SpringBoot框架從入門到實踐視頻課程 - Spring Boot - 動力節點在線 (bjpowernode.com)

資料及源碼免費下載:

SpringBoot學習視頻_全套SpringBoot基礎教程免費下載 - 動力節點 (bjpowernode.com)

引入maven依賴

1 <dependency>
2 <groupId>org.eclipse.paho</groupId>
3 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
4 <version>1.2.0</version>
5 </dependency>
1、Mqtt
 1 @Component
 2 public class Mqtt implements CommandLineRunner {
 3 
 4 
 5 @Autowired
 6 private CommitUserInfo commitUserInfol;
 7 @Autowired
 8 private UserInfoDao userInfoDao; // 數據庫CRUD接口
 9 @Autowired
10 private ApplicationRedis applicationRedis; // redis 接口
11 @Autowired
12 JudgeEquipmentService judgeEquipmentService; // 查詢/修改---當前設備號碼狀態
13 
14 @Override
15 public void run(String... args) throws Exception {
16 applicationRedis.test();
17 MqttConnectionUtils.start(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol);
18 
19 }
20 
21 
22 
23 }

2.MqttConnectionUtils

 1 private static MqttClient client;
 2 
 3 private static MqttConnectOptions connectOptions;
 4 
 5 private static String TOPIC;
 6 
 7 private static String clientId;
 8 
 9 private static final Logger LOG = LogManager.getLogger(MqttConnectionUtils.class);
10 static {
11 try {
12 clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
13 client = new MqttClient("tcp://****:1883",clientId);
14 connectOptions=new MqttConnectOptions();
15 connectOptions.setCleanSession(false);
16 connectOptions.setUserName("用戶名");
17 connectOptions.setPassword(PropertiesReader.getPassword().toCharArray());//密碼 connectOptions.setConnectionTimeout(10);
18 client.setTimeToWait(10000);
19 client.connect(connectOptions);
20 TOPIC = PropertiesReader.getTopic();
21 } catch (MqttException e) {
22 e.printStackTrace();
23 }
24 }
25 
26 
27 /**
28 * 發送數據
29 */
30 
31 public static void publish(String topic,String content) throws MqttException {
32 MqttMessage message=new MqttMessage(content.getBytes());
33 message.setQos(1);
34 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設置日期格式
35 LOG.info("發送時間========"+df.format(new Date()));
36 LOG.info(topic+"主題發送成功,內容:"+message);
37 client.publish(topic,message);
38 }
39 
40 /**
41 * 接收數據
42 */
43 
44 public static void start(UserInfoDao userInfoDao, ApplicationRedis applicationRedis,
45 JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol) throws MqttException {
46 MqttTopic topic = client.getTopic(TOPIC);
47 // setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
48 connectOptions.setWill(topic, "close".getBytes(), 2, true);
49 
50 // 訂閱消息
51 int[] Qos = { 1 };
52 String[] topic1 = { TOPIC };
53 client.subscribe(topic1, Qos);
54 // 設置回調
55 client.setCallback(new PushCallback(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol));
56 LOG.info("WIFI版啟動成功=================");
57 }
58 
59 
60 /**
61 * mqtt重連
62 */
63 public static void reConnect() {
64 while (true){
65 try {
66 if (null != client && !(client.isConnected())) {
67 Thread.sleep(1000);
68 clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
69 client.connect(connectOptions);
70 LOG.info("=======嘗試重新連接==============");
71 break;
72 }
73 } catch (MqttException | InterruptedException e) {
74 LOG.info("=======重新連接失敗:{}==============", e.toString());
75 continue;
76 }
77 }
78 
79 }

3.PushCallback

 1 import com.alibaba.fastjson.JSONObject;
 2 
 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 4 import org.eclipse.paho.client.mqttv3.MqttCallback;
 5 import org.eclipse.paho.client.mqttv3.MqttException;
 6 import org.eclipse.paho.client.mqttv3.MqttMessage;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 
 9 import java.io.UnsupportedEncodingException;
10 import java.util.*;
11 
12 public class PushCallback implements MqttCallback{
13 
14 public static PushCallback pushCallback;
15 
16 @Autowired
17 private UserInfoDao userInfoDao; // 數據庫CRUD接口
18 
19 @Autowired
20 private CommitUserInfo commitUserInfol;
21 
22 @Autowired
23 private ApplicationRedis applicationRedis; // redis 接口
24 @Autowired
25 JudgeEquipmentService judgeEquipmentService; // 查詢/修改---當前設備號碼狀態
26 
27 
28 
29 public PushCallback(UserInfoDao userInfoDao,ApplicationRedis applicationRedis,
30 JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol){
31 this.userInfoDao = userInfoDao;
32 this.applicationRedis = applicationRedis;
33 this.judgeEquipmentService = judgeEquipmentService;
34 this.commitUserInfol = commitUserInfol;
35 }
36 @Override
37 public void connectionLost(Throwable throwable) {
38 // 連接丟失后,一般在這里面進行重連
39 System.out.println("WIFI版======連接斷開,可以做重連");
40 MqttConnectionUtils.reConnect();
41 }
42 
43 @Override
44 public void messageArrived(String topic, MqttMessage message) throws Exception {
45 // subscribe后得到的消息會執行到這里面
46 String messages = new String(message.getPayload());
47 if(!messages.equals("close")){
48 System.out.println("接收消息主題 : " + topic);
49 System.out.println("接收消息Qos : " + message.getQos());
50 System.out.println("接收消息內容 : " + new String(message.getPayload()));
51 try {
52 
53 perform(topic,json);
54 
55 }catch (Exception e){
56 
57 }
58 
59 }
60 
61 
62 }
63 
64 @Override
65 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
66 System.out.println("deliveryComplete---------" +iMqttDeliveryToken.isComplete());
67 }
68 
69 public void perform(String topicP,JSONObject json) throws MqttException, UnsupportedEncodingException {
70 //你的業務模塊
71 }
72 
73 
74 }

 

 

 


 

轉載自:CSDN作者JAVA葉知秋

原文鏈接:https://blog.csdn.net/qq_37996327/article/details/104797737

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM