一.MQTT介紹
1.簡介
MQTT(message queuing telemetry transport)是IBM開發的即時通訊協議,是一種發布/訂閱極其輕量級的消息傳輸協議,專門為網絡受限設備、低寬帶以及高延遲和不可靠的網絡而設計的。由於以上輕量級的特點,是實現智能家居的首選傳輸協議,相比於XMPP,更加輕量級而且占用寬帶低。
2.特點
a.由於采用發布/訂閱的消息模式,可以提供一對多的消息發布
b.輕量級,網絡開銷小
c.對負載內容會有屏蔽的消息傳輸
d.有三種消息發布質量(Qos):
qos=0:“至多一次”,這一級別會發生消息丟失或重復,消息發布依賴於TCP/IP網絡
qos=1:“至少一次”,確保消息到達,但消息重復可能會發生
qos=2:“只有一次”,確保消息到達一次
e.通知機制,異常中斷時會通知雙方
3.原理

MQTT協議有三種身份:發布者、代理、訂閱者,發布者和訂閱者都為客戶端,代理為服務器,同時消息的發布者也可以是訂閱者(為了節約內存和流量發布者和訂閱者一般都會定義在一起)。
MQTT傳輸的消息分為主題(Topic,可理解為消息的類型,訂閱者訂閱后,就會收到該主題的消息內容(payload))和負載(payload,可以理解為消息的內容)兩部分。
二.MQTT DEMO
windos下 下載並安裝mqtt 服務 運行並通過瀏覽器打開 http://127.0.0.1:18083/#/connections
MQTT 啟動說明:
進入bin目錄用cmd方式啟動 啟動命令emqx start, 關閉 emqx stop, 注意是 bin目錄下。 管理網頁http://192.168.128.8:18083/#/topics 賬戶 admin/public
jar包:主要是paho包
java demo 代碼
1.監控客戶端狀態:
package com.yt.mqtt.simple; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; /** * 監控客戶端狀態 * * @author YT * 2020-05-08 */ public class ClientStatus { public static void main(String[] args) throws MqttException { String HOST = "tcp://127.0.0.1:11883"; // String TOPIC = "mqtt/test"; // String TOPIC = "$SYS/broker/clients/connected"; String TOPIC ="$SYS/brokers/+/clients/#"; int qos = 1; String clientid = "subClient2"; String userName = "admin"; String passWord = "test"; try { // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 MqttConnectOptions options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調函數 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost"); } // public void messageArrived(String topic, MqttMessage message) throws Exception { // System.out.println("topic:"+topic); // System.out.println("Qos:"+message.getQos()); // System.out.println("message content:"+new String(message.getPayload())); // // } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------"+ token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); // System.out.println("topic:"+topic); // System.out.println("Qos:"+message.getQos()); System.out.println("Message content:"+msg); try { JSONObject jsonObject = JSON.parseObject(msg); String clientId = String.valueOf(jsonObject.get("clientid")); if (topic.endsWith("disconnected")) { System.out.println("客戶端已掉線:" +clientId); } else { System.out.println("客戶端已上線:"+clientId); } } catch (JSONException e) { System.out.println("JSON Format Parsing Exception : "+ msg); } } }); client.connect(options); //訂閱消息 client.subscribe(TOPIC, qos); } catch (Exception e) { e.printStackTrace(); } } }
消息發布類
package com.yt.mqtt.simple; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 消息發布 * * @author YT * 2020-05-08 */ public class PublishSample { public static void main(String[] args) { String topic = "mqtt/test"; String content = "hello 哈哈"; int qos = 1; String broker = "tcp://127.0.0.1:11883"; String userName = "admin"; String password = "test"; String clientId = "pubClient"; // 內存存儲 MemoryPersistence persistence = new MemoryPersistence(); try { // 創建客戶端 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); // 創建鏈接參數 MqttConnectOptions connOpts = new MqttConnectOptions(); // 在重新啟動和重新連接時記住狀態 connOpts.setCleanSession(false); // 設置連接的用戶名 connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); // 建立連接 sampleClient.connect(connOpts); // 創建消息 MqttMessage message = new MqttMessage(content.getBytes()); // 設置消息的服務質量 message.setQos(qos); // 發布消息 sampleClient.publish(topic, message); // 斷開連接 sampleClient.disconnect(); // 關閉客戶端 sampleClient.close(); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
消息訂閱類
package com.yt.mqtt.simple; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 消息訂閱 * * @param args * @throws MqttException */ public class SubscribeSample { // <dependencies> // <dependency> // <groupId>org.eclipse.paho</groupId> // <artifactId>org.eclipse.paho.client.mqttv3</artifactId> // <version>1.2.0</version> // </dependency> //</dependencies> public static void main(String[] args) throws MqttException { String HOST = "tcp://127.0.0.1:11883"; String TOPIC = "mqtt/test"; int qos = 1; String clientid = "subClient1"; String userName = "admin"; String passWord = "test"; try { // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 MqttConnectOptions options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調函數 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost"); } public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("topic:" + topic); System.out.println("Qos:" + message.getQos()); System.out.println("message content:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }); client.connect(options); // 訂閱消息 client.subscribe(TOPIC, qos); } catch (Exception e) { e.printStackTrace(); } } }
執行結果:
ClientStatus:
Message content:{"clean_start":true,"clientid":"subClient1","connack":0,"ipaddress":"127.0.0.1","keepalive":20,"proto_name":"MQTT","proto_ver":4,"ts":1588933626,"username":"admin"} 客戶端已上線:subClient1 Message content:{"clean_start":false,"clientid":"pubClient","connack":0,"ipaddress":"127.0.0.1","keepalive":60,"proto_name":"MQTT","proto_ver":4,"ts":1588933634,"username":"admin"} 客戶端已上線:pubClient Message content:{"clientid":"pubClient","username":"admin","reason":"normal","ts":1588933634} 客戶端已掉線:pubClient
SubscribeSample:
topic:mqtt/test Qos:1 message content:hello 哈哈