RocketMQ:(7) 主從同步(HA)機制


一、RocketMQ主從復制原理

  為了提高消息消費的高可用性,避免Broker發生單點故障引起存儲在Broker上的消息無法及時消費,RocketMQ引入了Broker主備機制,即消息消費到達主服務器后需要將消息同步到消息從服務器,如果主服務器Broker宕機后,消息消費者可以從從服務器拉取消息。

HAService 整體工作機制

RocketMQ HA 的實現原理如下 。
1 )主服務器啟動,並在特定端口上監聽從服務器的連接。
2 )從服務器主動連接主服務器,主服務器接收客戶端的連接,並建立相關TCP連接。
3 )從服務器主動向主服務器發送待拉取消息偏移量,主服務器解析請求並返回消息給從服務器 。
4 )從服務器保存消息並繼續發送新的消息同步請求 。

RocketMQ HA 7個核心類實現

1 ) HAService: RocketMQ 主從同步核心實現類 。
2 ) HAService$AcceptSocketService:HA Master 端監昕客戶端連接實現類。實現Master 端監聽Slave連接。
3 ) HAService$GroupTransferService:主從同步通知實現類。

  GroupTransferService的職責是負責當主從同步復制結束后通知由於等待 HA 同步結果而阻塞的消息發送者線程。判斷主從同步是否完成的依據是 Slave 中已成功復制的最大偏移量是否大於等於消息生產者發送消息后消息服務端返回下一條消息的起始偏移量,如果是則表示主從同步復制已經完成,喚醒消息發送線程,否則等待 1s 再次判斷,每一個任務在一批任務中循環判斷 5 次。消息發送者返回有兩種情況:等待超過5s或GroupTransferService通知主從復制完成 。

4 ) HAService$HAClient: HAClient是主從同步Slave端的核心實現類。

  Step1:Slave 服務器連接 Master 服務器。如果 socketChannel 為空, 則嘗試連接Master,建立到Master的TCP連接。在Broker啟動時,如果 Broker角色為SLAVE時將讀取Broker配置文件中的haMasterAddress屬性並更新 HAClient 的 masterAddrees,如果角色為 SLAVE 並且 haMasterAddress 為空,啟動並不會報錯,但不會執行主從同步復制,該方法最終返回是否成功連接上Master。
  Step2:判斷是否需要向Master反饋當前待拉取偏移量,Master與Slave的 HA 心跳發送間隔默認為5S。
  Step3:向 Master 服務器反饋拉取偏移量 。這里有兩重意義,對於 Slave 端來說,是發送下次待拉取消息偏移量,而對於 Master 服務端來說,既可以認為是 Slave 本次請求拉取的消息偏移量,也可以理解為 Slave 的消息同步 ACK 確認消息。
  Step4:進行事件選擇,其執行間隔為 1s 。
  Step5:處理網絡讀請求,即處理從 Master 服務器傳回的消息數據。

5 ) HAConnection:HA Master 服務端 HA 連接對象的封裝,與 Broker 從服務器的網絡讀寫實現類 。Master 服務器在收到從服務器的連接請求后,會將主從服務器的連接 SocketChannel 封裝成 HAConnection 對象,實現主服務器與從服務器的讀寫操作 。

6 ) HAConnection$ReadSocketService:HA Master 網絡讀實現類 。
7 ) HAConnection$WriteSocketServicce:HA Master 網絡寫實現類 。

 

二、RocketMQ 讀寫分離機制

  RocketMQ 根據 MessageQueue查找 Broker 地址的唯一依據是 brokerName,從 RocketMQ 的 Broker 組織結構中得知同一組 Broker ( M-S )服務器,它們的 brokerName 相同但 brokerId 不同,主服務器的 brokerId 為 0,從服務器的 brokerId 大於 0。
  消息消費拉取線程PullMessageService根據PullRequest請求從主服務器拉取消息后會返回下一次建議拉取的brokerId,消息消費者線程在收到消息后,會根據主服務器的建議拉取brokerId來更新pullFromWhichNodeTable,pullFromWhichNodeTable緩存表中存儲該消息隊列的brokerId。

消息服務端是根據何種規則來建議哪個消息消費隊列該從哪台 Broker 服務器上拉取消息呢?

1 ) maxOffsetPy:代表當前主服務器消息存儲文件最大偏移量。
2 ) maxPhyOffsetPulling:此次拉取消息最大偏移量 。
3 ) diff:對於 PullMessageService 線程來說,當前未被拉取到消息消費端的消息長度。
4 ) TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ 所在服務器總內存大小 。accessMessagelnMemoryMaxRatio 表示 RocketMQ 所能使用的最大內存比例,超過該內存,消息將被置換出內存;memory 表示 RocketMQ 消息常駐內存的大小,超過該大小,RocketMQ會將舊的消息置換回磁盤。
5 )如果 diff 大於 memory,表示當前需要拉取的消息已經超出了常駐內存的大小,表示主服務器繁忙,此時才建議從從服務器拉取。
  如果主服務器繁忙則建議下一次從從服務器拉取消息,如果一個 Master 擁有多台Slave服務器,參與消息拉取負載的從服務器只會是其中一個。

 

三、總結

1)RocketMQ 的 HA 機制,其核心實現是從服務器在啟動的時候主動向主服務器建立 TCP長連接,然后獲取服務器的 commitlog 最大偏移量,以此偏移量向主服務器主動拉取消息,主服務器根據偏移量,與自身 commitlog 文件的最大偏移量進行比較,如果大於從服務器的 commitlog 偏移量,主服務器將向從服務器返回一定數量的消息,該過程循環進行,達到主從服務器數據同步。

2)RocketMQ 讀寫分離與其他中間件的實現方式完全不同,RocketMQ 是消費者首先向主服務器發起拉取消息請求,然后主服務器返回一批消息,然后會根據主服務器負載壓力與主從同步情況,向消息消費者建議下次消息拉取是從主服務器還是從從服務器拉取。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM