開始研究JAVA及MQTT,寫下該文章方便使用參考
1、添加依賴 pom.xml
<!-- Spring Integration stream + MQTT support.
     No <version> is given for these; presumably managed by a parent BOM (TODO confirm). -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

<!-- Lombok: optional, so it is not propagated to dependents. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- fastjson: 1.x releases before 1.2.83 have known autoType deserialization
     vulnerabilities, so the patched 1.x release is used instead of 1.2.35. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
2、mqtt配置 application.yml
# Spring configuration
spring:
  # MQTT
  mqtt:
    # Broker connection address; separate multiple addresses with commas
    host: tcp://192.168.1.1:1883
    # Default client ID used when connecting to the broker
    clientId: mqtt_jlsId_
    # Default topics for message publishing; the actual topic can be given when calling the API
    topic: topic_a,topic_b,topic_c
    # User name
    username:
    # Password
    password:
    # Connection timeout
    timeout: 30
    # Keep-alive (heartbeat) interval
    keepalive: 30
3、獲取配置信息工具類
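當然,也可以使用 @Value 註解來獲取配置信息。
這裡直接使用網上查到的以下工具類來讀取。注意:它是用 java.util.Properties 逐行解析 application.yml,只按每行行首的鍵名(host、clientId 等)取值,並不理解 YAML 的層級結構,因此這些鍵名在整個文件中必須唯一,否則後出現的值會覆蓋先前的值。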
package com.projuct.junlaishun.mqtt;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Exposes the MQTT settings read from {@code /application.yml} on the classpath.
 *
 * <p>The values are loaded once, when this class is initialized. The file is parsed
 * line by line with {@link Properties}, not with a YAML parser: each line is treated as
 * a flat {@code key: value} pair and nesting is ignored, so the keys {@code host},
 * {@code clientId}, {@code username}, {@code password}, {@code topic}, {@code timeout}
 * and {@code keepalive} must each appear only once in the whole file.
 */
public class PropertiesUtil {

    /** Classpath location of the configuration file. */
    private static final String CONFIG_RESOURCE = "/application.yml";

    public static String MQTT_HOST;
    public static String MQTT_CLIENT_ID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static String MQTT_TOPIC;
    public static Integer MQTT_TIMEOUT;
    public static Integer MQTT_KEEP_ALIVE;

    /*
     * MQTT configuration: populate the public fields once at class-initialization time.
     */
    static {
        Properties properties = loadMqttProperties();
        MQTT_HOST = properties.getProperty("host");
        // A random suffix is appended so that every JVM start uses a different client ID.
        MQTT_CLIENT_ID = properties.getProperty("clientId") + Math.random();
        MQTT_USER_NAME = properties.getProperty("username");
        MQTT_PASSWORD = properties.getProperty("password");
        MQTT_TOPIC = properties.getProperty("topic");
        MQTT_TIMEOUT = requiredInt(properties, "timeout");
        MQTT_KEEP_ALIVE = requiredInt(properties, "keepalive");
    }

    /**
     * Reads {@value #CONFIG_RESOURCE} from the classpath as UTF-8 text.
     *
     * @return the key/value pairs found in the file
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws RuntimeException if the resource cannot be read
     */
    private static Properties loadMqttProperties() {
        Properties properties = new Properties();
        try (InputStream stream = PropertiesUtil.class.getResourceAsStream(CONFIG_RESOURCE)) {
            if (stream == null) {
                // Without this check Properties.load(null) fails with an opaque NullPointerException.
                throw new IllegalStateException(CONFIG_RESOURCE + " was not found on the classpath");
            }
            try (Reader reader = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
                properties.load(reader);
            }
            return properties;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the integer stored under {@code key}.
     *
     * <p>Surrounding whitespace is removed first, because {@link Properties} keeps trailing
     * blanks in values and {@link Integer#valueOf(String)} rejects them.
     *
     * @param properties the loaded configuration
     * @param key        the setting name
     * @return the parsed value
     * @throws IllegalStateException if the setting is missing, blank, or not an integer
     */
    private static Integer requiredInt(Properties properties, String key) {
        String raw = properties.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing required MQTT setting '" + key + "' in " + CONFIG_RESOURCE);
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "MQTT setting '" + key + "' in " + CONFIG_RESOURCE + " is not an integer: " + raw, e);
        }
    }
}
4、MQTT客戶端 MqttConsumer類
package com.projuct.junlaishun.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * MQTT client that is created and connected when the application starts, and that
 * offers static helpers for subscribing and publishing through the shared client.
 */
@Component
public class MqttConsumer implements ApplicationRunner {

    /** Shared client; stays {@code null} until {@link #getClient()} has created it. */
    private static MqttClient client;

    /** Startup hook: connects to the broker. */
    @Override
    public void run(ApplicationArguments args) {
        System.out.println("初始化並啟動mqtt......");
        this.connect();
    }

    /**
     * Connects to the MQTT broker: creates the client, builds the connect options,
     * derives the topic and QoS arrays from the configuration, then connects.
     * Any failure is reported on standard output and not rethrown.
     */
    private void connect() {
        try {
            // 1. create the client
            getClient();
            // 2. build the connect options
            MqttConnectOptions options = getOptions();
            String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
            // 3. one QoS entry per topic
            int[] qos = getQos(topic.length);
            // 4. install the callback and connect
            create(options, topic, qos);
        } catch (Exception e) {
            System.out.println("mqtt連接異常:" + e);
        }
    }

    /**
     * Step 1: creates the shared client if it does not exist yet, using the configured
     * host and client ID with in-memory persistence. A creation failure is reported on
     * standard output together with the exception.
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID,
                        new MemoryPersistence());
            }
            System.out.println("創建mqtt客戶端:");
        } catch (Exception e) {
            // The original literal swallowed the exception ("...\" + e:"); print it for real.
            System.out.println("創建mqtt客戶端異常:" + e);
        }
    }

    /**
     * Step 2: builds the connect options (credentials, timeout, keep-alive, session flag)
     * from {@link PropertiesUtil}.
     *
     * @return the populated options
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();

        // NOTE(review): a blank user name (as in the sample configuration) is treated as
        // "no credentials" and is not passed to the client - confirm this matches the broker setup.
        String userName = PropertiesUtil.MQTT_USER_NAME;
        if (userName != null && !userName.trim().isEmpty()) {
            options.setUserName(userName);
        }
        // Guard against a missing password entry instead of failing on toCharArray().
        String password = PropertiesUtil.MQTT_PASSWORD;
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        // connection timeout
        options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
        // keep-alive (heartbeat) interval
        options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
        // keep the session (do not clean it on connect)
        options.setCleanSession(false);
        System.out.println("--生成mqtt配置對象");
        return options;
    }

    /**
     * Step 3: builds the QoS array, using QoS 1 for every topic.
     *
     * <p>MQTT defines three delivery qualities:
     * <ul>
     *   <li>QoS 0, "at most once": delivery relies on the underlying TCP/IP network and
     *       messages may be lost. Suitable for e.g. sensor readings where another sample
     *       follows shortly.</li>
     *   <li>QoS 1, "at least once": delivery is assured, duplicates may occur.</li>
     *   <li>QoS 2, "exactly once": delivery is assured exactly once; suited to e.g. billing,
     *       where loss or duplication gives wrong results, at a higher resource cost.</li>
     * </ul>
     *
     * @param length number of topics
     * @return an array of {@code length} entries, each set to 1
     */
    public int[] getQos(int length) {
        int[] qos = new int[length];
        Arrays.fill(qos, 1);
        System.out.println("--設置消息發布質量");
        return qos;
    }

    /**
     * Step 4: installs the callback (which subscribes to the topics once connected) and
     * connects the client. Failures are reported on standard output and not rethrown.
     *
     * @param options connect options
     * @param topic   topics handed to the callback
     * @param qos     QoS values handed to the callback, parallel to {@code topic}
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            System.out.println("--添加回調處理類");
            client.connect(options);
        } catch (Exception e) {
            System.out.println("裝載實例或訂閱主題異常:" + e);
        }
    }

    /**
     * Subscribes the shared client to a single topic.
     *
     * @param topic topic filter
     * @param qos   requested QoS
     * @throws IllegalStateException if the client has not been created yet
     */
    public static void subscribe(String topic, int qos) {
        try {
            System.out.println("topic:" + topic);
            requireClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes a non-retained message with QoS 1.
     *
     * @param topic target topic
     * @param msg   message text
     */
    public static void publish(String topic, String msg) {
        publish(1, false, topic, msg);
    }

    /**
     * Publishes a message and waits for the delivery to complete.
     *
     * @param qos         QoS for the message
     * @param retained    whether the message is marked as retained
     * @param topic       target topic
     * @param pushMessage message text, sent as UTF-8 bytes
     * @throws NullPointerException  if {@code pushMessage} is {@code null}
     * @throws IllegalStateException if the client has not been created yet
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        Objects.requireNonNull(pushMessage, "pushMessage");

        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset: the platform default differs between machines.
        message.setPayload(pushMessage.getBytes(StandardCharsets.UTF_8));

        MqttTopic mTopic = requireClient().getTopic(topic);
        if (null == mTopic) {
            System.out.println("topic:" + topic + " 不存在");
            // Nothing to publish to; continuing would dereference null.
            return;
        }

        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
            // Report success only when the delivery actually completed (the check was inverted).
            if (token.isComplete()) {
                System.out.println("消息發送成功");
            }
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass of MqttException, so one handler covers both.
            e.printStackTrace();
        }
    }

    /**
     * Returns the shared client, failing with a clear message when it is not initialized.
     */
    private static MqttClient requireClient() {
        MqttClient current = client;
        if (current == null) {
            throw new IllegalStateException("MQTT client has not been created yet");
        }
        return current;
    }
}
5、mqtt回調處理類
package com.projuct.junlaishun.mqtt;

import org.eclipse.paho.client.mqttv3.*;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * MQTT callback handler: reconnects after a lost connection, logs incoming messages,
 * and subscribes to the configured topics once a connection is established.
 */
public class MqttConsumerCallback implements MqttCallbackExtended {

    private final MqttClient client;
    private final MqttConnectOptions options;
    private final String[] topic;
    private final int[] qos;

    /**
     * @param client  client this callback is attached to
     * @param options connect options the client was configured with
     * @param topic   topics to subscribe to after connecting
     * @param qos     QoS values, parallel to {@code topic}
     */
    public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * Called when the connection is lost; asks the client to reconnect.
     *
     * <p>The previous fallback branch called {@code client.connect(options)} only when the
     * client was {@code null} or already connected, so it could never succeed. Those two
     * cases are now handled explicitly.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("MQTT連接斷開,發起重連......");
        if (null == client) {
            System.out.println("MQTT客戶端為空,無法重連");
            return;
        }
        try {
            if (client.isConnected()) {
                // Already connected again; connecting a connected client would only fail.
                return;
            }
            client.reconnect();
            System.out.println("嘗試重新連接");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when delivery of a published message has completed; logs the configured topics.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + Arrays.toString(topic));
    }

    /**
     * Handles an incoming message: decodes the payload as UTF-8 text and logs it.
     * Business logic (for example storing the message in a database) belongs here.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            // Explicit charset: the platform default differs between machines.
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            System.out.println("收到topic:" + topic + " 消息:" + msg);
            System.out.println("收到消息后執行具體的業務邏輯操作,比如將消息存儲進數據庫");
        } catch (Exception e) {
            System.out.println("處理mqtt消息異常:" + e);
        }
    }

    /**
     * Called once a connection has been established; subscribes to the configured topics.
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (null == topic || null == qos) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.subscribe(topic, qos);
                System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
                System.out.println("--訂閱主題::" + Arrays.toString(topic));
            } else {
                System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
            }
        } catch (Exception e) {
            System.out.println("mqtt訂閱主題異常:" + e);
        }
    }
}
6、編寫測試方法MqttController
package com.projuct.junlaishun.mqtt; import com.alibaba.fastjson.JSONObject; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/mqtt") public class MqttController { /** * 推送消息 */ @ResponseBody //@GetMapping(value = "/push") @RequestMapping("sendMqttMessage") public JSONObject push(String topic, String message) { MqttConsumer.publish(topic, message); JSONObject res = new JSONObject(); res.put("code",0); res.put("msg","測試發布主題成功"); return res; } }
測試記錄截圖