Broker 主從同步機制
一、主從同步概述
Broker 有2種角色:
-
- Master:主要用於處理生產者、消費者的請求和存儲數據。
- Slave:從 Master 同步所有數據到本地。具體體現在以下2個方面:
-
- Broker 服務高可用。一般生產環境會部署兩個主Broker節點和兩個從Broker(2m2s),一個 Master 宕機后,另一個 Master 可以接管工作;如果兩個 Master 都宕機,消費者可以通過連接 Slave 繼續消費。這樣可以保證服務的高可用。
- 提高服務性能。如果消費者從 Master Broker 拉取消息時,發現拉取消息的 offset 和 CommitLog 的物理 offset 相差太多,會轉向 Slave 拉取消息,這樣可以減輕 Master 的壓力,從而提高性能。
Broker 同步數據的方式有2種:
-
- 同步復制:指客戶端發送消息到 Master,Master 將消息同步復制到 Slave 的過程,可以通過設置參數 brokerRole=BrokerRole.SYNC_MASTER 來實現。這種消息配置的可靠性很強,但是效率比較低,適用於金融、在線教育等對消息有強可靠需求的場景。
- 異步復制:指客戶端發送消息到 Master,再由異步線程 HAService 異步同步到 Slave的過程,可以通過設置參數 brokerRole=BrokerRole.ASYNC_MASTER 來實現。這種消息配置的效率非常高,可靠性比同步復制差,適用於大部分業務場景。
Broker 主從同步數據有兩種:
-
- 配置數據:包含 Topic 配置、消息者位點信息、延遲消息位點信息、訂閱關系配置等。
- 消息數據:生產者發送的消息,保存在 CommitLog中,由 HAService 服務實時同步到 Slave Broker 中。所有實現類都在 org.apache.rocketmq.store.ha 包下。
Broker 主從同步的邏輯是通過 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\slave\SlaveSynchronize.syncAll() 方法實現的,該方法在 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.initialize()方法中初始化,每 60s 同步一次,並且同步周期不能修改。
二、主從同步流程
2.1 名詞解釋
2.2 配置數據同步流程
每種配置數據由一個繼承自 ConfigManager 的類來管理,配置數據包含4種類型:
-
- Topic 配置
- 消息位點
- 延遲位點
- 訂閱關系配置
Slave 如何從 Master 同步這些配置呢?先來看一下初始化服務的步驟:
第一步:Master Broker 在啟動時,初始化一個 BrokerOuterAPI,這個服務的功能包含 Broker 注冊到 Namesrv、Broker 從 Namesrv 解綁、獲取 Topic 配置信息、獲取消費位點信息、獲取延遲位點信息及訂閱關系等。
第二步:Slave Broker 在初始化 Controller 的定時任務時,會初始化 SlaveSynchronize 服務,每 60s 調用一次 SlaveSynchronize.syncAll() 方法。
第三步:syncAll() 方法依次調用 4 種配置數據(Topic配置、消費者位點、延遲位點、訂閱關系配置)的同步方法同步全量數據。
第四步:syncAll()中執行的4個方法都通過 Remoting 模塊同步調用 BrokerOuterAPI,並從 Master Broker 獲取數據,保存到 Slave 中。
第五步:Topic 配置和訂閱關系配置隨着保存內存信息的同事持久化到磁盤上;消費者位點通過 BrokerController 初始化定時任務持久化到磁盤上;延遲位點信息通過 ScheduleMessageService 定時將內存持久化到磁盤上,如下圖:
2.3 CommitLog 數據同步流程
CommitLog 的數據同步分為2種:
- 同步復制:生產者生產消息后,等待 Master Broker 將數據同步到 Slave Broker 后,再返回生產者數據存儲狀態。
- 異步復制:生產者生產消息后,不用等待 Master Broker 將數據同步到 Slave Broker,直接返回 Master 存儲結果。
詳解:
(1)同步復制:在CommitLog 消息存儲到 Page Cache 后,調用 CommitLog.handleHA() 方法處理同步復制,代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java,具體代碼如下:
1 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { #當前Master Broker 需要同步將消息“發送”到Slave. 3 HAService service = this.defaultMessageStore.getHaService(); 4 if (messageExt.isWaitStoreMsgOK()) { 5 // Determine whether to wait 6 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 7 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 8 service.putRequest(request); 9 service.getWaitNotifyObject().wakeupAll(); 10 PutMessageStatus replicaStatus = null; 11 try { 12 replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 13 TimeUnit.MILLISECONDS); 14 } catch (InterruptedException | ExecutionException | TimeoutException e) { 15 } 16 if (replicaStatus != PutMessageStatus.PUT_OK) { 17 log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " 18 + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); 19 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 20 } 21 } 22 // Slave problem 23 else { 24 // Tell the producer, slave not available 25 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 26 } 27 } 28 } 29 }
當 BrokerRole 配置為 SYNC_MASTER 時,表示當前 Master Broker 需要同步將消息 "發送" 到 Slave。根據 Master Broker CommitLog 的存儲結果構造一個 GroupCommitRequest 放入 HAService 中,再將 GroupCommitRequest 放入 GroupTransferService 服務中,等待 GroupTransferService 同步成功的鎖。如果同步成功那么 GroupCommitRequest 中的鎖會被喚醒,並設置 flushOK 為 True,表示生產者發送的消息被 Master Broker 和 Slave Broker 同時保存。
一個 Master Broker 可以配置多個 Slave Broker,當需要同步數據時,通過 service.getWaitNotifyObject().wakeupAll() 來喚醒全部的 Slave 同步。雖然多個 Slave 都同步了數據,但是一旦 Master Broker 不可用時,消費者只會從一個 Slave 中拉取消息,所以生產環境建議 SLave 不要配置太多。
Slave 在發送請求數據的 Request 時,會帶上 Slave 請求的位點 HAConnection.slaveRequestOffset,該值如果等於 -1(默認),則表示沒有 Slave 請求過位點數據。
ReadSocketService 后台服務不斷接受 Slave Broker 上報的 offset,每上報一次都通知 HAService.notifyTransferSome() 方法,判斷 Slave 同步的位點是否大於 Master 標記的已同步位點。如果大於則更新標記值,同時通知同步復制服務 GroupTransferService。GroupTransferService 掃描所有的同步請求,依次判斷哪些 GroupCommitRequest 的待同步復制的位點是比已同步位點小的,釋放 GroupCommitRequest 中的鎖,消息處理線程可以將消息存儲成功過的結果返回給生產者。
消息隊列文件(Consume Queue)和索引文件(Index File)為什么沒有同步給 Slave 呢?因為這兩個文件都可以在 Slave Broker 上追加 CommitLog 后由 ReputMessageService 進行創建,所以不需要同步。
(2)異步復制:Master Broker 啟動時會啟動 HAService.AcceptSocketService 服務,當監聽來自 Slave 的注冊請求時會創建一個 HAConnection,同時 HAConnection 會創建 ReadSocketService 和 WrieteSocketServcie 兩個服務並啟動,開始主從數據同步。
ReadSocketService 接收 Slave 同步數據請求,並將這些信息保存在 HAConnection 中。WriteSocketService 根據 HAConnection 中保存的 Slave 同步請求,從 CommitLog 中查詢數據,並發送給 Slave。