<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.3</version>
</dependency>
mqtt:
urls: tcp://127.0.0.1:1883
clientId: WebKn9wu
username: king
password: 12341234
completionTimeout: 30000
/**
* Mqtt 屬性配置類
*/
@Component
@Configuration
@Setter
@Getter
public class MqttConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.urls}")
private String urls;
@Value("${spring.mqtt.clientId}")
private String clientId;
/** automaticReconnection 是否自動重連 默認false */
private boolean automaticReconnection = true;
/**
* 連接超時
*/
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout;
}
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* Mqtt 創建連接 消息發布 訂閱
*/
@Component
public class MyMqttClient {
private static final Logger logger = LoggerFactory.getLogger(MyMqttClient.class);
@Autowired
private MqttConfig config;
@Autowired
private PushCallback pushCallback;
@Autowired
private MqttLogService mqttLogService;
private org.eclipse.paho.client.mqttv3.MqttClient client;
private static MemoryPersistence memoryPersistence = null;
/** options MQTT Cleint連接配置 */
private MqttConnectOptions options;
public org.eclipse.paho.client.mqttv3.MqttClient getClient() {
return client;
}
public void setClient(org.eclipse.paho.client.mqttv3.MqttClient client) {
this.client = client;
}
/**
* connect : 客戶端連接
*/
@PostConstruct
public void connect() {
try {
// 設置持久化方式
memoryPersistence = new MemoryPersistence();
this.client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getUrls(), config.getClientId(), memoryPersistence);
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,
// `重連清空session`
options.setCleanSession(true);
// 設置連接的用戶名
options.setUserName(config.getUsername());
// 設置連接的密碼
options.setPassword(config.getPassword().toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(config.getCompletionTimeout());
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線,但這個方法並沒有重連的機制
options.setKeepAliveInterval(30);
// 是否重連
options.setAutomaticReconnect(true);
// options.setMaxInflight(config.getMaxInflight());
this.setClient(client);
client.setCallback(pushCallback);
// 如果是new的回調 可以把 業務接口傳過去
// client.setCallback(new DefaultCallback(this, mqttLogService));
if (null !=client){
client.connect(options);
}
} catch (MqttException e) {
logger.error(e.getCause().getLocalizedMessage());
e.printStackTrace();
}
}
/**
* publish : 發布
*
* @param qos 連接方式
* @param retained 是否保留
* @param topic 主題
* @param pushMessage 消息體
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = this.getClient().getTopic(topic);
if (null == mTopic) {
logger.info("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱某個主題
* @param topic 主題
* @param qos 消息質量 0 1 2
*/
public void subscribe(String topic, int qos) {
if (null != client && client.isConnected()) {
logger.info("開始訂閱主題" + topic);
try {
this.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}else {
System.out.println("mqttClient is error");
}
}
/**
* 取消訂閱
* @param topic 要取消的主題
*/
public void cleanTopic(String topic) {
if (null != client && !client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
System.out.println("mqttClient is error");
}
}
/**關閉連接*/
@PreDestroy
public void closeConnect() {
//關閉存儲方式
if (null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
logger.error("memoryPersistence is null");
}
// 關閉連接
if (null != client) {
if (client.isConnected()) {
try {
client.disconnect();
client.close();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("關閉連接異常={}",e.getMessage());
}
} else {
logger.error("關閉連接異常={}","mqttClient is not connect");
}
} else {
logger.error("關閉連接異常={}","mqttClient is null");
}
}
}
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/***
* Mqtt 消息返回指定接口
*/
@Slf4j
@Component
public class PushCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);
@Autowired
private MyMqttClient client;
@Autowired
private MqttConfig mqttConfig;
// 這是保存數據的接口
@Autowired
private MqttLogService mqttLogService;
/**
* 丟失了對服務端的連接后觸發的回調
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
new Thread(() -> {
logger.info("[MQTT] 連接斷開,1s之后嘗試重連...", throwable);
boolean reconnecting = false;
for (int i = 1; i < 5; i++) {
try {
if (client.getClient().isConnected()) {
break;
}
Thread.sleep(1000);
boolean needReconnect = !mqttConfig.isAutomaticReconnection() && !reconnecting && !client.getClient().isConnected();
if (needReconnect) {
logger.info("開始重連...");
client.getClient().reconnect();
reconnecting = true;
}
} catch (Exception e) {
logger.info("mqtt重連失敗,繼續重連,reason:" + e.getMessage(), e);
continue;
}
}
}).start();
}
/**
* 應用收到消息后觸發的回調
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("=====接收消息主題 : " + topic);
String msg = new String(mqttMessage.getPayload());
log.info("=====接收消息內容 : " + msg);
String[] tsplit = topic.split("/");
// 調用 `MqttLogService` 內的存儲方法
}
/**
* 消息發布者消息發布完成產生的回調
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
/**
* 連接完成 觸發回調
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
// 訂閱間隔自動上報數據
client.subscribe("$king/devices/xxx/#", 0);
// 訂閱設備信息
client.subscribe("$king/devices/xxx/#", 0);
log.info("mqtt連接成功,客戶端ID:" + mqttConfig.getUrls());
// 對連接成功 保存日志
}
}
第二種Callback
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/***
* Mqtt 消息返回指定接口
*/
@Slf4j
@Component
public class DefaultCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(DefaultCallback.class);
private MqttLogService mqttLogService;
private MyMqttClient client;
public DefaultCallback(){
}
public DefaultCallback(MyMqttClient pushClient, MqttLogService service) {
super();
this.client = pushClient;
this.mqttLogService = service;
}
/**
* 丟失了對服務端的連接后觸發的回調
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
new Thread(() -> {
logger.info("[MQTT] 連接斷開,1s之后嘗試重連...", throwable);
MqttClient mqttClient = client.getClient();
boolean reconnecting = false;
for (int i = 1; i < 5; i++) {
try {
if (mqttClient.isConnected()) {
break;
}
Thread.sleep(1000);
boolean needReconnect = !client.config.isAutomaticReconnection() && !reconnecting && !mqttClient.isConnected();
if (needReconnect) {
logger.info("開始重連...");
mqttClient.reconnect();
reconnecting = true;
}
} catch (Exception e) {
logger.info("mqtt重連失敗,繼續重連,reason:" + e.getMessage(), e);
continue;
}
}
}).start();
}
/**
* 應用收到消息后觸發的回調
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("=====接收消息主題 : " + topic);
String msg = new String(mqttMessage.getPayload());
log.info("=====接收消息內容 : " + msg);
String[] tsplit = topic.split("/");
// mqttLogService.saveMqtt(mqttLog);
log.info("-----對數據進行持久化");
}
/**
* 消息發布者消息發布完成產生的回調
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
/**
* 連接完成 觸發回調
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
// 訂閱間隔自動上報數據
client.subscribe("$king/devices/xxx/#", 0);
// 訂閱設備信息
client.subscribe("$king/devices/xxx/#", 0);
log.info("mqtt連接成功,客戶端ID:" + client.config.getUrls());
// 對連接成功 保存日志
}
}