EMQ X - EmqxBroker Window10環境安裝部署以及發布訂閱測試


                   

    在EMQ官網上拿張圖哈^_^;;

    本來就是在做物聯網項目嘛,MQTT協議肯定是必須要的嘛,但之前不是我來負責這一塊的,就沒有對MQTT以及EMQ有更多的理解,只是會用能用罷了,要是讓我說個1 2 3 ,肯定是不行的呀,最近有一個項目剛好我來對接開發,而且是MQTT協議的,由於受測試環境的限制,只能在本地筆記本上window上搭建一套EMQ環境來試試啦;

一、EMQX安裝;

    1、下載EMQ X Broker壓縮包;這里放了官網最新版的https://www.emqx.io/downloads/broker/v4.1.0/emqx-windows-v4.1.0.zip;下面文章中的使用的是 “emqx-windows10-v3.2.0.zip”;

    2、在筆記本本地目錄直接解壓就行啦;

    3、進入解壓后的目錄;C:\OldData_Win7\emqx-windows10-v3.2.0\emqx\bin;

    4、進入EMQX啟動命令目錄;按下shift鍵+鼠標右鍵,選擇 ‘open PowerShell window here’;

    5、啟動EMQX服務;輸入./emqx console;會彈出erlang的后台界面;

                                      輸入./emqx stop;即可停止服務啦;

                                      輸入./emqx start;即可輕松啟動服務啦;(推薦)

    6、啟動成功后,EMQX自帶的dashboard既可以訪問啦;http://localhost:18083/#/login 默認u/p:admin/public

 

     7、到這里我們的EMQ X Broker就已經可以使用了; 

二、java連接EMQX,並發起訂閱發布的操作;

    1、因為客戶端和服務端是都可以發布和訂閱的;所以我們就以發布和訂閱來區分吧;

         訂閱端sub:(啟動main方法就可以連接到我們本地的EMQX了,哦,我這里使用了共享訂閱的,random策略)

package com.daopin.project.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubMsg0 {
    private static final Logger logger = LoggerFactory.getLogger(SubMsg0.class);
    //private static String topic = "$share/group/test1";
    //private static String topic = "$queue/test1";
    //private static String topic = "test1";
    private static int qos = 0;
    private static String broker = "tcp://127.0.0.1:1883";
    private static String userName = "COAP";
    private static String passWord = "coap";
    private static String clientId = "nokia-mqtt-cluster-0";

    /**
     * 有三種消息發布服務質量:
     * <p>
     *   “至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
     * <p>
     *   “至少一次”,確保消息到達,但消息重復可能會發生。
     * <p>
     *   “只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。
     */
    private static MqttClient connect(String clientId) throws MqttException {
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();
        //String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"};
        // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,
        // 這里設置為true表示每次連接到服務器都以新的身份連接(客戶端再次上線時,將不再關心之前所有的訂閱關系以及離線消息)
        // 這里設置為false表示客戶端再次上線時,還需要處理之前的離線消息,而之前的訂閱關系也會持續生效
        connOpts.setCleanSession(true);
        connOpts.setUserName(userName);
        connOpts.setPassword(passWord.toCharArray());
        connOpts.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
        connOpts.setKeepAliveInterval(20);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10);
        //connOpts.setServerURIs(uris);
        //setWill方法(遺囑),如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
        //connOpts.setWill(topic, "close".getBytes(), 2, true);
        // MemoryPersistence設置clientid的保存形式,默認為以內存保存
        MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectionLost(Throwable throwable) {
                //在斷開連接時調用 連接丟失后,一般在這里面進行重連
                logger.warn("connectionLost ... ; We will do something ...");
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) {
                //接收已經預訂的發布
                logger.info("topic - > " + topic + ", mqttMessage - > " + mqttMessage);

            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                //接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用
                logger.warn("deliveryComplete ... ; We will do something ...");
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /*subscribe();*/ //連接成功,需要上傳客戶端所有的訂閱關系
                logger.warn("connectComplete ... ; We will do something ...");
            }
        });
        mqttClient.connect(connOpts);
        return mqttClient;
    }

    public static void sub(MqttClient mqttClient, String topic) throws MqttException {
        int[] Qos = {qos};
        String[] topics = {topic};
        mqttClient.subscribe(topics, Qos);
        logger.info("sub >> " + topic);
    }


    private static void runsub(String clientId, String topic) throws MqttException {
        MqttClient mqttClient = connect(clientId);
        if (mqttClient != null) {
            sub(mqttClient, topic);
        }
    }

    public static void main(String[] args) throws MqttException {
        runsub(clientId, "$queue/qdq02mzl6kvs/coap-server/uplinkMsg");
    }
}

    2、發布端pub; 

package com.daopin.project.mqtt;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.*;

public class PubMsg {
    private static final Logger logger = LoggerFactory.getLogger(PubMsg.class);
    private static int qos = 0;
    private static String broker = "tcp://127.0.0.1:1883";
    private static String userName = "COAP";
    private static String passWord = "coap";

    public static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    public static ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    private static MqttClient connect(String clientId, String userName,
                                      String password) throws MqttException {
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(20);
        //String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
        //connOpts.setServerURIs(uris);  //起到負載均衡和高可用的作用
        MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
        mqttClient.setCallback(new PushCallback("test"));
        mqttClient.connect(connOpts);
        return mqttClient;
    }

    private static void pub(MqttClient sampleClient, String msg, String topic)
            throws Exception {
        while (true){
            MqttMessage message = new MqttMessage((msg+" "+new Date()).getBytes());
            message.setQos(qos);
            message.setRetained(false);
            sampleClient.publish(topic, message);
            logger.info("pub-->" + message);
            Thread.sleep(3000L);
        }
    }

    private static void publish(String str, String clientId, String topic)  {
        try {
            MqttClient mqttClient = connect(clientId, userName, passWord);

            if (mqttClient != null) {
                pub(mqttClient, str, topic);
            }

            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        //singleThreadPool.execute( ()-> publish("message content", "868333030030008", "sl6o3lbk94xg/868333030030008/uplinkMsg/0/data"));
        publish("AAAA0000", "nokia-mqtt-server", "qdq02mzl6kvs/coap-server/uplinkMsg");
    }
}

class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);
    private String threadId;

    public PushCallback(String threadId) {
        this.threadId = threadId;
    }
    @Override
    public void connectionLost(Throwable cause) {
        //在斷開連接時調用 連接丟失后,一般在這里面進行重連
        logger.warn("connectionLost ... ; We will do something ...");
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //System.out.println("deliveryComplete---------" + token.isComplete());
        //接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用
        logger.warn("deliveryComplete ... ; We will do something ...");
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        System.out.println(threadId + " " + msg);
    }
}

 我們的訂閱端也已經收到這條pub的數據了。。。

 

    3、我們也可以在 dashboard來觀察到這兩個會話;

三、總結;

    1、共享訂閱          

              包含一個主題過濾器和訂閱選項,唯一的區別在於共享訂閱的主題過濾器格式必須是 $share/{ShareName}/{filter} 這種形式。這幾個的字段的含義分別是:

              $share 前綴表明這將是一個共享訂閱
              {ShareName} 是一個不包含 “/”, “+” 以及 “#” 的字符串。訂閱會話通過使用相同的 {ShareName} 表示共享同一個訂閱,匹配該訂閱的消息每次只會發布給其中一個會話
               {filter} 即非共享訂閱中的主題過濾器

                    

                共享訂閱使得訂閱端能夠負載均衡地消費消息,但 MQTT 協議並沒有規定 Server 應當使用什么負載均衡策略。作為參考,EMQ X 提供了 random, round_robin, sticky, hash 四種策略供用戶自行選擇。

    2、connetcLost方法 

              我們最好要在這里做一些事情,拋出異常或者打印日志,這樣你才能知道到底什么時候連接丟失了的

    3、 消息發布服務質量

           如何選擇QoS:
                 QoS 級別越高,流程越復雜,系統資源消耗越大。應用程序可以根據自己的網絡場景和業務需求,選擇合適的 QoS 級別,比如在同一個子網內部的服務間的消息交互往往選用 QoS 0;而通過互聯網的實時消息通信往往選用 QoS 1;QoS 2 使用的場景相對少一些,適合一些支付請求之類的要求較高的場景。

    4、CleanSession參數;若是共享訂閱模式下,需要將此字段配置為true,保證在訂閱端有一個掉線的情況下,可以清除掉session信息,這樣就不會收到訂閱的pub信息了,避免了數據丟失的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM