rocketmq最佳實踐


最佳實踐


1 生產者

1.1 發送消息注意事項

1 Tags的使用

一個應用盡可能用一個Topic,而消息子類型則可以用tags來標識。tags可以由應用自由設置,只有生產者在發送消息設置了tags,消費方在訂閱消息時才可以利用tags通過broker做消息過濾:message.setTags("TagA")。

2 Keys的使用

每個消息在業務層面的唯一標識碼要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic、key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。

   // 訂單Id   
   String orderId = "20034568923546";   
   message.setKeys(orderId);   

3 日志的打印

​消息發送成功或者失敗要打印消息日志,務必要打印SendResult和key字段。send消息方法只要不拋異常,就代表發送成功。發送成功會有多個狀態,在sendResult里定義。以下對每個狀態進行說明:

  • SEND_OK

消息發送成功。要注意的是消息發送成功也不意味着它是可靠的。要確保不會丟失任何消息,還應啟用同步Master服務器或同步刷盤,即SYNC_MASTER或SYNC_FLUSH。

  • FLUSH_DISK_TIMEOUT

消息發送成功但是服務器刷盤超時。此時消息已經進入服務器隊列(內存),只有服務器宕機,消息才會丟失。消息存儲配置參數中可以設置刷盤方式和同步刷盤時間長度,如果Broker服務器設置了刷盤方式為同步刷盤,即FlushDiskType=SYNC_FLUSH(默認為異步刷盤方式),當Broker服務器未在同步刷盤時間內(默認為5s)完成刷盤,則將返回該狀態——刷盤超時。

  • FLUSH_SLAVE_TIMEOUT

消息發送成功,但是服務器同步到Slave時超時。此時消息已經進入服務器隊列,只有服務器宕機,消息才會丟失。如果Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master即ASYNC_MASTER),並且從Broker服務器未在同步刷盤時間(默認為5秒)內完成與主服務器的同步,則將返回該狀態——數據同步到Slave服務器超時。

  • SLAVE_NOT_AVAILABLE

消息發送成功,但是此時Slave不可用。如果Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master服務器即ASYNC_MASTER),但沒有配置slave Broker服務器,則將返回該狀態——無Slave服務器可用。

1.2 消息發送失敗處理方式

Producer的send方法本身支持內部重試,重試邏輯如下:

  • 至多重試2次(同步發送為2次,異步發送為0次)。
  • 如果發送失敗,則輪轉到下一個Broker。這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s。
  • 如果本身向broker發送消息產生超時異常,就不會再重試。

以上策略也是在一定程度上保證了消息可以發送成功。如果業務對消息可靠性要求比較高,建議應用增加相應的重試邏輯:比如調用send同步方法發送失敗時,則嘗試將消息存儲到db,然后由后台線程定時重試,確保消息一定到達Broker。

上述db重試方式為什么沒有集成到MQ客戶端內部做,而是要求應用自己去完成,主要基於以下幾點考慮:首先,MQ的客戶端設計為無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡。其次,如果MQ客戶端內部集成一個KV存儲模塊,那么數據只有同步落盤才能較可靠,而同步落盤本身性能開銷較大,所以通常會采用異步落盤,又由於應用關閉過程不受MQ運維人員控制,可能經常會發生 kill -9 這樣暴力方式關閉,造成數據沒有及時落盤而丟失。第三,Producer所在機器的可靠性較低,一般為虛擬機,不適合存儲重要數據。綜上,建議重試過程交由應用來控制。

1.3選擇oneway形式發送

通常消息的發送是這樣一個過程:

  • 客戶端發送請求到服務器
  • 服務器處理請求
  • 服務器向客戶端返回應答

所以,一次消息發送的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求並不高,例如日志收集類應用,此類應用可以采用oneway形式調用,oneway形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個操作系統系統調用的開銷,即將數據寫入客戶端的socket緩沖區,此過程耗時通常在微秒級。

2 消費者

2.1 消費過程冪等

RocketMQ無法避免消息重復(Exactly-Once),所以如果業務對消費重復非常敏感,務必要在業務層面進行去重處理。可以借助關系數據庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內容中的唯一標識字段,例如訂單Id等。在消費之前判斷唯一鍵是否在關系數據庫中存在。如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)

msgId一定是全局唯一標識符,但是實際使用中,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發、因客戶端重投機制導致的重復等),這種情況就需要使業務字段進行重復消費。

2.2 消費速度慢的處理方式

1 提高消費並行度

絕大部分消息消費行為都屬於 IO 密集型,即可能是操作數據庫,或者調用 RPC,這類消費行為的消費速度在於后端數據庫或者外系統的吞吐量,通過增加消費並行度,可以提高總的消費吞吐量,但是並行度增加到一定程度,反而會下降。所以,應用必須要設置合理的並行度。 如下有幾種修改消費並行度的方法:

  • 同一個 ConsumerGroup 下,通過增加 Consumer 實例數量來提高並行度(需要注意的是超過訂閱隊列數的 Consumer 實例無效)。可以通過加機器,或者在已有機器啟動多個進程的方式。
  • 提高單個 Consumer 的消費並行線程,通過修改參數 consumeThreadMin、consumeThreadMax實現。

2 批量方式消費

某些業務流程如果支持批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設置 consumer的 consumeMessageBatchMaxSize 返個參數,默認是 1,即一次只消費一條消息,例如設置為 N,那么每次消費的消息數小於等於 N。

3 跳過非重要消息

發生消息堆積時,如果消費速度一直追不上發送速度,如果業務對數據要求不高的話,可以選擇丟棄不重要的消息。例如,當某個隊列的消息數堆積到100000條以上,則嘗試丟棄部分或全部消息,這樣就可以快速追上發送消息的速度。示例代碼如下:

    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO 消息堆積情況的特殊處理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消費過程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }    

4 優化每條消息消費過程

舉例如下,某條消息的消費過程如下:

  • 根據消息從 DB 查詢【數據 1】
  • 根據消息從 DB 查詢【數據 2】
  • 復雜的業務計算
  • 向 DB 插入【數據 3】
  • 向 DB 插入【數據 4】

這條消息的消費過程中有4次與 DB的 交互,如果按照每次 5ms 計算,那么總共耗時 20ms,假設業務計算耗時 5ms,那么總過耗時 25ms,所以如果能把 4 次 DB 交互優化為 2 次,那么總耗時就可以優化到 15ms,即總體性能提高了 40%。所以應用如果對時延敏感的話,可以把DB部署在SSD硬盤,相比於SCSI磁盤,前者的RT會小很多。

2.3 消費打印日志

如果消息量較少,建議在消費入口方法打印消息,消費耗時等,方便后續排查問題。

   public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
        // TODO 正常消費過程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }   

如果能打印每條消息消費耗時,那么在排查消費慢等線上問題時,會更方便。

2.4 其他消費建議

1 關於消費者和訂閱

​第一件需要注意的事情是,不同的消費者組可以獨立的消費一些 topic,並且每個消費者組都有自己的消費偏移量,請確保同一組內的每個消費者訂閱信息保持一致。

2 關於有序消息

消費者將鎖定每個消息隊列,以確保他們被逐個消費,雖然這將會導致性能下降,但是當你關心消息順序的時候會很有用。我們不建議拋出異常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作為替代。

3 關於並發消費

顧名思義,消費者將並發消費這些消息,建議你使用它來獲得良好性能,我們不建議拋出異常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作為替代。

4 關於消費狀態Consume Status

對於並發的消費監聽器,你可以返回 RECONSUME_LATER 來通知消費者現在不能消費這條消息,並且希望可以稍后重新消費它。然后,你可以繼續消費其他消息。對於有序的消息監聽器,因為你關心它的順序,所以不能跳過消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。

5 關於Blocking

不建議阻塞監聽器,因為它會阻塞線程池,並最終可能會終止消費進程

6 關於線程數設置

消費者使用 ThreadPoolExecutor 在內部對消息進行消費,所以你可以通過設置 setConsumeThreadMin 或 setConsumeThreadMax 來改變它。

7 關於消費位點

當建立一個新的消費者組時,需要決定是否需要消費已經存在於 Broker 中的歷史消息CONSUME_FROM_LAST_OFFSET 將會忽略歷史消息,並消費之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 將會消費每個存在於 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 來消費在指定時間戳后產生的消息。

3 Broker

3.1 Broker 角色

​ Broker 角色分為 ASYNC_MASTER(異步主機)、SYNC_MASTER(同步主機)以及SLAVE(從機)。如果對消息的可靠性要求比較嚴格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果對消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是測試方便,則可以選擇僅ASYNC_MASTER或僅SYNC_MASTER的部署方式。

3.2 FlushDiskType

​ SYNC_FLUSH(同步刷新)相比於ASYNC_FLUSH(異步處理)會損失很多性能,但是也更可靠,所以需要根據實際的業務場景做好權衡。

3.3 Broker 配置

參數名 默認值 說明
listenPort 10911 接受客戶端連接的監聽端口
namesrvAddr null nameServer 地址
brokerIP1 網卡的 InetAddress 當前 broker 監聽的 IP
brokerIP2 跟 brokerIP1 一樣 存在主從 broker 時,如果在 broker 主節點上配置了 brokerIP2 屬性,broker 從節點會連接主節點配置的 brokerIP2 進行同步
brokerName null broker 的名稱
brokerClusterName DefaultCluster 本 broker 所屬的 Cluser 名稱
brokerId 0 broker id, 0 表示 master, 其他的正整數表示 slave
storePathCommitLog $HOME/store/commitlog/ 存儲 commit log 的路徑
storePathConsumerQueue $HOME/store/consumequeue/ 存儲 consume queue 的路徑
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) commit log 的映射文件大小
deleteWhen 04 在每天的什么時間刪除已經超過文件保留時間的 commit log
fileReservedTime 72 以小時計算的文件保留時間
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保證在收到確認生產者之前將消息刷盤。ASYNC_FLUSH 模式下的 broker 則利用刷盤一組消息的模式,可以取得更好的性能。

4 NameServer

​RocketMQ 中,Name Servers 被設計用來做簡單的路由管理。其職責包括:

  • Brokers 定期向每個名稱服務器注冊路由數據。
  • 名稱服務器為客戶端,包括生產者,消費者和命令行客戶端提供最新的路由信息。 ​

5 客戶端配置

​ 相對於RocketMQ的Broker集群,生產者和消費者都是客戶端。本小節主要描述生產者和消費者公共的行為配置。

5.1 客戶端尋址方式

RocketMQ可以令客戶端找到Name Server, 然后通過Name Server再找到Broker。如下所示有多種配置方式,優先級由高到低,高優先級會覆蓋低優先級。

  • 代碼中指定Name Server地址,多個namesrv地址之間用分號分割 
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Java啟動參數中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876  
  • 環境變量指定Name Server地址
export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876   
  • HTTP靜態服務器尋址(默認)

客戶端啟動后,會定時訪問一個靜態HTTP服務器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個URL的返回內容如下:

192.168.0.1:9876;192.168.0.2:9876   

客戶端默認每隔2分鍾訪問一次這個HTTP服務器,並更新本地的Name Server地址。URL已經在代碼中硬編碼,可通過修改/etc/hosts文件來改變要訪問的服務器,例如在/etc/hosts增加如下配置:

10.232.22.67    jmenv.taobao.net   

推薦使用HTTP靜態服務器尋址方式,好處是客戶端部署簡單,且Name Server集群可以熱升級。

5.2 客戶端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都繼承於ClientConfig類,ClientConfig為客戶端的公共配置類。客戶端的配置都是get、set形式,每個參數都可以用spring來配置,也可以在代碼中配置,例如namesrvAddr這個參數可以這樣配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他參數同理。

1 客戶端的公共配置

參數名 默認值 說明
namesrvAddr   Name Server地址列表,多個NameServer地址用分號隔開
clientIP 本機IP 客戶端本機IP地址,某些機器會發生無法識別客戶端IP地址情況,需要應用在代碼中強制指定
instanceName DEFAULT 客戶端實例名稱,客戶端創建的多個Producer、Consumer實際是共用一個內部實例(這個實例包含網絡連接、線程資源等)
clientCallbackExecutorThreads 4 通信層異步回調線程數
pollNameServerInteval 30000 輪詢Name Server間隔時間,單位毫秒
heartbeatBrokerInterval 30000 向Broker發送心跳間隔時間,單位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消費進度間隔時間,單位毫秒

2 Producer配置

參數名 默認值 說明
producerGroup DEFAULT_PRODUCER Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組
createTopicKey TBW102 在發送消息時,自動創建服務器不存在的topic,需要指定Key,該Key可用於配置發送消息所在topic的默認路由。
defaultTopicQueueNums 4 在發送消息,自動創建服務器不存在的topic時,默認創建的隊列數
sendMsgTimeout 10000 發送消息超時時間,單位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節
retryAnotherBrokerWhenNotStoreOK FALSE 如果發送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發送
retryTimesWhenSendFailed 2 如果消息發送失敗,最大重試次數,該參數只對同步發送模式起作用
maxMessageSize 4MB 客戶端限制的消息大小,超過報錯,同時服務端也會限制,所以需要跟服務端配合使用。
transactionCheckListener   事務消息回查監聽器,如果發送事務消息,必須設置
checkThreadPoolMinSize 1 Broker回查Producer事務狀態時,線程池最小線程數
checkThreadPoolMaxSize 1 Broker回查Producer事務狀態時,線程池最大線程數
checkRequestHoldMax 2000 Broker回查Producer事務狀態時,Producer本地緩沖請求隊列大小
RPCHook null 該參數是在Producer創建時傳入的,包含消息發送前的預處理和消息響應后的處理兩個接口,用戶可以在第一個接口中做一些安全控制或者其他操作。

3 PushConsumer配置

參數名 默認值 說明
consumerGroup DEFAULT_CONSUMER Consumer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組
messageModel CLUSTERING 消費模型支持集群消費和廣播消費兩種
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer啟動后,默認從上次消費的位置開始消費,這包含兩種情況:一種是上次消費的位置未過期,則消費從上次中止的位置進行;一種是上次消費位置已經過期,則從當前隊列第一條消息開始消費
consumeTimestamp 半個小時前 只有當consumeFromWhere值為CONSUME_FROM_TIMESTAMP時才起作用。
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法實現策略
subscription   訂閱關系
messageListener   消息監聽器
offsetStore   消費進度存儲
consumeThreadMin 10 消費線程池最小線程數
consumeThreadMax 20 消費線程池最大線程數
consumeConcurrentlyMaxSpan 2000 單隊列並行消費允許的最大跨度
pullThresholdForQueue 1000 拉消息本地隊列緩存消息最大數
pullInterval 0 拉消息間隔,由於是長輪詢,所以為0,但是如果應用為了流控,也可以設置大於0的值,單位毫秒
consumeMessageBatchMaxSize 1 批量消費,一次消費多少條消息
pullBatchSize 32 批量拉消息,一次最多拉多少條

4 PullConsumer配置

參數名 默認值 說明
consumerGroup DEFAULT_CONSUMER Consumer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組
brokerSuspendMaxTimeMillis 20000 長輪詢,Consumer拉消息請求在Broker掛起最長時間,單位毫秒
consumerTimeoutMillisWhenSuspend 30000 長輪詢,Consumer拉消息請求在Broker掛起超過指定時間,客戶端認為超時,單位毫秒
consumerPullTimeoutMillis 10000 非長輪詢,拉消息超時時間,單位毫秒
messageModel BROADCASTING 消息支持兩種模式:集群消費和廣播消費
messageQueueListener   監聽隊列變化
offsetStore   消費進度存儲
registerTopics   注冊的topic集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法實現策略

5 Message數據結構

字段名 默認值 說明
Topic null 必填,消息所屬topic的名稱
Body null 必填,消息體
Tags null 選填,消息標簽,方便服務器過濾使用。目前只支持每個消息設置一個tag
Keys null 選填,代表這條消息的業務關鍵詞,服務器會根據keys創建哈希索引,設置后,可以在Console系統根據Topic、Keys來查詢消息,由於是哈希索引,請盡可能保證key唯一,例如訂單號,商品Id等。
Flag 0 選填,完全由應用來設置,RocketMQ不做干預
DelayTimeLevel 0 選填,消息延時級別,0表示不延時,大於0會延時特定的時間才會被消費
WaitStoreMsgOK TRUE 選填,表示消息是否在服務器落盤后才返回應答。

6 系統配置

本小節主要介紹系統(JVM/OS)相關的配置。

6.1 JVM選項

​ 推薦使用最新發布的JDK 1.8版本。通過設置相同的Xms和Xmx值來防止JVM調整堆大小以獲得更好的性能。簡單的JVM配置如下所示: ​
​ ​-server -Xms8g -Xmx8g -Xmn4g ​ ​ ​
如果您不關心RocketMQ Broker的啟動時間,還有一種更好的選擇,就是通過“預觸摸”Java堆以確保在JVM初始化期間每個頁面都將被分配。那些不關心啟動時間的人可以啟用它: ​ -XX:+AlwaysPreTouch
禁用偏置鎖定可能會減少JVM暫停, ​ -XX:-UseBiasedLocking
至於垃圾回收,建議使用帶JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m   
-XX:G1ReservePercent=25 
-XX:InitiatingHeapOccupancyPercent=30

​ 這些GC選項看起來有點激進,但事實證明它在我們的生產環境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值設置太小,否則JVM將使用一個小的年輕代來實現這個目標,這將導致非常頻繁的minor GC,所以建議使用rolling GC日志文件:

-XX:+UseGCLogFileRotation   
-XX:NumberOfGCLogFiles=5 
-XX:GCLogFileSize=30m

如果寫入GC文件會增加代理的延遲,可以考慮將GC日志文件重定向到內存文件系統:

-Xloggc:/dev/shm/mq_gc_%p.log123   

6.2 Linux內核參數

​ os.sh腳本在bin文件夾中列出了許多內核參數,可以進行微小的更改然后用於生產用途。下面的參數需要注意,更多細節請參考/proc/sys/vm/*的文檔

  • vm.extra_free_kbytes,告訴VM在后台回收(kswapd)啟動的閾值與直接回收(通過分配進程)的閾值之間保留額外的可用內存。RocketMQ使用此參數來避免內存分配中的長延遲。(與具體內核版本相關)
  • vm.min_free_kbytes,如果將其設置為低於1024KB,將會巧妙的將系統破壞,並且系統在高負載下容易出現死鎖。
  • vm.max_map_count,限制一個進程可能具有的最大內存映射區域數。RocketMQ將使用mmap加載CommitLog和ConsumeQueue,因此建議將為此參數設置較大的值。(agressiveness --> aggressiveness)
  • vm.swappiness,定義內核交換內存頁面的積極程度。較高的值會增加攻擊性,較低的值會減少交換量。建議將值設置為10來避免交換延遲。
  • File descriptor limits,RocketMQ需要為文件(CommitLog和ConsumeQueue)和網絡連接打開文件描述符。我們建議設置文件描述符的值為655350。
  • Disk scheduler,RocketMQ建議使用I/O截止時間調度器,它試圖為請求提供有保證的延遲。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM