這里,簡單記錄一下自己在最近項目中遇到的paho的心得,這里也涵蓋EMQX的問題。
1. cleanSession
這個標識,是確保client和server之間是否持久化狀態的一個標志,不管是client還是server重啟還是連接斷掉。下面是來自paho客戶端源碼的注釋。
Sets whether the client and server should remember state across restarts and reconnects.
- If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
- Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
- The server will treat a subscription as durable.
- If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
- Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
- The server will treat a subscription as non-durable
1)。 這個標志位,設置為true,那么,當連接斷掉,例如,調用EMQX的接口踢掉連接,此時,即便重連上了(無論是通過autoconnect設置為true,還是在connectonLost這個回調函數里面配置上重連的邏輯),MQTT客戶端程序都是無法進行重新訂閱數據的。這個行為,說明session里面保存了會話所采用的topic信息。
2)。這個標志位,設置為true,autoconnect設置為false,在connectLost這個回調函數里面,自行實現重新連接的邏輯,並且再次針對相同的topic和qos進行訂閱的話,當連接被踢掉,這個時候,會重新連接上,並且也會訂閱上數據,只是會出現很奇怪的現象,CPU占用率比連接斷開前提高很多。 我的應用(訂閱到數據后,對數據進行相應的邏輯處理,正常情況下,一條數據大概1~5ms處理完)壓測環境下,連接未斷前,1.3W的並發,CPU空閑率在40%左右,重連之后,CPU的空閑率只有10%左右,這個地方是個大坑,目前我還沒有搞清楚到底是什么原因導致,若有人遇到類似問題同仁,請給我留言,告知可能的原因。(我的paho是1.2.0版本,EMQX:V3.1.1)
3)。這個標志位,設置為false,autoconnect設置為false,在connectLost這個回調函數里面,自行實現重連的邏輯,但是不對topic進行重新訂閱,即便連接斷掉,重新連接上的話,依然會進行連接斷開之前的業務邏輯,訂閱到所需的數據,CPU的負荷也不會變大,基本和斷開之前的狀態持平。
下面配上connectLost這個回調函數(MqttCallback接口的一個方法)相關代碼:
public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println(">>>>>>>>>>>>>>>" + cause.getMessage()); System.out.println("連接斷開,可以做重連"); for (int i = 0; i < 3; i++) { if(reconnect()) { break; }else{ try { Thread.sleep(i * 2); } catch (InterruptedException e) { e.printStackTrace(); } } } } private boolean reconnect() { try { mqttClient.connect(mqttConnectOptions); Thread.sleep(100); if( mqttClient.isConnected() ) { //mqttClient.subscribe(this.topic, 0); return true; } } catch (MqttException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; }
2. EMQX的承壓能力
網上標榜EMQX單節點處理能力多么牛逼,100W連接毫無壓力,這個數值,其實呢,我覺得要仔細看測試場景,單單看連接數,其實沒有什么意義,要看生產者消費者都存在的情況下,還有數據流通這種場景,連接能力或者數據處理能力如何。 我不是說100W連接能力是虛構的,我是想說純粹的連接其實沒有多大的價值,因為EMQX是消息總線,只有連接,不存在數據流動,有多大意義呢?
還是接着我上面的應用壓測,我們團隊開發的一個規則引擎,1.6W的消息並發(4000設備,每個設備每秒4條消息,當然是程序模擬出來的),規則引擎4C16G的服務器2台,每台跑3個實例,共享訂閱兩個EMQX節點(EMQX是集群),EMQX服務器配置4C16G。結果跑不了多久時間(1個小時不到,有時半個小時),就會出現EMQX平凡踢掉消費者連接的情況。
2019-09-19 14:22:42.710 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52388 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:22:55.746 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60404 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:23:08.131 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52394 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:23:14.374 [error] 72a25aca01164c8c8b4cf48451c4e316@10.95.198.25:52456 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:23:41.686 [error] cd56963bfb4e4c0c8275abe9a24078de@10.95.198.26:60462 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:23:52.638 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60514 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:24:06.015 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52496 [Connection] Shutdown exceptionally due to message_queue_too_long 2019-09-19 14:24:13.541 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52474 [Connection] Shutdown exceptionally due to message_queue_too_long
針對這個問題,我咨詢過青雲的EMQ團隊的工程師,也在Github上咨詢過EMQX的維護者,都反饋說是消費者處理速度太慢,emq的消息隊列消息堆積導致。現象如此,怎么解決呢,似乎只能添加消費者服務,或者降低消息壓力,EMQX能否提升性能呢?我覺得EMQX現在共享訂閱的能力不行,就這4000個連接投遞消息,1.6W的並發,4000個topic,采用共享訂閱的方式,性能感覺不是很好,是我們程序設計的有問題,還是EMQX共享訂閱性能真的有待提升?為什么這么說能,我們測試過非共享訂閱,就是明確訂閱某個指定topic。非共享訂閱情況下,相同的服務器上,比共享訂閱性能好很多很多(差不多一半)。。。(歡迎探討)
從EMQX的配置中,針對上面這種消息隊列太長的問題,emqx.conf的配置文件中有相關信息,參考下面這個錯誤找到了相關的配置參數,EMQX的官方參數解釋或者支持真心跟不上,沒有國外開源組織社區營造的好,這個需要努力。
2019-09-19 14:22:30.362 [error] f2ac199b0314449d822e150c8d51de93 crasher: initial call: emqx_session:init/1 pid: <0.20141.1> registered_name: [] exception exit: killed in function emqx_session:handle_info/2 (src/emqx_session.erl, line 641) in call from gen_server:try_dispatch/4 (gen_server.erl, line 637) in call from gen_server:handle_msg/6 (gen_server.erl, line 711) ancestors: [emqx_session_sup,emqx_sm_sup,emqx_sup,<0.1386.0>] message_queue_len: 0 messages: [] links: [<0.1577.0>] dictionary: [{force_shutdown_policy, #{max_heap_size => 838860800,message_queue_len => 8000}}, {deliver_stats,676193}, {'$logger_metadata$', #{client_id => <<"f2ac199b0314449d822e150c8d51de93">>}}] trap_exit: true status: running heap_size: 6772 stack_size: 27 reductions: 69920965 neighbours:
再看看emqx.conf的配置文件中,和這個queue相關的配置:
## Max message queue length and total heap size to force shutdown ## connection/session process. ## Message queue here is the Erlang process mailbox, but not the number ## of queued MQTT messages of QoS 1 and 2. ## ## Numbers delimited by `|'. Zero or negative is to disable. ## ## Default: ## - 8000|800MB on ARCH_64 system ## - 1000|100MB on ARCH_32 sytem ## zone.external.force_shutdown_policy = 8000|800MB
有人會說,你可以將這里的消息數量調大點啊,沒錯,這個調一下是可以改善,但是不能根治問題,自己想想吧,大點最多也就是對消息速率波動的韌性加大了,但是不能解決持續生成高於所謂的消費慢這種情況下的問題。 EMQ方說辭其實,在我們的這個場景下,我是不那么認同的,為什么這么說呢, 我的規則引擎消費日志里面顯示,每條消息處理的時間並沒有變長,CPU的忙碌程度並沒有惡化, 添加共享訂閱實例變多,EMQX性能下降了,我覺得EMQX在共享訂閱變多的情況下,對消費者端投遞消息的速率或者效率下降了,但是呢,EMQX這個broker從消息生產者這邊接收消息的能力沒有改變,導致EMQX的消息隊列消息積壓,最終出現踢連接的policy得以執行。。。
還有一個問題,不知道細心的讀者有沒有發現,消費者這邊消息消費的好好的,消息積壓了,EMQX為何要把消費者的連接給踢掉呢,為何不是將生產者的連接踢掉呢?這個邏輯我覺得有點不是很好理解,本來消息就積壓了,是不是要加快消費才能緩解或者解除消息積壓的問題?讀者你們是如何理解的,也可以留言探討!
