4.8.2.Broker異步刷盤
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
// Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } }
異步刷盤根據是否開啟transientStorePoolEnable機制,刷盤實現會有細微差別。如果transientStorePoolEnable 為true , RocketMQ 會單獨申請一個與目標物理文件( commitlog)同樣大小的堆外內存, 該堆外內存將使用內存鎖定,確保不會被置換到虛擬內存中去,消息首先追加到堆外內存,然后提交到與物理文件的內存映射內存中,再flush 到磁盤。如果transientStorePoolEnable 為flalse ,消息直接追加到與物理文件直接映射的內存中,然后刷寫到磁盤中。transientStorePoolEnable 為true 的磁盤刷寫流程如圖4-20 所示。
- 首先將消息直接追加到ByteBuffer (堆外內存DirectByteBuffer ), wrotePosition 隨着消息的不斷追加向后移動。
- CommitRealTimeService 線程默認每200ms 將ByteBuffer 新追加的內容( wrotePosihon減去commitedPosition )的數據提交到Ma ppedB yte Buff1巳r 中。
- MappedByt巳Buffer 在內存中追加提交的內容, wrotePosition 指針向前后移動, 然后返回。
- commit 操作成功返回,將commitedPosition 向前后移動本次提交的內容長度,此時wrotePosition 指針依然可以向前推進。
- FlushRealTimeService 線程默認每500ms 將MappedByteBuffer 中新追加的內存( wrotePosition 減去上一次刷寫位置flushedPositiont )通過調用MappedByteBuffer#force()方法將數據刷寫到磁盤。

image
CommitRealTimeService 提交線程工作機制
@Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.