RocketMq刷盤機制
handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//① 同步刷盤使用GroupCommitService來刷盤
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//MessageConst.PROPERTY_WAIT_STORE_MSG_OK屬性是否為true 默認Message構造中 都為true
if (messageExt.isWaitStoreMsgOK()) {
//創建commit請求 將數據從mappedByteBuffer中刷盤到磁盤中
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//阻塞當前進程,等待刷盤完成或者5秒超時返回
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
//如果刷盤失敗打印日志
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush ②
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//transientStorePoolEnable沒有開啟或者是從broker
flushCommitLogService.wakeup();
} else {
//transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole() 開啟transientStorePoolEnable,且刷盤模式是異步刷盤,且角色不說從broker
commitLogService.wakeup();
}
}
}
①同步刷盤使用GroupCommitService
②異步刷盤 且開啟了transientStorePoolEnable且不是從服務器,使用CommitLogService 否則使用FlushCommitLogService刷盤
GroupCommitService
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
putRequest 提交刷盤請求
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//等待運行 沒有任務就睡眠10毫秒
this.waitForRunning(10);
//提交刷盤
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
//處理刷盤請求
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
//刷盤
for (int i = 0; i < 2 && !flushOK; i++) {
//刷盤指針是否大於寫指針
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
//如果還有數據可以刷就進行刷盤
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
//喚醒等待刷盤完成的阻塞線程
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
這里調用了mappedFileQueue.flush(0)進行刷盤
mappedFileQueue.flush(0)
public boolean flush(final int flushLeastPages) {
boolean result = true;
//①根據刷盤指針找到對應的文件
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
//②核心邏輯調用 mappedFile#flush刷盤
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
①根據刷盤指針找到對應的文件
②核心邏輯調用 mappedFile#flush刷盤
mappedFile.flush(0)
public int flush(final int flushLeastPages) {
//①判斷是否需要進行刷盤 這里傳0的話表示強制刷盤
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
//this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); writeBuffer存在就用wrotePosition指針,否則用committedPosition指針
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
//writeBuffer存在或者fileChannel的position不為0用fileChannel刷盤
this.fileChannel.force(false);
} else {
//直接用mappedByteBuffer刷盤
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
//設置刷盤指針
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
//返回刷盤指針位置
return this.getFlushedPosition();
}
FlushRealTimeService 異步刷盤
未開啟transientStorePoolEnable
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//刷盤線程是否休眠 默認false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//間隔500毫秒
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//刷盤至少滿4頁
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
//強制刷盤間隔 10秒 (未滿4頁也刷盤 防止數據丟失)
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
//具體上次強制刷盤時間超過10秒
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
//設置成0,將觸發強制刷盤
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
//刷盤線程是否休眠 默認false
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
//線程阻塞500毫秒 中途可被喚醒
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
//至少滿4頁才刷盤 但是每10秒將會強制刷盤一次,flushPhysicQueueLeastPages會被設置為0
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
CommitRealTimeService
異步將writeBuffer的數據刷到fileChannel
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//默認200毫秒阻塞等待喚醒
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
//至少4頁才把數據commit到fileChannel
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
//強制commit 每隔200毫秒
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
//核心邏輯 將數據commit到fileChannel
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
//result是false 表示有數據commit了
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
//喚醒刷盤線程
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
mappedFileQueue.commit
public boolean commit(final int commitLeastPages) {
boolean result = true;
//根據當前已提交偏移量找到對應的文件
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
//委托mappedFile commit數據
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;
this.committedWhere = where;
}
return result;
}
mappedFile.commit
public int commit(final int commitLeastPages) {
//writeBuffer是null,這應該是不正常的
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
//是否可以進行commit,至少堆積commitLeastPages頁數據 為0的話表示強制commit
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
//調用commit0
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel. 文件已經寫滿且已經全commit,可以把writeBuffer歸還給池子里了
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
//返回commit指針
return this.committedPosition.get();
}
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
//將lastCommittedPosition和writePos之間的數據刷到fileChannel中
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
//設置已提交指針為writePos
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
