RocketMQ詳解(三)啟動運行原理


專題目錄

RocketMQ詳解(一)原理概覽

RocketMQ詳解(二)安裝使用詳解

RocketMQ詳解(三)啟動運行原理

RocketMQ詳解(四)核心設計原理

RocketMQ詳解(五)總結提高

 

引子

明白一個項目啟動時做了什么,更有利於理解整體運行原理。本節我們從rocketmq啟動流程,來探究運行原理。總體流程如下:

如上圖所示,rocketMQ啟動順序:nameserver->broker->producer->consumer。下面我們從4個模塊細化啟動流程。

一、nameserver啟動原理

 

1111

如上圖所示,nameServer名字服務作為rocketMQ的基於Topic路由的注冊中心,支持Broker的動態注冊與發現,nameServer啟動流程如下:

  1. 加載kv配置。
  2. 構建NettyRemotingServer初始化netty通信實例。
  3. 初始化遠程通信執行器。
  4. 注冊請求執行器。
  5. 定時輪訓清除宕機的broker。
  6. 注冊SSL證書變化監聽器。
  7. 啟動遠程通信服務端(netty)。
  8. 啟動SSL證書監聽服務。

其中,1-6步是初始化過程,7-8是啟動流程。

 

二、broker啟動原理

broker作為消息代理服務器,用於接收並存儲消息,等待被消費者拉取消費。broker啟動分兩大部分:

  • 初始化:構造createBrokerController-》初始化controller.initialize()
  • 啟動:controller.start()

如下圖:

2.1 broker初始化流程

  1. 載入各種配置文件(topic配置、消費偏移量、訂閱群組、消費過濾)。
  2. 初始化DefaultMessageStore消息存儲器,並load:載入CommitLog、ConsumeQueue、載入索引文件。
  3. 構造NettyRemotingServer netty遠程通信服務器(普通+vip)、9種執行器(sendMessageExecutor、pullMessageExecutor、replyMessageExecutor、queryMessageExecutor、adminBrokerExecutor、clientManageExecutor、heartbeatExecutor、endTransactionExecutor、consumerManageExecutor)
  4. registerProcessor():注冊處理器(使用上面9個執行器,分別注冊到普通遠程通信服務器、VIP遠程通信服務器)具體通信機制見 RocketMQ詳解(三)核心設計原理
  5. 初始化定時任務:BrokerStats當天數據統計、每5S持久化一次消費Offset、每10s持久化一次消費者filter、每3分鍾執行一次broker保護(默認不執行)、每1s打印水位日志(線程池待處理隊列)、每1分鍾打印commitLog分發滯后bytes、每2分鍾(請求http://jmenv.tbsite.net:8080/rocketmq/nsaddr)拉取一次nameserver地址、主節點每1分鍾(開啟DLeger主從切換)打印從滯后主offset。
  6. 注冊SSL證書變化監聽器。
  7. 初始化事務相關服務、監聽器。
  8. 初始化AccessValidator
  9. 初始化RPC請求鈎子RPCHook(遠程通信服務端+VIP遠程通信服務端),包含doBeforeRequest請求前、doAfterResponse請求后2個方法。

 

2.2 broker啟動流程

  1. messageStore啟動消息存儲服務:把生產者發送的消息commitLog轉存為consumerQueue邏輯消費隊列+indexFile索引文件,分別用於消費者消費消息+根據key查詢消息。
  2. remotingServer.start(): 啟動遠程通信服務端(netty)
  3. fastRemotingServer.start():啟動VIP遠程通信服務端(netty)
  4. 注冊SSL證書變化監聽器。
  5. brokerOuterAPI.start():remotingClient.start()啟動OuterAPI專用遠程通信客戶端
  6. pullRequestHoldService.start():開啟請求長輪訓服務,默認LongPolling=5S,5S后,執行消息寫入。
  7. clientHousekeepingService.start():啟動netty客戶端探活服務(生產者、消費者、過濾器),定時10s執行一次,超時2分鍾,移除channel。
  8. filterServerManager.start():啟動過濾服務。
  9. 定時任務:每30s向nameserver注冊所有broker。
  10. brokerFastFailure.start():啟動快速失敗,每10ms,清除過期請求。( sendThreadPoolQueue、pullThreadPoolQueue、heartbeatThreadPoolQueue、endTransactionThreadPoolQueue)
     

三、producer啟動原理

如上圖,生產者作為消息發送方,producer啟動流程如下:

  1. checkConfig()校驗配置
  2. MQClientInstance創建客戶端實例。
  3. registerProducer向broker注冊生產者
  4. MQClientInstance.start()啟動客戶端實例
  5. 服務狀態更新為RUNNING。
其中第四步, MQClientInstance 客戶端實例啟動流程如下:
  1. 獲取nameserver地址。
  2. remotingClient.start():啟動netty遠程客戶端。
  3. startScheduledTask():開啟定時任務
  4. pullMessageService.start() :開啟守護線程,執行拉取消息
  5. rebalanceService.start():啟動負載均衡器
  6. DefaultMQProducerImpl().start():啟動消息生產者
  7. 客戶端實例狀態更新為RUNNING
其中第三步, 定時任務包含:
  • 1)延遲10ms后,獲取每2分鍾更新一次nameserver地址
  • 2)延遲10ms后,每30s去nameserver獲取最新topic路由
  • 3)延遲1s后,每30s移除一次下線的broker+給所有broker發送心跳(版本號)
  • 4)延遲10s后,每5s持久化“消費offset”到broker/本地。
  • 5)延遲1ms后,每1分鍾,根據待消息數,動態調整消費者線程池核心數。(被閹割,目前是空)
 
 

四、consumer啟動原理

消費者啟動流程,見第三節的圖。

消費者啟動流程如下:

  1. checkConfig():校驗配置
  2. copySubscription:復制訂閱信息
  3. MQClientInstance:創建客戶端實例
  4. rebalanceImpl:創建消費端負載均衡
  5. pullAPIWrapper:創建pull請求包裝器
  6. offsetStore初始化偏移量:廣播模式:client存儲;集群模式:broker存儲
  7. consumeMessageService.start():初始化消費消息服務,並啟動。1.並發消費:延遲15m后,每過15分鍾清理一次超時未處理完消息 2.順序消息:延遲1s后,每20s鎖一次broker中mq隊列+消費者端ProcessQueue
  8. registerConsumer:向broker注冊消費者。
  9. MQClientInstance.start():啟動客戶端實例
  10. 服務狀態更新為:RUNNING

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM