本文基於rocketmq4.0版本,結合CommitlLog的刷盤過程,對消息隊列的刷盤過程源碼進行分析,進而對RocketMQ的刷盤原理和過程進行了解。
rocketmq 4.0版本中刷盤類型和以前的版本一樣有兩種:
public enum FlushDiskType {
// 同步刷盤
SYNC_FLUSH,
// 異步刷盤
ASYNC_FLUSH
}
刷盤方式有三種:
| 線程服務 | 場景 | 寫消息性能 |
| CommitRealTimeService | 異步刷盤 && 開啟內存字節緩沖區 | 第一 |
| FlushRealTimeService | 異步刷盤 | 第二 |
| GroupCommitService | 同步刷盤 | 第三 |
其中CommitRealTimeService是老一些版本中沒有的,它為開啟內存字節緩存的刷盤服務。
介紹各個線程工作之前,先需要重點了解一下waitForRunning方法,因為在三個刷盤服務線程中都頻繁使用該方法:
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
這里要注意一下
waitPoint 這個共享變量,它是CountDownLatch2類型,,沒有細看CountDownLatch2的原理,猜測它和CountDownLatch類似,根據CountDownLatch的使用原理,大致可以猜測waitPoint的作用。
回顧一下CountDownLatch相關知識:
CountDownLatch能夠使一個線程等待其他線程完成各自的工作后再執行自己的任務,CountDownLatch是通過一個計數器來實現的,計數器的初始值為需要等待的線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。
因此,可以猜測waitForRunning的業務邏輯大致為:
(1). 通過閉鎖沒執行依次waitPoint.countDown(),當計數器值到達0時,結束阻塞狀態,開始執行等待線程的任務;
(2). 等待一定時間之后,結束阻塞狀態,開始執行等待線程的任務。
在rocketmq4.0版本中,調用了waitPoint.countDown()的地方有三處:
shutdown() stop() wakeup()
這里我們關心的是wakeup()方法,調用wakeup方法的幾處如下
其中與commitLog刷盤相關的有:
service.wakeup()、flushCommitLogService.wakeup()、commitLogService.wakeup(),其中service.wakeup()的service是GroupCommitService類型。
由此引入了本文所要講述的FlushRealTimeService、CommitRealTimeService以及GroupCommitService三個線程刷盤服務。
GroupCommitService
broker啟動后,會啟動許多服務線程,包括刷盤服務線程,如果刷盤服務線程類型是SYNC_FLUSH (同步刷盤類型:對寫入的數據同步刷盤,只在broker==master時使用),則開啟GroupCommitService服務,該服務線程啟動后每隔10毫秒或該線程調用了wakeup()方法后停止阻塞,執行doCommit()方法。doCommit里面執行具體的刷盤邏輯業務。GroupCommitService服務線程體如下:
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
當broker類型為master時,每寫入一條消息成功寫入mapedFile文件后,調用handleDiskFlush方法,如果該消息滿足messageExt.isWaitStoreMsgOK(),則將這一條成功寫入的消息生成GroupCommitRequest對象,將該對像放入GroupCommitService的requestsWrite列表中(List<GroupCommitRequest>),等待刷盤線程調用doCommit,對列表中的消息進行刷盤,doCommit中每對一個request處理完成后,會調用wakeupCustomer。等待時間5s后或者request的countDownLatch記數為0時,則將這條消息是否已經刷盤成功進行匯報,如果沒有刷盤成功,則再日志中記錄錯誤,並將putMessageResult設置為FLUSH_DISK_TIMEOUT。代碼如下:
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
這條消息是否已經刷盤成功進行匯報的邏輯 -- waitForFlush方法:
public static class GroupCommitRequest {
private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile boolean flushOK = false;
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
public long getNextOffset() {
return nextOffset;
}
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}
public boolean waitForFlush(long timeout) {
try {
// 阻塞當前工作線程,等待時間5s后或者countDownLatch記數為0時,停止阻塞,執行下一條語句
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK;
} catch (InterruptedException e) {
log.error("Interrupted", e);
return false;
}
}
}
對GroupCommitRequest類中的兩個方法的說明:
在waitForFlush方法阻塞的時候,doCommit方法對寫入requestsWrite列表中(List<GroupCommitRequest>)所有GroupCommitRequest對象依次進行了 wakeupCustomer方法調用,wakeupCustomer調用后,countDownLatch 閉鎖記數減一,等待時間5s后或者countDownLatch記數為0時,返回調用wakeupCustomer的GroupCommitRequest對應的消息的刷盤結果。
GroupCommitService的doCommit方法:
說明一下:分析doCommit方法之前,先提及一下swapRequests這個方法,之前提過,GroupCommitService服務線程該每隔10毫秒或調用了該線程的wakeup()方法后執行doCommit()方法,具體地要涉及到waitForRunning方法,waitForRunning方法中onWaitEnd的作用在這里就可以提及一下了,它的作用就是將requestsWrite 轉換為requestsRead ,這個與消息存儲過程中處理dispatchRequest是類似的。
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
doCommit代碼:
private void doCommit() {
//
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
其中,具體的刷盤核心代碼:CommitLog.this.mappedFileQueue.flush(0); 接下來看其他兩個刷盤服務線程,對CommitLog.this.mappedFileQueue.flush(0)下文將具體講解。
FlushRealTimeService
// 刷新策略(默認是實時刷盤)
flushCommitLogTimed
// 刷盤時間間隔(默認0.5s)
interval = flushIntervalCommitLog
// 每次刷盤至少需要多少個page(默認是4個)
flushPhysicQueueLeastPages
// 徹底刷盤間隔時間(默認10s)
flushPhysicQueueThoroughInterval
大致邏輯:
-- 如果 當前時間 >(最后一次刷盤時間 + 徹底刷盤間隔時間(10s)),則將最新一次刷盤時間更新為當前時間
-- 如果是實時刷盤,每隔一定時間間隔,該線程休眠500毫秒
如果不是實時刷盤,則調用waitForRunning,即每隔500毫秒或該刷盤服務線程調用了wakeup()方法之后結束阻塞。
-- 調用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
CommitRealTimeService
CommitRealTimeService 比較特殊,它會包含提交和異步刷盤邏輯,專門為開啟內存字節緩沖區的刷盤服務。
// 提交到FileChannel的時間間隔,只在TransientStorePool 打開的情況下使用,默認0.2s
interva l= commitIntervalCommitLog
//每次提交到File至少需要多少個page(默認是4個)
commitDataLeastPages = commitCommitLogLeastPages
/ 提交完成間隔時間(默認0.2s)
commitDataThoroughInterval
大致邏輯:
如果 當前時間 >(最后一次提交時間 + 提交完成間隔時間),更新lastCommitTimestamp之后,執行提交的核心邏輯:
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
如果result == false 則意味着有新的數據 committed,此時需要wakeup刷盤線程,即:
flushCommitLogService.wakeup(); 進行異步刷盤處理。
可知道,刷盤的下一層核心邏輯:
mappedFileQueue.flush
mappedFileQueue.commit
flush
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
commit
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;
this.committedWhere = where;
}
return result;
}
從上代碼可以看出,刷盤過程與MappedFile有很大關系,通過findMappedFileByOffset方法找到要刷盤的MappedFile,然后MappedFile中采用數據刷盤技術將數據刷入到磁盤
MappedFile的刷盤方式有兩種:
1. 寫入內存字節緩沖區(writeBuffer) ----> 從內存字節緩沖區(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)flush到磁盤
2.寫入映射文件字節緩沖區(mappedByteBuffer) ----> 映射文件字節緩沖區(mappedByteBuffer)flush
(MappedFile的刷盤方式待具體分析,待補充...)
