MQTT java 實現 案例 demo


一.MQTT介紹

1.簡介

MQTT(message queuing telemetry transport)是IBM開發的即時通訊協議,是一種發布/訂閱極其輕量級的消息傳輸協議,專門為網絡受限設備、低寬帶以及高延遲和不可靠的網絡而設計的。由於以上輕量級的特點,是實現智能家居的首選傳輸協議,相比於XMPP,更加輕量級而且占用寬帶低。

2.特點

a.由於采用發布/訂閱的消息模式,可以提供一對多的消息發布
b.輕量級,網絡開銷小
c.對負載內容會有屏蔽的消息傳輸
d.有三種消息發布質量(Qos):
qos=0:“至多一次”,這一級別會發生消息丟失或重復,消息發布依賴於TCP/IP網絡
qos=1:“至少一次”,確保消息到達,但消息重復可能會發生
qos=2:“只有一次”,確保消息到達一次
e.通知機制,異常中斷時會通知雙方

3.原理

 
 

MQTT協議有三種身份:發布者、代理、訂閱者,發布者和訂閱者都為客戶端,代理為服務器,同時消息的發布者也可以是訂閱者(為了節約內存和流量發布者和訂閱者一般都會定義在一起)。
MQTT傳輸的消息分為主題(Topic,可理解為消息的類型,訂閱者訂閱后,就會收到該主題的消息內容(payload))和負載(payload,可以理解為消息的內容)兩部分。

二.MQTT DEMO

windos下 下載並安裝mqtt 服務 運行並通過瀏覽器打開  http://127.0.0.1:18083/#/connections

 

MQTT 啟動說明:
進入bin目錄用cmd方式啟動 啟動命令emqx start, 關閉 emqx stop, 注意是 bin目錄下。 管理網頁http:
//192.168.128.8:18083/#/topics 賬戶 admin/public

 

 

jar包:主要是paho包

 

 

 

java demo 代碼 

1.監控客戶端狀態:

package com.yt.mqtt.simple;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

/**
 * 監控客戶端狀態
 * 
 * @author YT
 * 2020-05-08
 */
public class ClientStatus {
     public static void main(String[] args) throws MqttException {   
            String HOST = "tcp://127.0.0.1:11883";  
//            String TOPIC = "mqtt/test";
//            String TOPIC = "$SYS/broker/clients/connected";
            String TOPIC ="$SYS/brokers/+/clients/#";
            int qos = 1;
            String clientid = "subClient2";
            String userName = "admin";
            String passWord = "test";
            try {
                // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
                MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的連接設置
                MqttConnectOptions options = new MqttConnectOptions();
                // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
                options.setCleanSession(true);
                // 設置連接的用戶名
                options.setUserName(userName);
                // 設置連接的密碼
                options.setPassword(passWord.toCharArray());
                // 設置超時時間 單位為秒
                options.setConnectionTimeout(10);
                // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
                options.setKeepAliveInterval(20);
                // 設置回調函數
                client.setCallback(new MqttCallback() {

                    public void connectionLost(Throwable cause) {
                        System.out.println("connectionLost");
                    }

//                    public void messageArrived(String topic, MqttMessage message) throws Exception {
//                        System.out.println("topic:"+topic);
//                        System.out.println("Qos:"+message.getQos());
//                        System.out.println("message content:"+new String(message.getPayload()));
//                        
//                    }

                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("deliveryComplete---------"+ token.isComplete());
                    }
                    
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        String msg = new String(message.getPayload());
//                        System.out.println("topic:"+topic);
//                        System.out.println("Qos:"+message.getQos());
                        System.out.println("Message content:"+msg); 
                        try {
                            JSONObject jsonObject = JSON.parseObject(msg);
                            String clientId = String.valueOf(jsonObject.get("clientid"));
                            if (topic.endsWith("disconnected")) {
                                 System.out.println("客戶端已掉線:" +clientId);
                            } else {
                                  System.out.println("客戶端已上線:"+clientId);
                            }
                        } catch (JSONException e) {
                              System.out.println("JSON Format Parsing Exception : "+ msg);
                        }
                    }
                });
                client.connect(options);
                //訂閱消息
                client.subscribe(TOPIC, qos);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
}

消息發布類

package com.yt.mqtt.simple;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 消息發布
 * 
 * @author YT
 * 2020-05-08
 */
public class PublishSample {
    public static void main(String[] args) {

        String topic = "mqtt/test";
        String content = "hello 哈哈";
        int qos = 1;
        String broker = "tcp://127.0.0.1:11883";  
        String userName = "admin";
        String password = "test";
        String clientId = "pubClient";
        // 內存存儲
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            // 創建客戶端
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            // 創建鏈接參數
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // 在重新啟動和重新連接時記住狀態
            connOpts.setCleanSession(false);
            // 設置連接的用戶名
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
            // 建立連接
            sampleClient.connect(connOpts);
            // 創建消息
            MqttMessage message = new MqttMessage(content.getBytes());
            // 設置消息的服務質量
            message.setQos(qos);
            // 發布消息
            sampleClient.publish(topic, message);
            // 斷開連接
            sampleClient.disconnect();
            // 關閉客戶端
            sampleClient.close();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

消息訂閱類

package com.yt.mqtt.simple;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


/**
 * 消息訂閱
 * 
 * @param args
 * @throws MqttException
 */
public class SubscribeSample {
//    <dependencies>
//    <dependency>
//        <groupId>org.eclipse.paho</groupId>
//        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
//        <version>1.2.0</version>
//    </dependency>
//</dependencies>

    public static void main(String[] args) throws MqttException {
        String HOST = "tcp://127.0.0.1:11883";
        String TOPIC = "mqtt/test";
        int qos = 1;
        String clientid = "subClient1";
        String userName = "admin";
        String passWord = "test";
        try {
            // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
            MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的連接設置
            MqttConnectOptions options = new MqttConnectOptions();
            // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
            options.setCleanSession(true);
            // 設置連接的用戶名
            options.setUserName(userName);
            // 設置連接的密碼
            options.setPassword(passWord.toCharArray());
            // 設置超時時間 單位為秒
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
            options.setKeepAliveInterval(20);
            // 設置回調函數
            client.setCallback(new MqttCallback() {

                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost");
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("topic:" + topic);
                    System.out.println("Qos:" + message.getQos());
                    System.out.println("message content:" + new String(message.getPayload()));

                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------" + token.isComplete());
                }

            });
            client.connect(options);
            // 訂閱消息
            client.subscribe(TOPIC, qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

執行結果:

ClientStatus:

Message content:{"clean_start":true,"clientid":"subClient1","connack":0,"ipaddress":"127.0.0.1","keepalive":20,"proto_name":"MQTT","proto_ver":4,"ts":1588933626,"username":"admin"}
客戶端已上線:subClient1
Message content:{"clean_start":false,"clientid":"pubClient","connack":0,"ipaddress":"127.0.0.1","keepalive":60,"proto_name":"MQTT","proto_ver":4,"ts":1588933634,"username":"admin"}
客戶端已上線:pubClient
Message content:{"clientid":"pubClient","username":"admin","reason":"normal","ts":1588933634}
客戶端已掉線:pubClient

SubscribeSample:

topic:mqtt/test
Qos:1
message content:hello 哈哈

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM