RocketMQ's master and slave have always used plain Java NIO for data synchronization. The key entry points are listed below; a minimal NIO sketch of the exchange follows the list:
master
master listens on the HA port
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept
master accepts the slave connection
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
master reads the maxOffset reported by the slave
org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
master transfers data to the slave
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run
slave
slave connects to the master
org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster
slave periodically reports its maxOffset to the master
org.apache.rocketmq.store.ha.HAService.HAClient#run
slave receives the data transferred from the master
org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
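
To make the transport concrete, here is a minimal, self-contained NIO sketch of that exchange. ToyHaDemo is a hypothetical class written for this article, not RocketMQ code: the port 10912, the fixed 8-byte report, and the 5-second interval are illustrative stand-ins, and the master's reply frames ({offset, size, body} in the real protocol) are only hinted at in comments.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo, not RocketMQ code: the slave repeatedly reports an 8-byte maxOffset,
// the master reads it and would then push commitLog data newer than that offset.
public class ToyHaDemo {

    // Roughly the master side: AcceptSocketService#beginAccept/#run + ReadSocketService#run.
    static void runMaster(int port) throws Exception {
        try (ServerSocketChannel listener = ServerSocketChannel.open()) {
            listener.bind(new InetSocketAddress(port));           // listen on the HA port
            try (SocketChannel slave = listener.accept()) {       // accept the slave connection
                ByteBuffer report = ByteBuffer.allocate(8);
                while (slave.read(report) != -1) {                // read the reported offset
                    if (!report.hasRemaining()) {
                        report.flip();
                        System.out.println("slave maxOffset = " + report.getLong());
                        report.clear();
                        // WriteSocketService#run would now transfer data starting at that offset
                    }
                }
            }
        }
    }

    // Roughly the slave side: HAClient#connectMaster + the report loop in HAClient#run.
    static void runSlave(String host, int port) throws Exception {
        try (SocketChannel master = SocketChannel.open(new InetSocketAddress(host, port))) {
            long maxOffset = 0;
            ByteBuffer report = ByteBuffer.allocate(8);
            while (true) {
                report.clear();
                report.putLong(maxOffset).flip();
                while (report.hasRemaining()) {
                    master.write(report);                         // report maxOffset to the master
                }
                // HAClient#processReadEvent would read {offset, size, body} frames here
                maxOffset += 1024;                                // pretend new data was appended
                Thread.sleep(5000);                               // report every 5 seconds
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length > 0 && "master".equals(args[0])) {
            runMaster(10912);
        } else {
            runSlave("127.0.0.1", 10912);
        }
    }
}

Run one process with the argument master and another with no arguments; the master prints each offset the slave reports.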
The synchronization here currently covers only the commitLog.
The essence of synchronous double-write is that the master first writes the message to disk and then waits for the slave to replicate it successfully.
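
How those two steps end up looking to the sender is visible on the client side: the PutMessageStatus values set in the two methods below come back as the SendResult's SendStatus. A short producer sketch, assuming a local name server and a placeholder group and topic:

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

// Producer-side view of the two handle* methods below; the group name, topic and
// name-server address are placeholders for a local test setup.
public class SyncDoubleWriteProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            SendResult result = producer.send(
                new Message("DemoTopic", "TagA", "hello".getBytes(StandardCharsets.UTF_8)));
            switch (result.getSendStatus()) {
                case SEND_OK:              // disk flush and slave replication both succeeded
                    break;
                case FLUSH_DISK_TIMEOUT:   // handleDiskFlush timed out (SYNC_FLUSH)
                case FLUSH_SLAVE_TIMEOUT:  // handleHA timed out waiting for the slave
                case SLAVE_NOT_AVAILABLE:  // no slave, or the slave is too far behind
                    System.out.println("message stored, but degraded: " + result.getSendStatus());
                    break;
                default:
                    break;
            }
        } finally {
            producer.shutdown();
        }
    }
}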
Writing to disk (org.apache.rocketmq.store.CommitLog#handleDiskFlush):
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
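
Note that in the SYNC_FLUSH branch handleDiskFlush does not flush anything itself: it only enqueues a GroupCommitRequest and blocks in waitForFlush, while GroupCommitService's own thread batches the pending requests, flushes the mapped file, and wakes the waiting producer threads. A heavily simplified, hypothetical sketch of that hand-off, using a CompletableFuture per request instead of RocketMQ's own wait/notify helpers:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, stripped-down analogue of GroupCommitService: producers enqueue the offset
// they need flushed and block on a future; a single flush thread drains the requests,
// "flushes", and completes every future whose offset is now covered.
public class ToyGroupCommitService extends Thread {

    private static final class FlushRequest {
        final long nextOffset;                               // wroteOffset + wroteBytes
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final ConcurrentLinkedQueue<FlushRequest> requests = new ConcurrentLinkedQueue<>();
    private volatile long flushedWhere = 0;                  // offset already flushed to disk

    // Producer side: roughly what putRequest(...) followed by waitForFlush(timeout) do together.
    public boolean flushAndWait(long nextOffset, long timeoutMillis) throws InterruptedException {
        FlushRequest req = new FlushRequest(nextOffset);
        requests.offer(req);
        try {
            return req.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return false;                                     // maps to FLUSH_DISK_TIMEOUT
        }
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            FlushRequest req;
            while ((req = requests.poll()) != null) {
                // A real implementation flushes the mapped file up to req.nextOffset here.
                flushedWhere = Math.max(flushedWhere, req.nextOffset);
                req.future.complete(flushedWhere >= req.nextOffset);
            }
            try { Thread.sleep(10); } catch (InterruptedException e) { return; }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyGroupCommitService service = new ToyGroupCommitService();
        service.start();
        System.out.println("flush ok = " + service.flushAndWait(4096, 5000));
        service.interrupt();
    }
}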
Synchronizing to the slave:
// org.apache.rocketmq.store.CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // only executed when the broker role is SYNC_MASTER
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            // the slave has not fallen too far behind the master
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // create a GroupCommitRequest and put it into GroupTransferService's requestsWrite list;
                // GroupTransferService.run keeps comparing GroupCommitRequest#nextOffset with the offset the slave has acknowledged
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                // wait up to syncFlushTimeout (5 seconds by default) and check the slave's sync result
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}
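
The comments above refer to GroupTransferService, which plays the same role for replication that GroupCommitService plays for flushing: instead of checking how far the local file has been flushed, it keeps comparing each request's nextOffset with the offset the slave has acknowledged (tracked as push2SlaveMaxOffset in HAService and advanced when ReadSocketService reads the slave's reports). A hypothetical, self-contained sketch of that loop, with a CountDownLatch standing in for GroupCommitRequest#waitForFlush:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the GroupTransferService idea: release each waiting producer
// thread once the slave's acknowledged offset has reached that request's nextOffset.
public class ToyGroupTransferService extends Thread {

    static final class TransferRequest {
        final long nextOffset;
        final CountDownLatch latch = new CountDownLatch(1);
        TransferRequest(long nextOffset) { this.nextOffset = nextOffset; }
        boolean waitForTransfer(long timeoutMillis) throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);   // false on timeout
        }
    }

    private final ConcurrentLinkedQueue<TransferRequest> requests = new ConcurrentLinkedQueue<>();
    private volatile long slaveAckOffset = 0;             // updated when the slave reports progress

    public void putRequest(TransferRequest request) { requests.offer(request); }

    public void updateSlaveAckOffset(long offset) { slaveAckOffset = Math.max(slaveAckOffset, offset); }

    @Override
    public void run() {
        while (!isInterrupted()) {
            for (TransferRequest req : requests) {
                if (slaveAckOffset >= req.nextOffset) {   // slave has caught up with this write
                    req.latch.countDown();
                    requests.remove(req);
                }
            }
            try { Thread.sleep(10); } catch (InterruptedException e) { return; }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyGroupTransferService service = new ToyGroupTransferService();
        service.start();
        TransferRequest req = new TransferRequest(2048);  // wroteOffset + wroteBytes
        service.putRequest(req);
        service.updateSlaveAckOffset(4096);               // pretend the slave reported maxOffset 4096
        System.out.println("slave in sync = " + req.waitForTransfer(5000));
        service.interrupt();
    }
}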