MQTT的使用介紹


之前項目中使用到了mqtt,剛開始用着用着都不知道是干啥的,后來百度了一下:

    • MQTT 
      MQTT基於訂閱者模型架構,客戶端如果互相通信,必須在同一訂閱主題下,即都訂閱了同一個topic,客戶端之間是沒辦法直接通訊的。訂閱模型顯而易見的好處是群發消息的話只需要發布到topic,所有訂閱了這個topic的客戶端就可以接收到消息了。 
      發送消息必須發送到某個topic,重點說明的是不管客戶端是否訂閱了該topic都可以向topic發送了消息,還有如果客戶端訂閱了該主題,那么自己發送的消息也會接收到。

    • MQTT特點

      • 使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。這一點很類似於XMPP,但是MQTT的信息冗余遠小於XMPP
      • 對負載內容屏蔽的消息傳輸 
        使用TCP/IP提供網絡連接。主流的MQTT是基於TCP連接進行數據推送的,但是同樣有基於UDP的版本,叫做MQTT-SN。這兩種版本由於基於不同的連接方式,優缺點自然也就各有不同了
      • 三種消息傳輸方式QoS: 
        • 0代表“至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
        • 1代表“至少一次”,確保消息到達,但消息重復可能會發生。
        • 2代表“只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。 (備注:由於服務端采用Mosca實現,Mosca目前只支持到QoS 1)

      如果發送的是臨時的消息,例如給某topic所有在線的設備發送一條消息,丟失的話也無所謂,0就可以了(客戶端登錄的時候要指明支持的QoS級別,同時發送消息的時候也要指明這條消息支持的QoS級別),如果需要客戶端保證能接收消息,需要指定QoS為1,如果同時需要加入客戶端不在線也要能接收到消息,那么客戶端登錄的時候要指定session的有效性,接收離線消息需要指定服務端要保留客戶端的session狀態。

 

使用流程:

1.導入pom文件:

   <!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>

2.在web.xml配置監聽器:
<!-- 配置Mqtt監聽器監聽器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<listener>
<listener-class>com.hx.lease.mqtt.MQTTServletContextListener</listener-class>
</listener>
3.編寫mqtt的類:
public class MQTTServletContextListener implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
System.out.println("--------------------------contextInitialized-------------------------------");
/**
* 開啟8個主題的MQTT線程
*
* @throws ServletException
*/
ClientMQTT.openThread();
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
System.out.println("--------------------------------MQTTServlet.destroy()-------------------------");
}
}
//線程類:
public class ClientMQTT extends Thread {

public static final String HOST = "tcp://xxxxxx:1883";
private String TOPIC1 = ""; //訂閱的主題為:連接
private static final String clientid = UUID.randomUUID().toString().replace("-", "");
private MqttClient client;
private MqttConnectOptions options;
private String userName = "saldjasl14kfc15jl985sjfi"; //非必須
private String passWord = "AADjv134,75sda"; //非必須


@Override
public void run() {
System.out.println(String.format("------------------------%s主題線程開啟--------------------------------------", TOPIC1));
try {
// host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設置
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(false);
// 設置連接的用戶名
options.setUserName(userName);
// 設置連接的密碼
options.setPassword(passWord.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(100);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 設置回調
client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC1);
//setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
//遺囑 options.setWill(topic, "close".getBytes(), 2, true);
client.connect(options);
//訂閱消息
int[] Qos = {1};
String[] topic1 = {TOPIC1};
client.subscribe(topic1, Qos);
} catch (Exception e) {
System.out.println(e.getMessage());
//e.printStackTrace();
}
}

public static void openThread() {
System.out.println("--------------------------MQTTServlet.init()--------------------");
/**
* 將主題添加到集合中
*/
List<String> topicList = new ArrayList<>();
topicList.add("connect");//訂閱主題
topicList.add("disconnect");//斷開連接
topicList.add("abnornaldisconnection");//非正常掉線連接
topicList.add("heartbeat");//心跳包
topicList.add("coinoperated");//
topicList.add("catchdoll");//抓到娃娃后上傳
topicList.add("upset");//
topicList.add("bind");

/**
* 循環遍歷主題集合
* 每個主題開啟一個線程
*/
for (String topic : topicList) {
ClientMQTT clientMqttThread = new ClientMQTT();
clientMqttThread.setTOPIC1(topic);
clientMqttThread.start();
}
}

public String getTOPIC1() {
return TOPIC1;
}

public void setTOPIC1(String TOPIC1) {
this.TOPIC1 = TOPIC1;
}

}
//創建主題添加到集合中線程類
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM