本篇記錄一下MQTT服務器,客戶端,JAVA客戶端的選擇開發與測試

MQTT服務端選擇與安裝過程;
MQTT客戶端測試工具安裝與測試;
MQTT JAVA客戶端的選擇與開發,測試


MQTT服務器選擇與安裝

目前主流的開源MQTT服務器主要是以下3個:
1) EMQX:github 4882 stars
2) Mosquitto:github 1645 stars
3) Apollo:ActiveMQ的升級版,github 109 stars

我們選擇EMQX作為本次MQTT的服務器,首先去EMQ官網去下載EMQX安裝版,截止目前最新版本已到V4.0.4

安裝步驟請參考官網給出的幫助文檔

按照官網操作無法啟動服務,卡在emqx start看過來
  這步很坑的,一定要需要系統先安裝好ERLANG開發環境,ERLANG跳轉地址,可能是因為EMQX是用ERLANG來解釋或開發的

安裝啟動完成后,如果是安裝在本機,可以在瀏覽器中打開http://127.0.0.1:18083,輸入默認用戶名“admin”和默認密碼“public”,能順利進入管理控制台說明EMQX服務器安裝完成。
微信截圖_20200316231950.png

MQTT客戶端安裝與測試

客戶端測試工具建議使用MQTTBox

MQTTBox有兩種使用的方式:一種為Chrome插件;另外一種為Windows程序安裝。

1) Chrome插件直接去Chrome插件市場搜索mqttbox即可(需要FQ)
2) Windows安裝包位置:點擊下載

連接MQTT服務器:

參考下圖設置連接服務器,協議選擇mqtt/tcp ; Host端口為1883;
其它參數暫不影響測試,有興趣自行研究。

微信截圖_20200316232820.png

成功連接服務器,Connected圖標會顯示為綠色
微信截圖_20200316233123.png

增加一個訂閱者

訂單topic為 "hello";Qos 為默認0

微信截圖_20200316233356.png

增加一個發布者

發布一個topic為"hello" 的一段消息{"hello":"world"},Qos默認為0

消息發布后,訂閱者窗口馬上會收到這段消息。至此,客戶端測試就完成。
微信截圖_20200316233529.png

MQTT JAVA客戶端開發與測試

JAVA客戶端選擇
EMQX SDK_TOOLS上羅列的幾乎所有開發語言下的開源客戶端工具。JAVA下的SDK目前主要有Eclipse Paho,Xenqtt,MeQanTT,mqtt-client。

因為Spring 的 Integration Endpoints去連接官方支持DEMO里面用的就是Eclipse Paho,所以本次JAVA客戶端我們選擇的也是Eclipse Paho .

POM引入

  <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

 

發布端開發示例

 1 import org.eclipse.paho.client.mqttv3.MqttClient;
 2 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 3 import org.eclipse.paho.client.mqttv3.MqttException;
 4 import org.eclipse.paho.client.mqttv3.MqttMessage;
 5 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 6 
 7 /**
 8  * 發布端未例
 9  */
10 public class PublishSample {
11     public static void main(String[] args) {
12 
13         String topic = "mqtt/okok";
14         String content = "hello 哈哈1";
15         int qos = 1;
16         String broker = "tcp://127.0.0.1:1883";
17         String userName = "test";
18         String password = "test";
19         String clientId = "pubClient1";
20         // 內存存儲
21         MemoryPersistence persistence = new MemoryPersistence();
22 
23         try {
24             // 創建客戶端
25             MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
26             // 創建鏈接參數
27             MqttConnectOptions connOpts = new MqttConnectOptions();
28             // 在重新啟動和重新連接時記住狀態
29             connOpts.setCleanSession(false);
30             // 設置連接的用戶名
31             connOpts.setUserName(userName);
32             connOpts.setPassword(password.toCharArray());
33             // 建立連接
34             sampleClient.connect(connOpts);
35             // 創建消息
36             MqttMessage message = new MqttMessage(content.getBytes());
37             // 設置消息的服務質量
38             message.setQos(qos);
39             // 發布消息
40             sampleClient.publish(topic, message);
41             // 斷開連接
42             sampleClient.disconnect();
43             // 關閉客戶端
44             sampleClient.close();
45         } catch (MqttException me) {
46             System.out.println("reason " + me.getReasonCode());
47             System.out.println("msg " + me.getMessage());
48             System.out.println("loc " + me.getLocalizedMessage());
49             System.out.println("cause " + me.getCause());
50             System.out.println("excep " + me);
51             me.printStackTrace();
52         }
53     }
54 }

 

訂閱端發布實例

 1 import org.eclipse.paho.client.mqttv3.*;
 2 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 3 
 4 /**
 5  * 訂閱端
 6  */
 7 public class SubscribeSample {
 8     public static void main(String[] args) throws MqttException {
 9         String HOST = "tcp://127.0.0.1:1883";
10         String TOPIC = "mqtt/okok";
11         int qos = 1;
12         String clientid = "subClient1";
13         String userName = "test";
14         String passWord = "test";
15         try {
16             // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
17             MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
18             // MQTT的連接設置
19             MqttConnectOptions options = new MqttConnectOptions();
20             // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
21             options.setCleanSession(true);
22             // 設置連接的用戶名
23             options.setUserName(userName);
24             // 設置連接的密碼
25             options.setPassword(passWord.toCharArray());
26             // 設置超時時間 單位為秒
27             options.setConnectionTimeout(10);
28             // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
29             options.setKeepAliveInterval(20);
30             // 設置回調函數
31             client.setCallback(new MqttCallback() {
32 
33                 @Override
34                 public void connectionLost(Throwable cause) {
35                     System.out.println("connectionLost");
36                 }
37 
38                 @Override
39                 public void messageArrived(String topic, MqttMessage message) throws Exception {
40                     System.out.println("topic:"+topic);
41                     System.out.println("Qos:"+message.getQos());
42                     System.out.println("message content:"+new String(message.getPayload()));
43 
44                 }
45 
46                 @Override
47                 public void deliveryComplete(IMqttDeliveryToken token) {
48                     System.out.println("deliveryComplete---------"+ token.isComplete());
49                 }
50 
51             });
52             client.connect(options);
53             //訂閱消息
54             client.subscribe(TOPIC, qos);
55         } catch (Exception e) {
56             e.printStackTrace();
57         }
58     }
59 }

 

下一篇:MQTT整合進Spring Mvc

更多參考網站:

1.EMQX幫助中心 https://docs.emqx.io/broker/latest/cn/
2.Spring MQTT Support https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/mqtt.html