RocketMQ 主從同步


RocketMQ 的主和從一直在使用 nio 進行數據同步:

master

master 監聽端口
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept

master 建立連接
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run

master 讀取 slave 上報的 maxOffset
org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run

master 傳輸數據給 slave
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run

slave

slave 連接 master
org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster

slave 定時報告 maxOffset 給 master
org.apache.rocketmq.store.ha.HAService.HAClient#run

slave 接收 master 傳輸來的數據
org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent

這里的同步,暫時只涉及到 commitLog。

同步雙寫的本質,master 先寫磁盤,然后等待 slave 同步消息成功。

寫磁盤:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

同步到 slave:

// org.apache.rocketmq.store.CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // SYNC_MASTER 則執行邏輯
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            // slave 沒有落后 master 太多
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // 創建 GroupCommitRequest 放入 GroupTransferService 的 requestsWrite 中
                // GroupTransferService.run 會一直比較 GroupCommitRequest#nextOffset 和 slave 已提交的位移
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                // 等待 5 秒,檢查 slave 的同步結果
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM