Mqtt的坑,真的坑


業務需要訂閱第三方主題,於是碰見了Mqtt接着就發現了 在 springboot 中整合mqtt 回調方法 messageArrived()獲取數據時如果直接操作數據庫會使連接斷開

頭大,網上查了好久,看了好多哥們的博客,也沒記下來都借鑒了哪些的。感謝各位大佬,啥也不說了,貼代碼記錄一下

@Slf4j
public class McListener {



private MqttClient client_sub;
private MqttConnectOptions options_sub;
// 用戶名
private final String username = "****";
// 密鑰
private final String password = "********";
// 主題
private final String footballTopic = "*******";
// 主題
private final String basketballTopic = "*******";

private final String broker = "*******";//你訂閱的地址

private final String clientId = "MqttClient_"+ System.currentTimeMillis();

private static final McListener listener = new McListener();

public static McListener getInstance() {
return listener;
}

private McListener() {

}
public void initMQTTListener() {
try {
//獲取配置信息
// HOST_MQ為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
client_sub = new MqttClient(broker, clientId, new MemoryPersistence());
// MQTT的連接設置
options_sub = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,設置為true表示每次連接到服務器都以新的身份連接
options_sub.setCleanSession(false);
// 設置連接的用戶名
options_sub.setUserName(username);
// 設置連接的密碼
options_sub.setPassword(password.toCharArray());
// 設置會話心跳時間 單位為秒 服務器會每隔90秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
options_sub.setKeepAliveInterval(90);
//訂閱topic定義
int[] Qos = new int[]{0, 0};
String[] topics = new String[]{footballTopic, basketballTopic};

// 設置回調
client_sub.setCallback(new PushCallback());
//連接mqtt服務器broker
client_sub.connect(options_sub);
//訂閱消息
client_sub.subscribe(topics, Qos);
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("init listener MQTT err info: {}", e.toString());
System.exit(-1);
}
}
public void reConnect() {
try {
if (null != client_sub && !(client_sub.isConnected())) {
client_sub.reconnect();
log.error("=======嘗試重新連接==============");
}
} catch (MqttException e) {
log.error("=======重新連接失敗:{}==============", e.toString());
}
}
/**
* 向某個主題發布消息 默認qos:1
*
* @param topic:發布的主題
* @param msg:發布的消息
*/
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
//mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = client_sub.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}

/**
* 向某個主題發布消息
*
* @param topic: 發布的主題
* @param msg: 發布的消息
* @param qos: 消息質量 Qos:0、1、2
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = client_sub.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}

/**
* 訂閱某一個主題 ,此方法默認的的Qos等級為:1
*
* @param topic 主題
*/
public void sub(String topic) throws MqttException {
client_sub.subscribe(topic);
}

/**
* 訂閱某一個主題,可攜帶Qos
*
* @param topic 所要訂閱的主題
* @param qos 消息質量:0、1、2
*/
public void sub(String topic, int qos) throws MqttException {
client_sub.subscribe(topic, qos);
}
/**
* 關閉MQTT連接
*/
public void close() throws MqttException {
client_sub.close();
client_sub.disconnect();
}
}

=====================================分割線======================================================
上面這些看着人畜無害,但是就在 messageArrived()里處理數據的時候就出問題了,上面的代碼沒有,是修改之后的,然后一並這個方法也挪走了

不嗶嗶了,貼代碼
PushCallback 實現 MqttCallback 划重點


@Slf4j
public class PushCallback implements MqttCallback {

@Autowired
private McListener mcListener;


public void connectionLost(Throwable cause) {
mcListener.reConnect();
}

public void deliveryComplete(IMqttDeliveryToken token) {
log.error("deliveryComplete---------" + token.isComplete());
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
log.debug("接收消息主題 : " + topic);
String res = new String(message.getPayload());
log.debug("接收消息內容 : " + res);
SendMsg.sendMsg.sendMsgtoCust(res,topic);

}




   =============================================重點這里=============================================
    所有相關數據庫一類的操作也好其他邏輯也好在這里寫

    sendMsgtoCust()就是你的方法名,里面寫你的邏輯


@Component
public static class SendMsg{


private static SendMsg sendMsg; //必須要重寫一下!!


@PostConstruct
public void init(){
sendMsg = this;
}
      
public void sendMsgtoCust(String msg,String topic){
try {

int type;
if (topic.equals("sports/football/match.v1")){
type = 1;
} else {
type = 2;
}

WsClient.send(msg,type);
// 判斷是否連接成功,未成功后面發送消息時會報錯

log.info("推送成功");
} catch (Exception e) {

e.printStackTrace();
}
}

}

}

調用的時候     SendMsg.sendMsg.sendMsgtoCust(res,topic);  嗯,就可以了,理論太菜,有空研究


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM