本篇記錄一下MQTT服務器,客戶端,JAVA客戶端的選擇開發與測試
MQTT服務端選擇與安裝過程;
MQTT客戶端測試工具安裝與測試;
MQTT JAVA客戶端的選擇與開發,測試
MQTT服務器選擇與安裝
目前主流的開源MQTT服務器主要是以下3個:
1) EMQX:github 4882 stars
2) Mosquitto:github 1645 stars
3) Apollo:ActiveMQ的升級版,github 109 stars
我們選擇EMQX作為本次MQTT的服務器,首先去EMQ官網去下載EMQX安裝版,截止目前最新版本已到V4.0.4
安裝步驟請參考官網給出的幫助文檔
按照官網操作無法啟動服務,卡在emqx start看過來
這步很坑的,一定要需要系統先安裝好ERLANG開發環境,ERLANG跳轉地址,可能是因為EMQX是用ERLANG來解釋或開發的
安裝啟動完成后,如果是安裝在本機,可以在瀏覽器中打開http://127.0.0.1:18083,輸入默認用戶名“admin”和默認密碼“public”,能順利進入管理控制台說明EMQX服務器安裝完成。
MQTT客戶端安裝與測試
客戶端測試工具建議使用MQTTBox
MQTTBox有兩種使用的方式:一種為Chrome插件;另外一種為Windows程序安裝。
1) Chrome插件直接去Chrome插件市場搜索mqttbox即可(需要FQ)
2) Windows安裝包位置:點擊下載
連接MQTT服務器:
參考下圖設置連接服務器,協議選擇mqtt/tcp ; Host端口為1883;
其它參數暫不影響測試,有興趣自行研究。

成功連接服務器,Connected圖標會顯示為綠色
增加一個訂閱者
訂單topic為 "hello";Qos 為默認0

增加一個發布者
發布一個topic為"hello" 的一段消息{"hello":"world"},Qos默認為0
消息發布后,訂閱者窗口馬上會收到這段消息。至此,客戶端測試就完成。
MQTT JAVA客戶端開發與測試
JAVA客戶端選擇
EMQX SDK_TOOLS上羅列的幾乎所有開發語言下的開源客戶端工具。JAVA下的SDK目前主要有Eclipse Paho,Xenqtt,MeQanTT,mqtt-client。
因為Spring 的 Integration Endpoints去連接官方支持DEMO里面用的就是Eclipse Paho,所以本次JAVA客戶端我們選擇的也是Eclipse Paho .
POM引入
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
發布端開發示例
1 import org.eclipse.paho.client.mqttv3.MqttClient; 2 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 3 import org.eclipse.paho.client.mqttv3.MqttException; 4 import org.eclipse.paho.client.mqttv3.MqttMessage; 5 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 6 7 /** 8 * 發布端未例 9 */ 10 public class PublishSample { 11 public static void main(String[] args) { 12 13 String topic = "mqtt/okok"; 14 String content = "hello 哈哈1"; 15 int qos = 1; 16 String broker = "tcp://127.0.0.1:1883"; 17 String userName = "test"; 18 String password = "test"; 19 String clientId = "pubClient1"; 20 // 內存存儲 21 MemoryPersistence persistence = new MemoryPersistence(); 22 23 try { 24 // 創建客戶端 25 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 26 // 創建鏈接參數 27 MqttConnectOptions connOpts = new MqttConnectOptions(); 28 // 在重新啟動和重新連接時記住狀態 29 connOpts.setCleanSession(false); 30 // 設置連接的用戶名 31 connOpts.setUserName(userName); 32 connOpts.setPassword(password.toCharArray()); 33 // 建立連接 34 sampleClient.connect(connOpts); 35 // 創建消息 36 MqttMessage message = new MqttMessage(content.getBytes()); 37 // 設置消息的服務質量 38 message.setQos(qos); 39 // 發布消息 40 sampleClient.publish(topic, message); 41 // 斷開連接 42 sampleClient.disconnect(); 43 // 關閉客戶端 44 sampleClient.close(); 45 } catch (MqttException me) { 46 System.out.println("reason " + me.getReasonCode()); 47 System.out.println("msg " + me.getMessage()); 48 System.out.println("loc " + me.getLocalizedMessage()); 49 System.out.println("cause " + me.getCause()); 50 System.out.println("excep " + me); 51 me.printStackTrace(); 52 } 53 } 54 }
訂閱端發布實例
1 import org.eclipse.paho.client.mqttv3.*; 2 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 3 4 /** 5 * 訂閱端 6 */ 7 public class SubscribeSample { 8 public static void main(String[] args) throws MqttException { 9 String HOST = "tcp://127.0.0.1:1883"; 10 String TOPIC = "mqtt/okok"; 11 int qos = 1; 12 String clientid = "subClient1"; 13 String userName = "test"; 14 String passWord = "test"; 15 try { 16 // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 17 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); 18 // MQTT的連接設置 19 MqttConnectOptions options = new MqttConnectOptions(); 20 // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 21 options.setCleanSession(true); 22 // 設置連接的用戶名 23 options.setUserName(userName); 24 // 設置連接的密碼 25 options.setPassword(passWord.toCharArray()); 26 // 設置超時時間 單位為秒 27 options.setConnectionTimeout(10); 28 // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 29 options.setKeepAliveInterval(20); 30 // 設置回調函數 31 client.setCallback(new MqttCallback() { 32 33 @Override 34 public void connectionLost(Throwable cause) { 35 System.out.println("connectionLost"); 36 } 37 38 @Override 39 public void messageArrived(String topic, MqttMessage message) throws Exception { 40 System.out.println("topic:"+topic); 41 System.out.println("Qos:"+message.getQos()); 42 System.out.println("message content:"+new String(message.getPayload())); 43 44 } 45 46 @Override 47 public void deliveryComplete(IMqttDeliveryToken token) { 48 System.out.println("deliveryComplete---------"+ token.isComplete()); 49 } 50 51 }); 52 client.connect(options); 53 //訂閱消息 54 client.subscribe(TOPIC, qos); 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } 58 } 59 }
下一篇:MQTT整合進Spring Mvc
更多參考網站:
1.EMQX幫助中心 https://docs.emqx.io/broker/latest/cn/
2.Spring MQTT Support https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/mqtt.html
