rocketmq刷盤過程


 本文基於rocketmq4.0版本,結合CommitlLog的刷盤過程,對消息隊列的刷盤過程源碼進行分析,進而對RocketMQ的刷盤原理和過程進行了解。
 
rocketmq 4.0版本中刷盤類型和以前的版本一樣有兩種:
public enum FlushDiskType {
    // 同步刷盤
    SYNC_FLUSH,
    // 異步刷盤    
    ASYNC_FLUSH
}
 
刷盤方式有三種:
線程服務 場景 寫消息性能
CommitRealTimeService 異步刷盤 && 開啟內存字節緩沖區 第一
FlushRealTimeService 異步刷盤  第二
GroupCommitService 同步刷盤 第三

其中CommitRealTimeService是老一些版本中沒有的,它為開啟內存字節緩存的刷盤服務。

 

介紹各個線程工作之前,先需要重點了解一下waitForRunning方法,因為在三個刷盤服務線程中都頻繁使用該方法:

protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }
        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
這里要注意一下  waitPoint 這個共享變量,它是CountDownLatch2類型,,沒有細看CountDownLatch2的原理,猜測它和CountDownLatch類似,根據CountDownLatch的使用原理,大致可以猜測waitPoint的作用。
 
回顧一下CountDownLatch相關知識:
CountDownLatch能夠使一個線程等待其他線程完成各自的工作后再執行自己的任務,CountDownLatch是通過一個計數器來實現的,計數器的初始值為需要等待的線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。
 
因此,可以猜測waitForRunning的業務邏輯大致為:
(1). 通過閉鎖沒執行依次waitPoint.countDown(),當計數器值到達0時,結束阻塞狀態,開始執行等待線程的任務;
(2). 等待一定時間之后,結束阻塞狀態,開始執行等待線程的任務。
 
在rocketmq4.0版本中,調用了waitPoint.countDown()的地方有三處:
shutdown()
stop()
wakeup()

 這里我們關心的是wakeup()方法,調用wakeup方法的幾處如下

其中與commitLog刷盤相關的有:
service.wakeup()、flushCommitLogService.wakeup()、commitLogService.wakeup(),其中service.wakeup()的service是GroupCommitService類型。
 
由此引入了本文所要講述的FlushRealTimeService、CommitRealTimeService以及GroupCommitService三個線程刷盤服務。
 
 
GroupCommitService
broker啟動后,會啟動許多服務線程,包括刷盤服務線程,如果刷盤服務線程類型是SYNC_FLUSH (同步刷盤類型:對寫入的數據同步刷盤,只在broker==master時使用),則開啟GroupCommitService服務,該服務線程啟動后每隔10毫秒或該線程調用了wakeup()方法后停止阻塞,執行doCommit()方法。doCommit里面執行具體的刷盤邏輯業務。GroupCommitService服務線程體如下:
 
public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }
 
當broker類型為master時,每寫入一條消息成功寫入mapedFile文件后,調用handleDiskFlush方法,如果該消息滿足messageExt.isWaitStoreMsgOK(),則將這一條成功寫入的消息生成GroupCommitRequest對象,將該對像放入GroupCommitService的requestsWrite列表中(List<GroupCommitRequest>),等待刷盤線程調用doCommit,對列表中的消息進行刷盤,doCommit中每對一個request處理完成后,會調用wakeupCustomer。等待時間5s后或者request的countDownLatch記數為0時,則將這條消息是否已經刷盤成功進行匯報,如果沒有刷盤成功,則再日志中記錄錯誤,並將putMessageResult設置為FLUSH_DISK_TIMEOUT。代碼如下:
 
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
}

這條消息是否已經刷盤成功進行匯報的邏輯 -- waitForFlush方法:

public static class GroupCommitRequest {
        private final long nextOffset;
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        public long getNextOffset() {
            return nextOffset;
        }

        public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

        public boolean waitForFlush(long timeout) {
            try {
 // 阻塞當前工作線程,等待時間5s后或者countDownLatch記數為0時,停止阻塞,執行下一條語句
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
                return false;
            }
        }
}
對GroupCommitRequest類中的兩個方法的說明:
在waitForFlush方法阻塞的時候,doCommit方法對寫入requestsWrite列表中(List<GroupCommitRequest>)所有GroupCommitRequest對象依次進行了 wakeupCustomer方法調用,wakeupCustomer調用后,countDownLatch 閉鎖記數減一,等待時間5s后或者countDownLatch記數為0時,返回調用wakeupCustomer的GroupCommitRequest對應的消息的刷盤結果。
 
GroupCommitService的doCommit方法
說明一下:分析doCommit方法之前,先提及一下swapRequests這個方法,之前提過,GroupCommitService服務線程該每隔10毫秒或調用了該線程的wakeup()方法后執行doCommit()方法,具體地要涉及到waitForRunning方法,waitForRunning方法中onWaitEnd的作用在這里就可以提及一下了,它的作用就是將requestsWrite 轉換為requestsRead ,這個與消息存儲過程中處理dispatchRequest是類似的。
 private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
}
 doCommit代碼:
private void doCommit() {
    //
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }

                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
}
 
其中,具體的刷盤核心代碼:CommitLog.this.mappedFileQueue.flush(0); 接下來看其他兩個刷盤服務線程,對CommitLog.this.mappedFileQueue.flush(0)下文將具體講解。
 
 
 
FlushRealTimeService
// 刷新策略(默認是實時刷盤)
flushCommitLogTimed
// 刷盤時間間隔(默認0.5s)
interval = flushIntervalCommitLog
// 每次刷盤至少需要多少個page(默認是4個)
flushPhysicQueueLeastPages
// 徹底刷盤間隔時間(默認10s)
flushPhysicQueueThoroughInterval
 
大致邏輯:
-- 如果 當前時間 >(最后一次刷盤時間 + 徹底刷盤間隔時間(10s)),則將最新一次刷盤時間更新為當前時間
 
-- 如果是實時刷盤,每隔一定時間間隔,該線程休眠500毫秒
如果不是實時刷盤,則調用waitForRunning,即每隔500毫秒或該刷盤服務線程調用了wakeup()方法之后結束阻塞。
 
-- 調用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
 
 
CommitRealTimeService
CommitRealTimeService 比較特殊,它會包含提交和異步刷盤邏輯,專門為開啟內存字節緩沖區的刷盤服務。
// 提交到FileChannel的時間間隔,只在TransientStorePool 打開的情況下使用,默認0.2s
interva l= commitIntervalCommitLog
//每次提交到File至少需要多少個page(默認是4個)
commitDataLeastPages = commitCommitLogLeastPages
/ 提交完成間隔時間(默認0.2s)
commitDataThoroughInterval
 
大致邏輯:
如果 當前時間 >(最后一次提交時間 + 提交完成間隔時間),更新lastCommitTimestamp之后,執行提交的核心邏輯:
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
如果result == false 則意味着有新的數據 committed,此時需要wakeup刷盤線程,即:
flushCommitLogService.wakeup(); 進行異步刷盤處理。
 
可知道,刷盤的下一層核心邏輯:
mappedFileQueue.flush
mappedFileQueue.commit
 
  flush
 public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

  commit

public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }
 
從上代碼可以看出,刷盤過程與MappedFile有很大關系,通過findMappedFileByOffset方法找到要刷盤的MappedFile,然后MappedFile中采用數據刷盤技術將數據刷入到磁盤

MappedFile的刷盤方式有兩種:
1. 寫入內存字節緩沖區(writeBuffer) ----> 從內存字節緩沖區(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)flush到磁盤
2.寫入映射文件字節緩沖區(mappedByteBuffer) ----> 映射文件字節緩沖區(mappedByteBuffer)flush
 
(MappedFile的刷盤方式待具體分析,待補充...)
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM