RocketMQ吐血總結


RocketMQ吐血總結

架構

 

概念模型

最基本的概念模型與擴展后段概念模型 

存儲模型

 

RocketMQ吐血總結

User Guide

  • RocketMQ是一款分布式消息中間件,最初是由阿里巴巴消息中間件團隊研發並大規模應用於生產系統,滿足線上海量消息堆積的需求, 在2016年底捐贈給Apache開源基金會成為孵化項目,經過不到一年時間正式成為了Apache頂級項目;早期阿里曾經基於ActiveMQ研發消息系統, 隨着業務消息的規模增大,瓶頸逐漸顯現,后來也考慮過Kafka,但因為在低延遲和高可靠性方面沒有選擇,最后才自主研發了RocketMQ, 各方面的性能都比目前已有的消息隊列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也經常被拿來對比;RocketMQ默認采用長輪詢的拉模式, 單機支持千萬級別的消息堆積,可以非常好的應用在海量消息系統中。
  • NameServer可以部署多個,相互之間獨立,其他角色同時向多個NameServer機器上報狀態信息,從而達到熱備份的目的。 NameServer本身是無狀態的,也就是說NameServer中的Broker、Topic等狀態信息不會持久存儲,都是由各個角色定時上報並 存儲到內存中的(NameServer支持配置參數的持久化,一般用不到)。
  • 為何不用ZooKeeper?ZooKeeper的功能很強大,包括自動Master選舉等,RocketMQ的架構設計決定了它不需要進行Master選舉, 用不到這些復雜的功能,只需要一個輕量級的元數據服務器就足夠了。值得注意的是,NameServer並沒有提供類似Zookeeper的watcher機制, 而是采用了每30s心跳機制。
  • 心跳機制
    • 單個Broker跟所有Namesrv保持心跳請求,心跳間隔為30秒,心跳請求中包括當前Broker所有的Topic信息。Namesrv會反查Broer的心跳信息, 如果某個Broker在2分鍾之內都沒有心跳,則認為該Broker下線,調整Topic跟Broker的對應關系。但此時Namesrv不會主動通知Producer、Consumer有Broker宕機。
    • Consumer跟Broker是長連接,會每隔30秒發心跳信息到Broker。Broker端每10秒檢查一次當前存活的Consumer,若發現某個Consumer 2分鍾內沒有心跳, 就斷開與該Consumer的連接,並且向該消費組的其他實例發送通知,觸發該消費者集群的負載均衡(rebalance)。
    • 生產者每30秒從Namesrv獲取Topic跟Broker的映射關系,更新到本地內存中。再跟Topic涉及的所有Broker建立長連接,每隔30秒發一次心跳。 在Broker端也會每10秒掃描一次當前注冊的Producer,如果發現某個Producer超過2分鍾都沒有發心跳,則斷開連接。
  • Namesrv壓力不會太大,平時主要開銷是在維持心跳和提供Topic-Broker的關系數據。但有一點需要注意,Broker向Namesrv發心跳時, 會帶上當前自己所負責的所有Topic信息,如果Topic個數太多(萬級別),會導致一次心跳中,就Topic的數據就幾十M,網絡情況差的話, 網絡傳輸失敗,心跳失敗,導致Namesrv誤認為Broker心跳失敗。
  • 每個主題可設置隊列個數,自動創建主題時默認4個,需要順序消費的消息發往同一隊列,比如同一訂單號相關的幾條需要順序消費的消息發往同一隊列, 順序消費的特點的是,不會有兩個消費者共同消費任一隊列,且當消費者數量小於隊列數時,消費者會消費多個隊列。至於消息重復,在消 費端處理。RocketMQ 4.3+支持事務消息,可用於分布式事務場景(最終一致性)。
  • 關於queueNums:
    • 客戶端自動創建,Math.min算法決定最多只會創建8個(BrokerConfig)隊列,若要超過8個,可通過控制台創建/修改,Topic配置保存在store/config/topics.json
    • 消費負載均衡的最小粒度是隊列,Consumer的數量應不大於隊列數
    • 讀寫隊列數(writeQueueNums/readQueueNums)是RocketMQ特有的概念,可通過console修改。當readQueueNums不等於writeQueueNums時,會有什么影響呢?

   
   
  
  
          
  1. topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
  2. if (topicRouteData != null) {
  3. for (QueueData data : topicRouteData.getQueueDatas()) {
  4. int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
  5. data.setReadQueueNums(queueNums);
  6. data.setWriteQueueNums(queueNums);
  7. }
  8. }
  • Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上。Producer的發送機制保證消息盡量平均分布到 所有隊列中,最終效果就是所有消息都平均落在每個Broker上。
  • RocketMQ的消息的存儲是由ConsumeQueue和CommitLog配合來完成的,ConsumeQueue中只存儲很少的數據,消息主體都是通過CommitLog來進行讀寫。 如果某個消息只在CommitLog中有數據,而ConsumeQueue中沒有,則消費者無法消費,RocketMQ的事務消息實現就利用了這一點。
    • CommitLog:是消息主體以及元數據的存儲主體,對CommitLog建立一個ConsumeQueue,每個ConsumeQueue對應一個(概念模型中的)MessageQueue,所以只要有 CommitLog在,ConsumeQueue即使數據丟失,仍然可以恢復出來。
    • ConsumeQueue:是一個消息的邏輯隊列,存儲了這個Queue在CommitLog中的起始offset,log大小和MessageTag的hashCode。每個Topic下的每個Queue都有一個對應的 ConsumeQueue文件,例如Topic中有三個隊列,每個隊列中的消息索引都會有一個編號,編號從0開始,往上遞增。並由此一個位點offset的概念,有了這個概念,就可以對 Consumer端的消費情況進行隊列定義。
  • RocketMQ的高性能在於順序寫盤(CommitLog)、零拷貝和跳躍讀(盡量命中PageCache),高可靠性在於刷盤和Master/Slave,另外NameServer 全部掛掉不影響已經運行的Broker,Producer,Consumer。
  • 發送消息負載均衡,且發送消息線程安全(可滿足多個實例死循環發消息),集群消費模式下消費者端負載均衡,這些特性加上上述的高性能讀寫, 共同造就了RocketMQ的高並發讀寫能力。
  • 刷盤和主從同步均為異步(默認)時,broker進程掛掉(例如重啟),消息依然不會丟失,因為broker shutdown時會執行persist。 當物理機器宕機時,才有消息丟失的風險。另外,master掛掉后,消費者從slave消費消息,但slave不能寫消息。
  • RocketMQ具有很好動態伸縮能力(非順序消息),伸縮性體現在Topic和Broker兩個維度。
    • Topic維度:假如一個Topic的消息量特別大,但集群水位壓力還是很低,就可以擴大該Topic的隊列數,Topic的隊列數跟發送、消費速度成正比。
    • Broker維度:如果集群水位很高了,需要擴容,直接加機器部署Broker就可以。Broker起來后向Namesrv注冊,Producer、Consumer通過Namesrv 發現新Broker,立即跟該Broker直連,收發消息。
  • Producer: 失敗默認重試2次;sync/async;ProducerGroup,在事務消息機制中,如果發送消息的producer在還未commit/rollback前掛掉了,broker會在一段時間后回查ProducerGroup里的其他實例,確認消息應該commit/rollback
  • Consumer: DefaultPushConsumer/DefaultPullConsumer,push也是用pull實現的,采用的是長輪詢方式;CLUSTERING模式下,一條消息只會被ConsumerGroup里的一個實例消費,但可以被多個不同的ConsumerGroup消費,BROADCASTING模式下,一條消息會被ConsumerGroup里的所有實例消費。
  • DefaultPushConsumer: Broker收到新消息請求后,如果隊列里沒有新消息,並不急於返回,通過一個循環不斷查看狀態,每次waitForRunning一段時間(5s),然后在check。當一直沒有新消息,第三次check時,等待時間超過suspendMaxTimeMills(15s),就返回空結果。在等待的過程中,Broker收到了新的消息后會直接調用notifyMessageArriving返回請求結果。“長輪詢”的核心是,Broker端Hold住(掛起)客戶端客戶端過來的請求一小段時間,在這個時間內有新消息到達,就利用現有的連接立刻返回消息給Consumer。“長輪詢”的主動權還是掌握在Consumer手中,Broker即使有大量消息積壓,也不會主動推送給Consumer。長輪詢方式的局限性,是在Hold住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數可控的場景中。
  • DefaultPullConsumer: 需要用戶自己處理遍歷MessageQueue、保存Offset,所以PullConsumer有更多的自主性和靈活性。
  • 對於集群模式的非順序消息,消費失敗默認重試16次,延遲等級為3~18。(messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h")
  • MQClientInstance是客戶端各種類型的Consumer和Producer的底層類,由它與NameServer和Broker打交道。如果創建Consumer或Producer 類型的時候不手動指定instanceName,進程中只會有一個MQClientInstance對象,即當一個Java程序需要連接多個MQ集群時,必須手動指定不同的instanceName。需要一提的是,當消費者(不同jvm實例)都在同一台物理機上時,若指定instanceName,消費負載均衡將失效(每個實例都將消費所有消息)。另外,在一個jvm里模擬集群消費時,必須指定不同的instanceName,否則啟動時會提示ConsumerGroup已存在。

More

原文總結:https://blog.csdn.net/javahongxi/article/details/84931747


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM