MQTT簡介:
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級協議,該協議構建於TCP/IP協議之上,MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務(這是特點)。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
MQTT應用范圍:
MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
分享給大家一套SpringBoot的視頻教程,這套視頻由淺入深,帶你體驗Spring Boot的極速開發過程,內容豐富,涵蓋了SpringBoot開發的方方面面,並且同步更新到Spring Boot 2.x系列的最新版本,讓你一次性拿下Spring Boot開發框架。
視頻教程在線觀看:
SpringBoot框架從入門到實踐視頻課程 - Spring Boot - 動力節點在線 (bjpowernode.com)
資料及源碼免費下載:
SpringBoot學習視頻_全套SpringBoot基礎教程免費下載 - 動力節點 (bjpowernode.com)
引入maven依賴
1 <dependency> 2 <groupId>org.eclipse.paho</groupId> 3 <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 4 <version>1.2.0</version> 5 </dependency>
1、Mqtt
1 @Component 2 public class Mqtt implements CommandLineRunner { 3 4 5 @Autowired 6 private CommitUserInfo commitUserInfol; 7 @Autowired 8 private UserInfoDao userInfoDao; // 數據庫CRUD接口 9 @Autowired 10 private ApplicationRedis applicationRedis; // redis 接口 11 @Autowired 12 JudgeEquipmentService judgeEquipmentService; // 查詢/修改---當前設備號碼狀態 13 14 @Override 15 public void run(String... args) throws Exception { 16 applicationRedis.test(); 17 MqttConnectionUtils.start(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol); 18 19 } 20 21 22 23 }
2.MqttConnectionUtils
1 private static MqttClient client; 2 3 private static MqttConnectOptions connectOptions; 4 5 private static String TOPIC; 6 7 private static String clientId; 8 9 private static final Logger LOG = LogManager.getLogger(MqttConnectionUtils.class); 10 static { 11 try { 12 clientId = UUID.randomUUID().toString().trim().replaceAll("-", ""); 13 client = new MqttClient("tcp://****:1883",clientId); 14 connectOptions=new MqttConnectOptions(); 15 connectOptions.setCleanSession(false); 16 connectOptions.setUserName("用戶名"); 17 connectOptions.setPassword(PropertiesReader.getPassword().toCharArray());//密碼 connectOptions.setConnectionTimeout(10); 18 client.setTimeToWait(10000); 19 client.connect(connectOptions); 20 TOPIC = PropertiesReader.getTopic(); 21 } catch (MqttException e) { 22 e.printStackTrace(); 23 } 24 } 25 26 27 /** 28 * 發送數據 29 */ 30 31 public static void publish(String topic,String content) throws MqttException { 32 MqttMessage message=new MqttMessage(content.getBytes()); 33 message.setQos(1); 34 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設置日期格式 35 LOG.info("發送時間========"+df.format(new Date())); 36 LOG.info(topic+"主題發送成功,內容:"+message); 37 client.publish(topic,message); 38 } 39 40 /** 41 * 接收數據 42 */ 43 44 public static void start(UserInfoDao userInfoDao, ApplicationRedis applicationRedis, 45 JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol) throws MqttException { 46 MqttTopic topic = client.getTopic(TOPIC); 47 // setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 48 connectOptions.setWill(topic, "close".getBytes(), 2, true); 49 50 // 訂閱消息 51 int[] Qos = { 1 }; 52 String[] topic1 = { TOPIC }; 53 client.subscribe(topic1, Qos); 54 // 設置回調 55 client.setCallback(new PushCallback(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol)); 56 LOG.info("WIFI版啟動成功================="); 57 } 58 59 60 /** 61 * mqtt重連 62 */ 63 public static void reConnect() { 64 while (true){ 65 try { 66 if (null != client && !(client.isConnected())) { 67 Thread.sleep(1000); 68 clientId = UUID.randomUUID().toString().trim().replaceAll("-", ""); 69 client.connect(connectOptions); 70 LOG.info("=======嘗試重新連接=============="); 71 break; 72 } 73 } catch (MqttException | InterruptedException e) { 74 LOG.info("=======重新連接失敗:{}==============", e.toString()); 75 continue; 76 } 77 } 78 79 }
3.PushCallback
1 import com.alibaba.fastjson.JSONObject; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttException; 6 import org.eclipse.paho.client.mqttv3.MqttMessage; 7 import org.springframework.beans.factory.annotation.Autowired; 8 9 import java.io.UnsupportedEncodingException; 10 import java.util.*; 11 12 public class PushCallback implements MqttCallback{ 13 14 public static PushCallback pushCallback; 15 16 @Autowired 17 private UserInfoDao userInfoDao; // 數據庫CRUD接口 18 19 @Autowired 20 private CommitUserInfo commitUserInfol; 21 22 @Autowired 23 private ApplicationRedis applicationRedis; // redis 接口 24 @Autowired 25 JudgeEquipmentService judgeEquipmentService; // 查詢/修改---當前設備號碼狀態 26 27 28 29 public PushCallback(UserInfoDao userInfoDao,ApplicationRedis applicationRedis, 30 JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol){ 31 this.userInfoDao = userInfoDao; 32 this.applicationRedis = applicationRedis; 33 this.judgeEquipmentService = judgeEquipmentService; 34 this.commitUserInfol = commitUserInfol; 35 } 36 @Override 37 public void connectionLost(Throwable throwable) { 38 // 連接丟失后,一般在這里面進行重連 39 System.out.println("WIFI版======連接斷開,可以做重連"); 40 MqttConnectionUtils.reConnect(); 41 } 42 43 @Override 44 public void messageArrived(String topic, MqttMessage message) throws Exception { 45 // subscribe后得到的消息會執行到這里面 46 String messages = new String(message.getPayload()); 47 if(!messages.equals("close")){ 48 System.out.println("接收消息主題 : " + topic); 49 System.out.println("接收消息Qos : " + message.getQos()); 50 System.out.println("接收消息內容 : " + new String(message.getPayload())); 51 try { 52 53 perform(topic,json); 54 55 }catch (Exception e){ 56 57 } 58 59 } 60 61 62 } 63 64 @Override 65 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 66 System.out.println("deliveryComplete---------" +iMqttDeliveryToken.isComplete()); 67 } 68 69 public void perform(String topicP,JSONObject json) throws MqttException, UnsupportedEncodingException { 70 //你的業務模塊 71 } 72 73 74 }
轉載自:CSDN作者JAVA葉知秋
原文鏈接:https://blog.csdn.net/qq_37996327/article/details/104797737