本文基於rocketmq4.0版本,結合CommitlLog的刷盤過程,對消息隊列的刷盤過程源碼進行分析,進而對RocketMQ的刷盤原理和過程進行了解。
rocketmq 4.0版本中刷盤類型和以前的版本一樣有兩種:
public enum FlushDiskType { // 同步刷盤 SYNC_FLUSH, // 異步刷盤 ASYNC_FLUSH }
刷盤方式有三種:
線程服務 | 場景 | 寫消息性能 |
CommitRealTimeService | 異步刷盤 && 開啟內存字節緩沖區 | 第一 |
FlushRealTimeService | 異步刷盤 | 第二 |
GroupCommitService | 同步刷盤 | 第三 |
其中CommitRealTimeService是老一些版本中沒有的,它為開啟內存字節緩存的刷盤服務。
介紹各個線程工作之前,先需要重點了解一下waitForRunning方法,因為在三個刷盤服務線程中都頻繁使用該方法:
protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { this.onWaitEnd(); return; } //entry to wait waitPoint.reset(); try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { hasNotified.set(false); this.onWaitEnd(); } }
這里要注意一下
waitPoint 這個共享變量,它是CountDownLatch2類型,,沒有細看CountDownLatch2的原理,猜測它和CountDownLatch類似,根據CountDownLatch的使用原理,大致可以猜測waitPoint的作用。
回顧一下CountDownLatch相關知識:
CountDownLatch能夠使一個線程等待其他線程完成各自的工作后再執行自己的任務,CountDownLatch是通過一個計數器來實現的,計數器的初始值為需要等待的線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。
因此,可以猜測waitForRunning的業務邏輯大致為:
(1). 通過閉鎖沒執行依次waitPoint.countDown(),當計數器值到達0時,結束阻塞狀態,開始執行等待線程的任務;
(2). 等待一定時間之后,結束阻塞狀態,開始執行等待線程的任務。
在rocketmq4.0版本中,調用了waitPoint.countDown()的地方有三處:
shutdown() stop() wakeup()
這里我們關心的是wakeup()方法,調用wakeup方法的幾處如下

其中與commitLog刷盤相關的有:
service.wakeup()、flushCommitLogService.wakeup()、commitLogService.wakeup(),其中service.wakeup()的service是GroupCommitService類型。
由此引入了本文所要講述的FlushRealTimeService、CommitRealTimeService以及GroupCommitService三個線程刷盤服務。
GroupCommitService
broker啟動后,會啟動許多服務線程,包括刷盤服務線程,如果刷盤服務線程類型是SYNC_FLUSH (同步刷盤類型:對寫入的數據同步刷盤,只在broker==master時使用),則開啟GroupCommitService服務,該服務線程啟動后每隔10毫秒或該線程調用了wakeup()方法后停止阻塞,執行doCommit()方法。doCommit里面執行具體的刷盤邏輯業務。GroupCommitService服務線程體如下:
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }
當broker類型為master時,每寫入一條消息成功寫入mapedFile文件后,調用handleDiskFlush方法,如果該消息滿足messageExt.isWaitStoreMsgOK(),則將這一條成功寫入的消息生成GroupCommitRequest對象,將該對像放入GroupCommitService的requestsWrite列表中(List<GroupCommitRequest>),等待刷盤線程調用doCommit,對列表中的消息進行刷盤,doCommit中每對一個request處理完成后,會調用wakeupCustomer。等待時間5s后或者request的countDownLatch記數為0時,則將這條消息是否已經刷盤成功進行匯報,如果沒有刷盤成功,則再日志中記錄錯誤,並將putMessageResult設置為FLUSH_DISK_TIMEOUT。代碼如下:
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } }
這條消息是否已經刷盤成功進行匯報的邏輯 -- waitForFlush方法:
public static class GroupCommitRequest { private final long nextOffset; private final CountDownLatch countDownLatch = new CountDownLatch(1); private volatile boolean flushOK = false; public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } public long getNextOffset() { return nextOffset; } public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); } public boolean waitForFlush(long timeout) { try { // 阻塞當前工作線程,等待時間5s后或者countDownLatch記數為0時,停止阻塞,執行下一條語句 this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return this.flushOK; } catch (InterruptedException e) { log.error("Interrupted", e); return false; } } }
對GroupCommitRequest類中的兩個方法的說明:
在waitForFlush方法阻塞的時候,doCommit方法對寫入requestsWrite列表中(List<GroupCommitRequest>)所有GroupCommitRequest對象依次進行了 wakeupCustomer方法調用,wakeupCustomer調用后,countDownLatch 閉鎖記數減一,等待時間5s后或者countDownLatch記數為0時,返回調用wakeupCustomer的GroupCommitRequest對應的消息的刷盤結果。
GroupCommitService的doCommit方法:
說明一下:分析doCommit方法之前,先提及一下swapRequests這個方法,之前提過,GroupCommitService服務線程該每隔10毫秒或調用了該線程的wakeup()方法后執行doCommit()方法,具體地要涉及到waitForRunning方法,waitForRunning方法中onWaitEnd的作用在這里就可以提及一下了,它的作用就是將requestsWrite 轉換為requestsRead ,這個與消息存儲過程中處理dispatchRequest是類似的。
private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; }
doCommit代碼:
private void doCommit() { // synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
其中,具體的刷盤核心代碼:CommitLog.this.mappedFileQueue.flush(0); 接下來看其他兩個刷盤服務線程,對CommitLog.this.mappedFileQueue.flush(0)下文將具體講解。
FlushRealTimeService
// 刷新策略(默認是實時刷盤)
flushCommitLogTimed
// 刷盤時間間隔(默認0.5s)
interval = flushIntervalCommitLog
// 每次刷盤至少需要多少個page(默認是4個)
flushPhysicQueueLeastPages
// 徹底刷盤間隔時間(默認10s)
flushPhysicQueueThoroughInterval
大致邏輯:
-- 如果 當前時間 >(最后一次刷盤時間 + 徹底刷盤間隔時間(10s)),則將最新一次刷盤時間更新為當前時間
-- 如果是實時刷盤,每隔一定時間間隔,該線程休眠500毫秒
如果不是實時刷盤,則調用waitForRunning,即每隔500毫秒或該刷盤服務線程調用了wakeup()方法之后結束阻塞。
-- 調用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
CommitRealTimeService
CommitRealTimeService 比較特殊,它會包含提交和異步刷盤邏輯,專門為開啟內存字節緩沖區的刷盤服務。
// 提交到FileChannel的時間間隔,只在TransientStorePool 打開的情況下使用,默認0.2s
interva l= commitIntervalCommitLog
//每次提交到File至少需要多少個page(默認是4個)
commitDataLeastPages = commitCommitLogLeastPages
/ 提交完成間隔時間(默認0.2s)
commitDataThoroughInterval
大致邏輯:
如果 當前時間 >(最后一次提交時間 + 提交完成間隔時間),更新lastCommitTimestamp之后,執行提交的核心邏輯:
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
如果result == false 則意味着有新的數據 committed,此時需要wakeup刷盤線程,即:
flushCommitLogService.wakeup(); 進行異步刷盤處理。
可知道,刷盤的下一層核心邏輯:
mappedFileQueue.flush
mappedFileQueue.commit
flush
public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
commit
public boolean commit(final int commitLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false); if (mappedFile != null) { int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.committedWhere; this.committedWhere = where; } return result; }
從上代碼可以看出,刷盤過程與MappedFile有很大關系,通過findMappedFileByOffset方法找到要刷盤的MappedFile,然后MappedFile中采用數據刷盤技術將數據刷入到磁盤
MappedFile的刷盤方式有兩種:
1. 寫入內存字節緩沖區(writeBuffer) ----> 從內存字節緩沖區(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)flush到磁盤
2.寫入映射文件字節緩沖區(mappedByteBuffer) ----> 映射文件字節緩沖區(mappedByteBuffer)flush
(MappedFile的刷盤方式待具體分析,待補充...)