開始研究JAVA及MQTT,寫下該文章方便使用參考
1、添加依賴 pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.35</version>
</dependency>
2、mqtt配置 application.yml
# Spring配置
spring:
# MQTT
mqtt:
# 服務器連接地址,如果有多個,用逗號隔開
host: tcp://192.168.1.1:1883
# 連接服務器默認客戶端ID
clientId: mqtt_jlsId_
# 默認的消息推送主題,實際可在調用接口時指定
topic: topic_a,topic_b,topic_c
# 用戶名
username:
# 密碼
password:
# 連接超時
timeout: 30
# 心跳
keepalive: 30
3、獲取配置信息工具類
當然可以使用注解 @Value獲取配置信息
這里是網上查的資料直接使用以下類來獲取
package com.projuct.junlaishun.mqtt;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* 獲取配置信息
**/
public class PropertiesUtil {
public static String MQTT_HOST;
public static String MQTT_CLIENT_ID;
public static String MQTT_USER_NAME;
public static String MQTT_PASSWORD;
public static String MQTT_TOPIC;
public static Integer MQTT_TIMEOUT;
public static Integer MQTT_KEEP_ALIVE;
/**
* mqtt配置
*/
static {
Properties properties = loadMqttProperties();
MQTT_HOST = properties.getProperty("host");
MQTT_CLIENT_ID = properties.getProperty("clientId")+Math.random();
MQTT_USER_NAME = properties.getProperty("username");
MQTT_PASSWORD = properties.getProperty("password");
MQTT_TOPIC = properties.getProperty("topic");
MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));
MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));
}
private static Properties loadMqttProperties() {
InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");
Properties properties = new Properties();
try {
properties.load(inputstream);
return properties;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (inputstream != null) {
inputstream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
4、MQTT客戶端 MqttConsumer類
package com.projuct.junlaishun.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class MqttConsumer implements ApplicationRunner { private static MqttClient client; @Override public void run(ApplicationArguments args) { System.out.println("初始化並啟動mqtt......"); this.connect(); } /** * 連接mqtt服務器 */ private void connect() { try { // 1 創建客戶端 getClient(); // 2 設置配置 MqttConnectOptions options = getOptions(); String[] topic = PropertiesUtil.MQTT_TOPIC.split(","); // 3 消息發布質量 int[] qos = getQos(topic.length); // 4 最后設置 create(options, topic, qos); } catch (Exception e) { System.out.println("mqtt連接異常:" + e); } } /** * 創建客戶端 --- 1 --- */ public void getClient() { try { if (null == client) { client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence()); } System.out.println("創建mqtt客戶端:" ); } catch (Exception e) { System.out.println("創建mqtt客戶端異常:\" + e:" ); } } /** * 生成配置對象,用戶名,密碼等 --- 2 --- */ public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(PropertiesUtil.MQTT_USER_NAME); options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); // 設置超時時間 options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); // 設置會話心跳時間 options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); // 是否清除session options.setCleanSession(false); System.out.println("--生成mqtt配置對象"); return options; } /** * qos --- 3 --- */ public int[] getQos(int length) { int[] qos = new int[length]; for (int i = 0; i < length; i++) { /** * MQTT協議中有三種消息發布服務質量: * * QOS0: “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。 * QOS1: “至少一次”,確保消息到達,但消息重復可能會發生。 * QOS2: “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果,資源開銷大 */ qos[i] = 1; } System.out.println("--設置消息發布質量"); return qos; } /** * 裝載各種實例和訂閱主題 --- 4 --- */ public void create(MqttConnectOptions options, String[] topic, int[] qos) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); System.out.println("--添加回調處理類"); client.connect(options); } catch (Exception e) { System.out.println("裝載實例或訂閱主題異常:" + e); } } /** * 訂閱某個主題 * * @param topic * @param qos */ public static void subscribe(String topic, int qos) { try { System.out.println("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 發布,非持久化 * * qos根據文檔設置為1 * * @param topic * @param msg */ public static void publish(String topic, String msg) { publish(1, false, topic, msg); } /** * 發布 */ public static void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { System.out.println("topic:" + topic + " 不存在"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); if (!token.isComplete()) { System.out.println("消息發送成功"); } } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } }
5、mqtt回調處理類
package com.projuct.junlaishun.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import java.util.Arrays;
/**
* mqtt回調處理類
*/
public class MqttConsumerCallback implements MqttCallbackExtended {
private MqttClient client;
private MqttConnectOptions options;
private String[] topic;
private int[] qos;
public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
this.client = client;
this.options = options;
this.topic = topic;
this.qos = qos;
}
/**
* 斷開重連
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("MQTT連接斷開,發起重連......");
try {
if (null != client && !client.isConnected()) {
client.reconnect();
System.out.println("嘗試重新連接");
} else {
client.connect(options);
System.out.println("嘗試建立新連接");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 接收到消息調用令牌中調用
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + Arrays.toString(topic));
}
/**
* 消息處理
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String msg = new String(message.getPayload());
System.out.println("收到topic:" + topic + " 消息:" + msg);
System.out.println("收到消息后執行具體的業務邏輯操作,比如將消息存儲進數據庫");
} catch (Exception e) {
System.out.println("處理mqtt消息異常:" + e);
}
}
/**
* mqtt連接后訂閱主題
*/
@Override
public void connectComplete(boolean b, String s) {
try {
if (null != topic && null != qos) {
if (client.isConnected()) {
client.subscribe(topic, qos);
System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
System.out.println("--訂閱主題::" + Arrays.toString(topic));
} else {
System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
}
}
} catch (Exception e) {
System.out.println("mqtt訂閱主題異常:" + e);
}
}
}
6、編寫測試方法MqttController
package com.projuct.junlaishun.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/mqtt")
public class MqttController {
/**
* 推送消息
*/
@ResponseBody
//@GetMapping(value = "/push")
@RequestMapping("sendMqttMessage")
public JSONObject push(String topic,
String message) {
MqttConsumer.publish(topic, message);
JSONObject res = new JSONObject();
res.put("code",0);
res.put("msg","測試發布主題成功");
return res;
}
}
測試記錄截圖

