rocketmq的broker如何同步信息的?


 

public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}


一個haservice下面有accpet和haclient分別對應客戶端和服務端,grouptranserservie用來控制消息是否獲取到,下面具體講。

 

拿haclient舉例子,在主線程做的事情

public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);

                        boolean ok = this.processReadEvent();
                       
                    } else {
                        this.waitForRunning(1000 * 5);
                    }

也就是在rocketmq里面,一個具體的任務就是單獨分配一個線程,從而發揮多線程優勢,在主線程上面休眠等待喚醒或者超時喚醒然后執行io動作。

一個典型的基於bytebuffer的寫操作,通過positon、limit來判斷是否數據寫完:

private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);

            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }

            return !this.reportOffset.hasRemaining();
        }

  

haservice里面所有的io沒有走netty,全部使用原始select做異步io,然后直接使用nio的bytebuff做read和write操作

 

另外rocketmq里面的每個線程實現都有一個特別的標志位:

public abstract class ServiceThread implements Runnable {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    private static final long JOIN_TIME = 90 * 1000;

    protected final Thread thread;
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    protected volatile boolean stopped = false;

  

這個hasnotified和countdownlatch是配合一起使用的。如果一個線程被countdown過、喚醒過,那么hasnotified就通過cas被設置成true,下一個循環進入wait的時候,不用等待超時也不用等待下一次喚醒,直接通過hasnotified這個標志位可以直接喚醒,相當於第一次喚醒我的時候 我當時沒有在阻塞,那么第一次喚醒我的時候 先設置一個標示hasnotified,下次進入阻塞的時候可以直接走喚醒流程,不用等待。

 

 

 

下面具體講下每個模塊

HAclient:

干了兩個事情:

1 備broker去nameserv注冊的時候,可以從nameserv拿到master-broker的ha-address,拿到這個地址以后,通過haclient去連接master-broker。定期給主機broker上報自己的currentReportedOffset,也就是備機broker自己當前的commit-log在什么地方了

2 在channel上面嘗試讀取數據,這個就是主機broker發過來的具體數據提交到自己的commit-log里面。

也就是對於一個備機broker而言,發布自己的ack-offset和接收主機broker的實際數據都在ha-client一個線程完成的:

3 ha-client用到了雙緩沖reallocateByteBuffer,因為主機broker發過來的數據有可能備機broker的bytebuffer已經存不下了,只能存一半,這時候需要把已經落盤的數據從bytebuffer清理掉,然后寫了一半的bytebuffer從后半部分移動到前半部分,那么需要有一個第三者tmp做swap,bytebufferbackup就是這個tmp,大小跟bytebufferread一樣,防止極端情況:

 private void reallocateByteBuffer() {
            int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
            if (remain > 0) {
                this.byteBufferRead.position(this.dispatchPostion);

                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                this.byteBufferBackup.put(this.byteBufferRead);
            }

            this.swapByteBuffer();

            this.byteBufferRead.position(remain);
            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
            this.dispatchPostion = 0;
        }

        private void swapByteBuffer() {
            ByteBuffer tmp = this.byteBufferRead;
            this.byteBufferRead = this.byteBufferBackup;
            this.byteBufferBackup = tmp;
        }

  

 

 

 

 
        

AcceptSocketService:

干一件事:綁定端口以后,作為accpet,在主循環的select里面,監聽accpet事件,如果有客戶端連接進來,那么生成一個haconnection。

所以看得出來,只有主機broker才有這個accpetsocketservice和haconnection。下面具體說下HaConnection

 

public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();
    }

 socketchannel是服務端accept后拿到的、跟客戶端通信的channel。

   SO_LINGER選項,使用默認的

   最主要的是有兩個線程,一個是writesocketService一個是readSocketService

 

 

   writesocketservice:當主機寫入commit-log以后offset肯定會長,但是備機傳過來的ack-offset沒有增長。通過這種方式主機知道此時需要把什么數據傳給備機。

   這個線程沒有雙緩沖,也沒有swap-bytebuffer,全部數據通過網絡io寫出去即可,不涉及磁盤io。這個線程平時不需要工作,只有在有新數據的時候才需要工作,啥時候被喚醒的呢?

   是在service.getWaitNotifyObject().wakeupAll()業務線程進行喚醒的

 

   readSocketService: 專門接收備機發過來的ack-offset的,收到新的ack以后,通過HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset)喚醒GroupTransferService,后者專門處理消息是否真的已經被接收。

 

  GroupTransferService:

  雙緩沖邏輯,在主循環的waitforEnding結束后的onWaitEnd中,執行swapRequests,把requestsWrite和requestsRead互換,因為這個線程在處理的時候需要用synchronnized鎖整個requestsRead,別人無法put了,所以弄一個requestsWrite出來,其他線程可以在這個里面put,跟自己的線程鎖住的requestsRead不沖突。

  

  這個模塊本質上就是一個thread,干兩個事情:

  1 在waitfoRunning中等待

  2 等待超時或者被喚醒的話,那么針對requestRead里面所有request,push2SlaveMaxOffset(這個就是備機的ack-offset)大於request的offset的話,那么說明備機當前已經有這個數據了,那么wakeupCustomer把在request上的CountdownLatch去掉,並且把GroupCommitRequest的flushOK=ture。

如果備機的ack-offset比GroupCommitRequest小的話,那么循環5次等待,阻塞在notifyTransferObject,嘗試等待5次看看備機的ack-offset 也就是push2SlaveMaxOffset能不能追上來,從而也讓這個GroupCommitRequest的flushOK=true。

private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

  

 

 

 

那么誰在request上的countdownlatch等待呢?flushok啥含義

如果是同步master的話,兩個地方:handleHA和handleDiskFlush

以前者舉例子,handleHA里面:

 

 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

  

主機在sendmessage以后,執行service.putRequest

1 會在GroupTransferService的requestWrite里面放入新的request,讓GroupTransferService去檢查這個request是否已經被備機同步了

2 對GroupTransferService做waitpoint.countDown,讓GroupTransferService干活必須先要喚醒他。

public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

  

GroupTransferService每次被喚醒的時候,首先把requestWrite放入到requestRead里面,然后檢查request的offset和備機ackoffset是否ok。

service.getWaitNotifyObject().wakeupAll(); 這個是喚醒writeSockeService,新的數據來了,那么寫線程需要工作了。

4 在每個request上面做waitforFlush,也就是在request上面countdown等待,然后檢查flushok。

 

所以主機上面新數據來了以后,業務線程喚醒writeSocketService去發數據給備機broker(writeSocketService通過檢查commit-log的offset感知主機數據offset增長了),然后喚醒grouptransferservice去檢查每個request是否已經ok。

 

grouptransferservice的工作沒有放在writeSocketService,而是單獨一個線程來做,還是利用多核並發處理。

grouptransferservice即使被業務線程在putRequest中waitPoint.countDown();被喚醒還會被this.notifyTransferObject.waitForRunning(1000)阻塞,因為被業務線程喚醒也不能表示立馬可以更新request和ack-offset的關系,比較備機新的ack-offset還沒來,所以還需要readSocketService在拿到新的ackoffset以后,通過this.groupTransferService.notifyTransferSome();進一步喚醒groupTransferService,此時才能真正更新request是狀態flushok狀態

private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM