MQTT研究之EMQ:【eclipse的paho之java客戶端使用注意事項】


這里,簡單記錄一下自己在最近項目中遇到的paho的心得,這里也涵蓋EMQX的問題。

 

1. cleanSession

這個標識,是確保client和server之間是否持久化狀態的一個標志,不管是client還是server重啟還是連接斷掉。下面是來自paho客戶端源碼的注釋。

Sets whether the client and server should remember state across restarts and reconnects.
  • If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
    • Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
    • The server will treat a subscription as durable.
  • If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
    • Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
    • The server will treat a subscription as non-durable

1)。 這個標志位,設置為true,那么,當連接斷掉,例如,調用EMQX的接口踢掉連接,此時,即便重連上了(無論是通過autoconnect設置為true,還是在connectonLost這個回調函數里面配置上重連的邏輯),MQTT客戶端程序都是無法進行重新訂閱數據的。這個行為,說明session里面保存了會話所采用的topic信息。

2)。這個標志位,設置為true,autoconnect設置為false,在connectLost這個回調函數里面,自行實現重新連接的邏輯,並且再次針對相同的topic和qos進行訂閱的話,當連接被踢掉,這個時候,會重新連接上,並且也會訂閱上數據,只是會出現很奇怪的現象,CPU占用率比連接斷開前提高很多。 我的應用(訂閱到數據后,對數據進行相應的邏輯處理,正常情況下,一條數據大概1~5ms處理完)壓測環境下,連接未斷前,1.3W的並發,CPU空閑率在40%左右,重連之后,CPU的空閑率只有10%左右,這個地方是個大坑,目前我還沒有搞清楚到底是什么原因導致,若有人遇到類似問題同仁,請給我留言,告知可能的原因。(我的paho是1.2.0版本,EMQX:V3.1.1)

3)。這個標志位,設置為false,autoconnect設置為false,在connectLost這個回調函數里面,自行實現重連的邏輯,但是不對topic進行重新訂閱,即便連接斷掉,重新連接上的話,依然會進行連接斷開之前的業務邏輯,訂閱到所需的數據,CPU的負荷也不會變大,基本和斷開之前的狀態持平。

 

下面配上connectLost這個回調函數(MqttCallback接口的一個方法)相關代碼:

public void connectionLost(Throwable cause) {
        // 連接丟失后,一般在這里面進行重連
        System.out.println(">>>>>>>>>>>>>>>" + cause.getMessage());
        System.out.println("連接斷開,可以做重連");
        for (int i = 0; i < 3; i++) {
            if(reconnect()) {
                break;
            }else{
                try {
                    Thread.sleep(i * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private boolean reconnect() {
        try {
            mqttClient.connect(mqttConnectOptions);
            Thread.sleep(100);
            if( mqttClient.isConnected() ) {
                //mqttClient.subscribe(this.topic, 0);
                return true;
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  false;
    }

 

2.  EMQX的承壓能力

網上標榜EMQX單節點處理能力多么牛逼,100W連接毫無壓力,這個數值,其實呢,我覺得要仔細看測試場景,單單看連接數,其實沒有什么意義,要看生產者消費者都存在的情況下,還有數據流通這種場景,連接能力或者數據處理能力如何。 我不是說100W連接能力是虛構的,我是想說純粹的連接其實沒有多大的價值,因為EMQX是消息總線,只有連接,不存在數據流動,有多大意義呢?

還是接着我上面的應用壓測,我們團隊開發的一個規則引擎,1.6W的消息並發(4000設備,每個設備每秒4條消息,當然是程序模擬出來的),規則引擎4C16G的服務器2台,每台跑3個實例,共享訂閱兩個EMQX節點(EMQX是集群),EMQX服務器配置4C16G。結果跑不了多久時間(1個小時不到,有時半個小時),就會出現EMQX平凡踢掉消費者連接的情況。

2019-09-19 14:22:42.710 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52388 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:22:55.746 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60404 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:08.131 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52394 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:14.374 [error] 72a25aca01164c8c8b4cf48451c4e316@10.95.198.25:52456 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:41.686 [error] cd56963bfb4e4c0c8275abe9a24078de@10.95.198.26:60462 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:52.638 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60514 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:06.015 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52496 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:13.541 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52474 [Connection] Shutdown exceptionally due to message_queue_too_long

針對這個問題,我咨詢過青雲的EMQ團隊的工程師,也在Github上咨詢過EMQX的維護者,都反饋說是消費者處理速度太慢,emq的消息隊列消息堆積導致。現象如此,怎么解決呢,似乎只能添加消費者服務,或者降低消息壓力,EMQX能否提升性能呢?我覺得EMQX現在共享訂閱的能力不行,就這4000個連接投遞消息,1.6W的並發,4000個topic,采用共享訂閱的方式,性能感覺不是很好,是我們程序設計的有問題,還是EMQX共享訂閱性能真的有待提升?為什么這么說能,我們測試過非共享訂閱,就是明確訂閱某個指定topic。非共享訂閱情況下,相同的服務器上,比共享訂閱性能好很多很多(差不多一半)。。。(歡迎探討)

 

從EMQX的配置中,針對上面這種消息隊列太長的問題,emqx.conf的配置文件中有相關信息,參考下面這個錯誤找到了相關的配置參數,EMQX的官方參數解釋或者支持真心跟不上,沒有國外開源組織社區營造的好,這個需要努力。

2019-09-19 14:22:30.362 [error] f2ac199b0314449d822e150c8d51de93 crasher:
    initial call: emqx_session:init/1
    pid: <0.20141.1>
    registered_name: []
    exception exit: killed
      in function  emqx_session:handle_info/2 (src/emqx_session.erl, line 641)
      in call from gen_server:try_dispatch/4 (gen_server.erl, line 637)
      in call from gen_server:handle_msg/6 (gen_server.erl, line 711)
    ancestors: [emqx_session_sup,emqx_sm_sup,emqx_sup,<0.1386.0>]
    message_queue_len: 0
    messages: []
    links: [<0.1577.0>]
    dictionary: [{force_shutdown_policy,
                      #{max_heap_size => 838860800,message_queue_len => 8000}},
                  {deliver_stats,676193},
                  {'$logger_metadata$',
                      #{client_id => <<"f2ac199b0314449d822e150c8d51de93">>}}]
    trap_exit: true
    status: running
    heap_size: 6772
    stack_size: 27
    reductions: 69920965
  neighbours:

再看看emqx.conf的配置文件中,和這個queue相關的配置:

## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
## of queued MQTT messages of QoS 1 and 2.
##
## Numbers delimited by `|'. Zero or negative is to disable.
##
## Default:
##   - 8000|800MB on ARCH_64 system
##   - 1000|100MB on ARCH_32 sytem
## zone.external.force_shutdown_policy = 8000|800MB

有人會說,你可以將這里的消息數量調大點啊,沒錯,這個調一下是可以改善,但是不能根治問題,自己想想吧,大點最多也就是對消息速率波動的韌性加大了,但是不能解決持續生成高於所謂的消費慢這種情況下的問題。 EMQ方說辭其實,在我們的這個場景下,我是不那么認同的,為什么這么說呢, 我的規則引擎消費日志里面顯示,每條消息處理的時間並沒有變長,CPU的忙碌程度並沒有惡化, 添加共享訂閱實例變多,EMQX性能下降了,我覺得EMQX在共享訂閱變多的情況下,對消費者端投遞消息的速率或者效率下降了,但是呢,EMQX這個broker從消息生產者這邊接收消息的能力沒有改變,導致EMQX的消息隊列消息積壓,最終出現踢連接的policy得以執行。。。

 

還有一個問題,不知道細心的讀者有沒有發現,消費者這邊消息消費的好好的,消息積壓了,EMQX為何要把消費者的連接給踢掉呢,為何不是將生產者的連接踢掉呢?這個邏輯我覺得有點不是很好理解,本來消息就積壓了,是不是要加快消費才能緩解或者解除消息積壓的問題?讀者你們是如何理解的,也可以留言探討!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM