概述
在 Broker 的通用請求處理器將一個消息進行分發后,就來到了 Broker 的專門處理消息存儲的業務處理器部分。本篇文章,我們將要探討關於 RocketMQ 高效的原因之一:文件結構的良好設計與對 Page Cache 的極致"壓榨"。
文件系統的結構設計
在 RocketMQ 的 Broker 中,有一類叫做 CommitLog 的文件,所有在該 Broker 上的 Topic 上的消息都會順序的寫入到這個文件中。
該消息的元信息存儲着消息所在的 Topic 與 Queue。當消費者要進行消費時,會通過 ConsumerQueue 文件來找到自己想要消費的隊列。
該隊列不存儲具體的消息,而是存儲消息的基本信息與偏移量。消費者通過偏移量去 CommitLog 中找到自己需要消費的信息然后取出,就可以進行消費。
並且,Broker 還可以對 CommitLog 來建立 Hash 索引文件 IndexFile,這樣就可以通過 消息的 key 來找到消息。
官網上的這張圖很好的表示了三類文件之間的關系。當然這章我們還是先只來看 CommitLog,其他兩個留給下一章。

消息管理的結構層次
在學習 Broker 對於消息的處理時,我們可以跟着下面這張圖走,這樣可以對 Broker 的文件系統有一個清晰的了解

上圖的主要思路來源於 該圖 ,由於找不到原作者,故進行了重制與拓展
業務處理層
在 上一篇文章 中,我們看到在 BrokerController
中,SendMessageProcessor
注冊了以下請求碼
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
同時,在處理業務請求時,通用請求處理器是,通過調用 AsyncNettyRequestProcessor
的 asyncProcessRequest
方法來處理連接和命令的,雖然也有同步的調用,但實際上大部分的業務處理 handler 都實現了異步的請求處理方法。
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
在 SendMessageProcessor
中,它首先是構造了一個異步處理方法(asyncProcessRequest
),然后由自己的線程池去執行(thenAcceptAsync
)。
根據上一章最后的那張線程模型的圖,我們能知道構造這個異步方法和該方法的調用,都是通過自己的線程池來執行的,所以和同步執行的區別不是很大。
進入到異步方法的構造
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// 重建請求頭
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
// 構建 MQ trace 上下文
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
在這里對不同的請求進行分別處理,我們現在在意的是 SEND_MESSAGE
,所以先進入到 asyncSendMessage
// 重組
final RemotingCommand response = preSend(ctx, request, requestHeader);
// 構建響應頭
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
// 獲取隊列與 Topic 配置
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
// 以內部消息的格式存儲
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// 對於重試消息和延遲消息的處理
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
/* pass:這里設置了一堆其他屬性 */
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
// 對於寫入結果,構造響應
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
在這里,它做了這幾件事:
- 將消息通過內部消息(
MessageExtBrokerInner
)的格式保存 - 將重試消息、延時消息、事務消息由其他方法處理
- 除了完請求后,構造響應結果並返回
然后,我們進入了 Rocket 的存儲組件層
存儲組件層
這一層主要是負責操作下一層的邏輯文件對象來響應上一層的下發的請求。
工作的類是 MessageStore
,主要的實現是 DefaultMessageStore
接着對"放入消息"這個命令來進行響應
// 檢查持久化層的狀態
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 檢查消息正確
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// 存儲消息到 CommitLog
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
/* pass:更新度量信息 */
});
return putResultFuture;
到達了這一層后,首先對當前的持久化能力進行檢查:
- 是否已經關閉
- 是否為從 Broker
- 是否可寫(磁盤滿、寫索引錯誤等問題)
- page cache 是否 busy 或被禁用
然后是消息的限制進行檢查。
在檢查完成后進入下一層
邏輯存儲層
邏輯存儲層中,CommitLog 是 RocketMQ 中持久化文件的抽象邏輯對象,它將底層的存儲結構與細節隱藏,可以當作是一個文件來寫入。
ConsumerQueue 則是一個 queue 在 CommitLog 中的消息的指針結構的文件抽象。IndexFile 是索引文件。
這一層比較復雜,我們分段來讀
// 設置存儲時間
msg.setStoreTimestamp(System.currentTimeMillis());
// 設置 CRC32
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// 返回結果
AppendMessageResult result = null;
// 持久化的度量信息管理服務
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 非事務或為提交消息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 需要延遲投遞的話
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 進行編碼,返回結果不為空則說明出現異常
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
首先在第一部分,RocketMQ 對消息的信息進行進一步補充,也就是將上面講過的 MessageExtBrokerInner
補充完整以存儲。如持久化時間、完整性校驗。
同時對於事務消息與延時消息也要補充額外的信息,例如對於延遲消息,要裝入真實 Topic 的信息(延時消息的實現是通過更換 Topic 的"戲法"來實現的)
你可能會疑惑,事務消息和延時消息不是已經在業務處理層被其他方法處理到了嗎?實際上,那些方法最終還是會調用這個方法來做持久化
在消息的具體的信息都補充完成后,就會開始進行編碼為字節數組,具體的代碼在 MessageExtEncoder#encode
編碼完成后的 byte 數組,就是一條消息在磁盤上的真實物理結構

字段 | 含義 |
---|---|
total size | 該消息的總長度 |
magic code | 魔數 |
body crc | 消息的 body 部分的 CRC32 |
queue id | 該消息所屬的 queue id |
flag | 區分是普通 RPC 還是 oneway RPC 的標志 |
queue offset | 在所屬 queue 內的偏移量 |
physical offset | 在 commitLog 中的物理上的實際偏移量 |
sys flag | 見下表 |
born timestamp | 生產時間 |
born host | 生產所在主機IP(可能是 IPV4 或 IPV6) |
store timestamp | 持久化時間 |
store host | 持久化所在 Broekr |
reconsume time | 重消費次數 |
prepared transcation offset | 一個事務中 prepared 消息的 offset |
sys flag
變量名 | 含義 |
---|---|
COMPRESSED_FLAG | 壓縮消息。消息為批量的時候,就會進行壓縮,默認使用5級的 zip |
MULTI_TAGS_FLAG | 有多個 tag。 |
TRANSACTION_NOT_TYPE | 事務為未知狀態。當 Broker 回查 Producer 的時候,如果為 Commit 應該提交,為 Rollback 應該回滾,為 Unknown 時應該繼續回查 |
TRANSACTION_PREPARED_TYPE | 事務的運行狀態。當前消息是事務的一部分 |
TRANSACTION_COMMIT_TYPE | 事務的提交消息。要求提交事務 |
TRANSACTION_ROLLBACK_TYPE | 事務的回滾消息。要求回滾事務 |
BORNHOST_V6_FLAG | 生成該消息的 host 是否 ipv6 的地址 |
STOREHOSTADDRESS_V6_FLAG | 持久化該消息的 host 是否是 ipv6 的地址 |
然后接着來看第二部分
// 文件寫入,加鎖
putMessageLock.lock(); // 自旋鎖或可重入鎖的選擇取決於存儲引擎配置
try {
// 獲取最新的 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// 通過設置時間戳保證全局有序
msg.setStoreTimestamp(beginLockTimestamp);
// 不存在或已滿則創建
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 追加寫到消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// 寫結果處理
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// 到文件尾部則創建文件且寫入消息
unlockMappedFile = mappedFile;
// 創建文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
// 創建完成后重新寫入
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case XXX:
/* pass:對於異常結果設置響應碼,且直接返回 */
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
/* pass:打Log */
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 更新度量數據
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// 刷盤請求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 副本策略
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
/* pass:對於刷盤問題和副本問題設置響應碼 */
});
當要存放的消息組裝好后,就對文件寫入加鎖,且交給下一層寫入。寫入完成后如果成功則會進行刷盤檢查和使用副本策略。
存儲映像層
存儲映像層中,MappedFile 是對一個物理文件的抽象,MappedFileQueue 是一組文件的隊列,MappedByteBuffer 則是一個映射到物理文件所對應的 page cache 的一塊內存。
在上一層中,它嘗試從當前的 mappedFileQueue 獲取或創建最新的 MappedFile,然后將 byte 數組交給其寫入。
我們再來看看這層是怎么處理寫入的。
// 獲取寫指針
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 將 byteBuffer 交給 AppendMessageCallback 寫入
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新寫位置
this.wrotePosition.addAndGet(result.getWroteBytes());
// 更新持久化時間戳
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
在這層,該 MappedFile 首先會獲取 writeBuffer 或 mappedByteBuffer 作為 ByteBuffer ,然后嘗試進行寫入
writeBuffer 和 mappedByteBuffer
writeBuffer 是一塊直接內存,也就是說它並不受 Java 堆去管理,當然也不會被 GC 限制。
而 mappedByteBuffer 則是一塊映射到 CommitLog 文件的內存(具體可以了解 mmap )。
准確來講,它占用了 page cache 的一部分,也就是說寫入這里的文件可以免去 從用戶空間到內核空間一次拷貝成本,這叫做 零拷貝(zero-copy) 。
而如果通過 writeBuffer 寫入,則需要再寫入對應 CommitLog 文件的 Channel (如果我們的 writeBuffer 不是堆外內存,那還會發生一次從堆到堆外的拷貝),寫入 Channel 實際上是寫入了 page cache,也就是 mappedByteBuffer 映射的那部分。
所以說,如果在 Channel 寫入,那我們是可以在 mappedByteBuffer 上讀取到的。
但是 page cache 中的內存終究只是一塊內存,斷電宕機就會丟失,所以是一定會寫回到硬盤中。Linux 中具有多種寫回策略,這里就不列舉了。
提到這個是因為 writeBuffer 和 mappedByteBuffer 的寫回問題。
我們知道他們在寫入后是寫入到同一塊 page cache,但是在 flush 回磁盤的時候,他們則是"各寫各的",這點在 FileChannel
上的注釋有提到。
在后文介紹刷盤策略的時候,同步刷盤時是只對這兩塊 buffer 中的一塊進行 flush(因為我們只對一塊寫入)
了解了這兩塊 ByteBuffer 的作用后,我們再接着看是具體怎么寫入的
public AppendMessageResult doAppend(
// 文件開始寫的偏移量
final long fileFromOffset,
// 和文件映射好的一塊 DirectBuffer
final ByteBuffer byteBuffer,
// 最大空隙
final int maxBlank,
// 需要寫入的消息
final MessageExtBrokerInner msgInner,
PutMessageContext putMessageContext) {
// 物理上的開始寫時偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
// id = 持久化時時間戳 + 持久化所在host + 物理偏移量
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// 獲取邏輯隊列的消費偏移量
String key = putMessageContext.getTopicQueueTableKey();
long queueOffset = CommitLog.this.topicQueueTable.computeIfAbsent(key, k -> 0L);
// 特殊處理事務消息
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared 消息和 Rollback 消息不會被消費,不會進入 consumer queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
final int msgLen = preEncodeBuffer.getInt(0);
// 確保有充足的空間,沒有則進來新建文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
this.msgStoreItemMemory.putInt(maxBlank);
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 將總長與魔數寫入尾部
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
int pos = 4 + 4 + 4 + 4 + 4;
// 6 填充之前沒有寫的 queue 中偏移量
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 commit log 上的偏移量(當前文件的全局偏移量 + 文件內偏移量)
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 正式將數據寫入文件(准確來講,是緩沖區或 page cache)
byteBuffer.put(preEncodeBuffer);
msgInner.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
在獲取后計算寫入的偏移量與要填充的信息(在上一層的持久化格式的創建中並沒有填充所有的內容),統計根據計算后的偏移量來觀察是否需要新建文件,不需要的話可以直接寫入。這樣就完成了寫入。
在寫入完成后,我們回到邏輯存儲層,那里會繼續更新度量信息和進行刷盤策略和副本同步策略
刷盤策略
在寫入完成后,邏輯存儲層 會對根據當前的刷盤策略來做對應的操作
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 同步刷盤策略
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// 寫入位置結束點 與 刷盤超時時間
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
} else /* 異步刷盤 */ {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
在以上代碼中,會根據策略來選擇不同的刷盤服務,所有的刷盤服務都繼承自 FlushCommitLogService
:
-
GroupCommitService
同步的刷盤策略
-
FlushRealTimeService
異步的刷盤策略
-
CommitRealTimeService
只 commit 策略
同步
我們先來看第一個刷盤策略的實現
在這個策略的實現中,具有兩個請求存放隊列
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<>();
在請求到達的時候,會放入 write 隊列
public synchronized void putRequest(final GroupCommitRequest request) {
lock.lock();
try {
// 加入到請求隊列
this.requestsWrite.add(request);
} finally {
lock.unlock();
}
// 喚醒提交線程
this.wakeup();
}
從源碼看得出,這里的刷盤並不是由當前線程自己去刷盤,而是阻塞直到另一個專門的刷盤線程來刷盤后再繼續運行。
我們再來看刷盤線程的運行
public void run() {
while (!this.isStopped()) {
try {
// 在這里會進行等待,並交換讀寫隊列
// 這樣設計可以避免單個隊列的加鎖並發問題
this.waitForRunning(10);
// 嘗試提交
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// shutdown,處理最后的請求
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
}
這里主要還是等待然后嘗試提交。這里之所以說是嘗試,是因為其 waitForRunning
方法的實現
protected void waitForRunning(long interval) {
// 如果能 cas,說明已經有線程提交了請求
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
waitPoint.reset();
try {
// 否則 wait 直到請求到來被喚醒或睡夠了
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
它的最大等待時間只有 10 毫秒,且當結束時會調用 this.onWaitEnd()
方法,這會讓 requestRead 隊列和 requestWrite 隊列交換。這樣可以減少在運行時的同步問題。
同時需要注意的是 waitForRunning
方法是在其父類 ServiceThread
中實現的,而 onWaitEnd
方法是留給其子類實現的一個方法
然后進入到 doCommit 方法
private void doCommit() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// 判斷 已經flush到的位置 是否大於 請求中需要flush到的位置
// 如果是的話,當然就不需要 flush 了
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
// 下一個文件中可能有消息,所以最多可以 flush 兩次
for (int i = 0; i < 2 && !flushOK; i++) {
// 進行 flush
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
// flush complete, so wakeup
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 換一個新的隊列
this.requestsRead = new LinkedList<>();
} else {
// 因為個別消息設置為 async flush,所以會來到這個地方
CommitLog.this.mappedFileQueue.flush(0);
}
}
代碼和注釋講的都比較的清楚了,主要的是對 mappedFileQueue
的最新的一個 mappedFile 進行 flush,而這個方法同樣會被其他刷盤策略使用,所以我們等看完了其他刷盤策略后再了解
異步
接着看異步的刷盤策略,異步刷盤相較同步要簡單一些,只是簡單的將異步刷盤線程喚醒
while (!this.isStopped()) {
/* pass:基本變量的獲取 */
// 當當前時間超最久需要 flush 時間,則將 flush 所需頁設定為0 (立即 flush)
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
}
try {
// 是否開啟定時刷新
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
// 如果關閉則需要會在寫入后自行通知來喚醒當前線程
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
// 進行 flush
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// 在 shutdown 前,確保所有的消息都被 flush
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
}
這里的刷盤策略可以用一句話概括:(距離上次刷盤超過了一定時間) 或 (在 page cache 中的臟頁超過指定數量) 則進行刷盤
至於只 commit 策略,它的策略和異步刷盤策略差不多,只是刷盤后更新的指針與使用的刷盤方法不一樣。
也就是說最大的區別在於,它並沒有刷盤,只是將 writeBuffer 中的內容 write 到了 channel (page cache) 而已。
刷盤實現
接下來看同步策略和異步策略的具體刷盤方法
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
// 對找到的最新 MappedFile flush
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
這里通過偏移量找到 MappedFile,然后調用其的 flush
方法並更新指針
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
// 只 flush fileChannel 和 mappedByteBuffer 其一
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
// 更新已經 flush 的偏移量
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
首先,先判斷是否運行寫入,判斷主要是根據當前在 page cache 中臟頁的頁數是否超過傳入的 flushLeastPages
。
最后則是對於 writeBuffer 和 mappedByteBuffer 任一調用 force
方法刷盤。