Broker消息存儲機制
RocketMQ 使用 CommitLog 文件將消息存儲到磁盤上,那么 RocketMQ 存儲消息到磁盤的過程是怎么樣的呢?
RocketMQ 首先將消息數據寫入操作系統 PageCache,然后定時將數據刷入磁盤。
一、Broker 消息存儲的流程是什么?
下面主要介紹 RocketMQ 是如何接收發送消息請求並將消息寫入 PageCache 的,整個過程如下:
(1)Broker 接收客戶端發送消息的請求並做預處理。
SendMessageProcessor.processRequest()方法會自動被調用者接收、解析客戶端請求為消息實例。
-
- 解析請求參數
- 執行發送處理前的 Hook
- 調用保存方法存儲消息
- 執行發送處理后的 Hook
(2)Broker存儲前預處理消息。
預處理方法:org.apache.rocketmq.broker.processor.SendMessageProcessor.sendMessage(),
首先,設置請求處理返回對象標志,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java,代碼如下:
1 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); 2 final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); 3 4 response.setOpaque(request.getOpaque());
Netty 是異步執行的,請求發送到 Broker 被處理后,返回結果時,在客戶端的處理線程已經不再是發送請求的線程,那么客戶端如何確定返回結果對應哪個請求呢? 通過返回標志來判斷。
其次,做一些列存儲前發送請求的數據檢查,比如死信消息處理、Broker 是否拒絕事務消息處理、消息基本檢查等。消息基本檢查方法代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\AbstractSendMessageProcessor.java 中 msgCheck(),該方法的主要功能如下:
-
- 校驗 Broker 是否配置可寫
- 校驗 Topic 名字是否為莫認證
- 校驗 Topic 配置是否存在
- 校驗 queueId 與讀寫隊列數是否匹配
- 校驗 Broker 是否支持事務消息(msgCheck之后進行的校驗)
(3)執行 DefaultMessageStore.putMessage() 方法進行消息校驗和存儲模塊檢查
在真正保存消息前,會對消息數據做基本檢查、對存儲服務做可用性檢查、對 Broker 做是否 Slave 的檢查等,總結如下:
-
- 校驗存儲模塊是否已經關閉
- 校驗 Broker 是否是Slave
- 校驗存儲模塊運行標記
- 校驗 Topic 長度
- 校驗擴展信息的長度
- 校驗操作系統 PageCache 是否繁忙。
(4)執行 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 的 putMessage() 方法,將消息寫入 CommitLog。
存儲消息的核心處理過程如下:
- 設置消息保存時間為當前時間戳,設置消息完整性校驗碼 CRC(循環冗余碼)。
- 延遲消息處理。如果發送的消息是延遲消息,這里會單獨設置延遲消息的數據字段,比如修改 Topic 為延遲消息特有的 Topic —— SCHEDULE_TOPIC_XXXX,並且備份原來的 Topic 和 queueId,以便延遲消息在投遞后被消費者消費。
延遲消息的處理代碼如下:
1 final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 2 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE 3 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 4 // Delay Delivery 5 if (msg.getDelayTimeLevel() > 0) { 6 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 7 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 8 } 9 10 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; 11 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 12 13 // Backup real topic, queueId 14 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 15 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 16 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 17 18 msg.setTopic(topic); 19 msg.setQueueId(queueId); 20 } 21 }
- 獲取最后一個 CommitLog 文件實例 MappedFile,鎖住該 MappedFile。默認為自旋鎖,也可以通過 useReentrantLockWhenPutMessage 進行配置、修改和使用 ReentrantLock。
- 校驗最后一個 MappedFile,如果結果為空或已寫滿,則新創建一個 MappedFile 返回。
- 調用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final AppendMessageCallback cb),將消息寫入 MappedFile。
根據消息是單個消息還是批量消息來調用 AppendMessageCallback.doAppend()方法,並將消息寫入 Page Cache,該方法的功能包含以下幾點:
- 查找即將寫入的消息物理機 Offset
- 事務消息單獨處理。這里主要處理 Prepared 類型和 Rollback 類型的消息,設置消息 queueOffset 為 0 。
- 序列化消息,並將序列化結果保存到 ByteBuffer 中(文件內存映射的 Page Cache 或 Direct Memory,簡稱 DM)。特別地,如果將刷盤設置為異步刷盤,那么當ransientStorePoolEnablTrue時,會先寫入DM,DM中的數據再異步寫入文件內存映射的Page Cache 中,因為消費者始終是從 Page Cache 中讀取消息消費的,所以這個機制也稱為 "讀寫分離"。
- 更新消息所在 Queue 的位點。
在消息存儲完成后,會處理刷盤邏輯和主從同步邏輯,分別調用 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush() 方法,代碼如下
1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 // Synchronization flush 3 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { 4 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; 5 if (messageExt.isWaitStoreMsgOK()) { 6 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 7 service.putRequest(request); 8 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); 9 PutMessageStatus flushStatus = null; 10 try { 11 flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 12 TimeUnit.MILLISECONDS); 13 } catch (InterruptedException | ExecutionException | TimeoutException e) { 14 //flushOK=false; 15 } 16 if (flushStatus != PutMessageStatus.PUT_OK) { 17 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() 18 + " client address: " + messageExt.getBornHostString()); 19 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 20 } 21 } else { 22 service.wakeup(); 23 } 24 } 25 // Asynchronous flush 26 else { 27 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 28 flushCommitLogService.wakeup(); 29 } else { 30 commitLogService.wakeup(); 31 } 32 } 33 }
和handleHA()方法,代碼如下:
1 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { 3 HAService service = this.defaultMessageStore.getHaService(); 4 if (messageExt.isWaitStoreMsgOK()) { 5 // Determine whether to wait 6 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 7 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 8 service.putRequest(request); 9 service.getWaitNotifyObject().wakeupAll(); 10 PutMessageStatus replicaStatus = null; 11 try { 12 replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 13 TimeUnit.MILLISECONDS); 14 } catch (InterruptedException | ExecutionException | TimeoutException e) { 15 } 16 if (replicaStatus != PutMessageStatus.PUT_OK) { 17 log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " 18 + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); 19 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 20 } 21 } 22 // Slave problem 23 else { 24 // Tell the producer, slave not available 25 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 26 } 27 } 28 } 29 30 }
在 Broker 處理發送消息請求時,由於處理器 SendMessageProcessor 本身是一個線程池服務,所以設計了快速失敗邏輯,方便在高峰時自我保護,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\latency\BrokerFastFailure.java中的cleanExpiredRequest()方法,代碼如下:
1 private void cleanExpiredRequest() { 2 while (this.brokerController.getMessageStore().isOSPageCacheBusy()) { 3 try { 4 if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { 5 final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS); 6 if (null == runnable) { 7 break; 8 } 9 10 final RequestTask rt = castRunnable(runnable); 11 rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size())); 12 } else { 13 break; 14 } 15 } catch (Throwable ignored) { 16 } 17 } 18 19 cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(), 20 this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()); 21 22 cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), 23 this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()); 24 25 cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), 26 this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); 27 28 cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this 29 .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); 30 }
在 BrokerController 啟動 BrokerFastFailure 服務時,會啟動一個定時任務處理快速失敗的異常,啟動及掃描代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\latency\BrokerFastFailure.java,具體代碼如下:
public void start() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) { cleanExpiredRequest(); #每間隔 10ms 執行一次該方法,清理非法、過期的請求。 } } }, 1000, 10, TimeUnit.MILLISECONDS); }
cleanExpiredRequest()方法處理方式有3種:
- 系統繁忙時發送消息請求快速失敗處理。當操作系統 PageCache繁忙時,會將發送消息請求從發送消息請求線程池工作隊列中取出來,直接返回SYSTEM_BUSY。如果此種情況發生說明系統已經不堪重負,需要增加系統資源或者擴容來減輕當前 Broker 的壓力。
- 發送請求超時處理。
- 拉取消息請求超時處理。
第2種和第3種的代碼邏輯和第1種代碼邏輯處理類似,如果出現了,說明請求在線程池的工作隊列中的排隊時間超過預期配置的時間,那么增加排隊等待時間即可。如果請求持續超時,說明系統可能達到瓶頸,那么需要增加系統資源或者擴容。
二、Broker如何保證高效存儲?——內存映射機制與高效寫磁盤
RocketMQ 在存儲設計中通過內存映射、順序寫文件等方式實現了高吞吐。
那么這些怎么實現的呢?
RocketMQ 的基本數據結構:
org.apache.rocketmq.store.CommitLog:RocketMQ 對存儲消息的物理文件的抽象實現,也就是物理 CommitLog 文件的具體實現。
org.apache.rocketmq.store.MappedFile:CommitLog 文件在內存中的映射文件,映射文件同時具有內存的寫入速度和磁盤一樣可靠的持久化方式。
org.apache.rocketmq.store.MappedFileQueue:映射文件隊列中有全部的 CommitLog 映射文件,第一個映射文件為最先過期的文件,最后一個文件是最后過期的文件,最新的消息總是寫入最后一個映射文件中。
CommitLog、MappedFile、MappedFileQueue 與物理 CommitLog 文件的關系如下:
每個 MappedFileQueue 包含多個 MappedFile,就是真實的物理 CommitLog文件,Java 通過 java.nio.MappedByteBuffer 來實現文件的內存映射,即文件讀寫都是通過 MappedByteBuffer(其實是 Page Cache)來操作的。
寫入數據時先加鎖,然后通過 Append 方式寫入最新 MappedFile。對於讀取消息,大部分情況下用戶只關心最新數據,而這些數據都在 Page Cache 中,也就是說,讀寫文件就是在 Page Cache 中進行的,其速度幾乎等於志傑操作內存的速度。
三、文件刷盤機制
消息存儲完成后,會被操作系統持久化到磁盤,也就是刷盤。
RocketMQ 支持2種刷盤方式,在 Broker 啟動時:
- 配置 flushDiskType = SYNC_FLUSH 表示同步刷盤
- 配置 flushDiskType = ASYNC_FLUSH 表示異步刷盤
GroupCommitService 就是 org.apahce.rocketmq.store.CommitLog.GroupCommitServie —— 同步刷盤服務。在 Broker 存儲消息到 Page Cache 后,同步將 Page Cache 刷到磁盤,再返回客戶端消息並寫入結果,具體過程如下所示:
FlushRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.FlushRealTimeService —— 異步刷盤服務。在 Broker 存儲消息到 Page Cache 后,立即返回客戶端寫入結果,然后異步刷盤服務將 Page Cache 異步刷盤到磁盤。
CommitRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.CommitRealTimeService —— 異步轉存服務。Broker 通過配置讀寫分離將消息寫入直接內存(Direct Memory),簡稱 DM),然后通過異步轉存服務,將 DM 中的數據再次存儲到 Page Cache 中,以供異步刷盤服務將 Page Cache 刷到磁盤中,轉存服務過程如下:
將消息成功保存到 CommitLog 映射文件后,調用 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush() 方法處理刷盤邏輯,代碼如下:
1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 // Synchronization flush
3 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { 4 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; 5 if (messageExt.isWaitStoreMsgOK()) { 6 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 7 service.putRequest(request); 8 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); 9 PutMessageStatus flushStatus = null; 10 try { 11 flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 12 TimeUnit.MILLISECONDS); 13 } catch (InterruptedException | ExecutionException | TimeoutException e) { 14 //flushOK=false;
15 } 16 if (flushStatus != PutMessageStatus.PUT_OK) { 17 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() 18 + " client address: " + messageExt.getBornHostString()); 19 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 20 } 21 } else { 22 service.wakeup(); 23 } 24 } 25 // Asynchronous flush
26 else { 27 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 28 flushCommitLogService.wakeup(); 29 } else { 30 commitLogService.wakeup(); 31 } 32 } 33 }
通過以上代碼可知,同步刷盤、異步刷盤都是在這里發起的。異步刷盤的實現根據是否配置讀寫分離機制而稍有不同。
接下來我們介紹兩種刷盤方式:
(1)同步刷盤:
同步刷盤是一個后台線程服務 ,消息進行同步刷盤的流程如下圖:
存儲消息線程:主要負責將消息存儲到 Page Cache 或者 DM 中,存儲成功后通過調用 handleDiskFlush() 方法將同步刷盤請求 "發送" 給 GroupCommitService 服務,並在該刷盤請求上執行鎖等待,代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush(),具體代碼如下:
1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 // Synchronization flush 3 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { 4 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; 5 if (messageExt.isWaitStoreMsgOK()) { #客戶端可以設置,默認為True 6 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 7 service.putRequest(request); #保存同步磁盤請求 8 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); #請求同步鎖等待 9 PutMessageStatus flushStatus = null; 10 try { 11 flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 12 TimeUnit.MILLISECONDS); 13 } catch (InterruptedException | ExecutionException | TimeoutException e) { 14 //flushOK=false; 15 } 16 if (flushStatus != PutMessageStatus.PUT_OK) { 17 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() #記錄刷盤超時設置 18 + " client address: " + messageExt.getBornHostString()); 19 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 20 } 21 } else { 22 service.wakeup(); #異步刷盤,不用同步返回 23 } 24 } 25 // Asynchronous flush 26 else { 27 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 28 flushCommitLogService.wakeup(); 29 } else { 30 commitLogService.wakeup(); 31 } 32 } 33 }
同步刷盤服務線程:通過 GroupCommitService 類實現的同步刷盤服務。
具體同步刷盤是怎么執行的,執行完成后又是如何將刷盤結果通知存儲數據線程的呢?
正常同步刷盤線程會間隔 10ms 執行一次 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法,該方法循環每一個同步刷盤請求,如果刷盤成功,那么喚醒等待刷盤請求鎖的存儲消息線程,並告知刷盤成功。
由於操作系統刷盤耗時及每次刷多少字節數據到磁盤等,都不是 RocketMQ 進程能掌控的,所以在每次刷盤前都需要做必要的檢查,以確認當前同步刷盤請求對應位點的消息是否已經被刷盤,如果已經被刷盤,當前刷盤請求就不需要執行。
在 RocketMQ 進程正常關閉時,如果有同步刷盤請求未執行完,那么數據會丟失嗎?
答案是:不會的。在上圖,我們得知,關閉刷盤服務時,會執行 Thread.sleep(10) 等待所有的同步刷盤請求保存到刷盤請求隊列中后,交換保存刷盤請求的隊列,再執行 doCommit() 方法。
(2)異步刷盤:
如果 Broker 配置讀寫分離,則異步刷盤過程包含異步轉存數據和真正的異步刷盤操作。
異步轉存數據是通過 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法實現的。
下面將介紹異步轉存數據服務的核心的執行過程。
(1)獲取轉存參數。整個轉存過程的參數都是可配置的。
(2)執行轉存數據。
(3)轉存失敗,喚醒異步刷盤線程。轉存數據失敗,並不代表沒有數據被轉存到 Page Cache 中,而是說明有部分數據轉存成功,部分數據轉存失敗。所以可以喚醒刷盤線程執行刷盤操作。而如果轉存成功,則正常執行異步刷盤即可。
在異步轉存服務和存儲服務把消息寫入 Page Cache 后,由異步刷盤將消息刷入磁盤中。異步刷盤服務的主要功能是將 Page Cache 中的數據異步刷入磁盤,並記錄 Checkpoint 信息。異步刷盤的實現代碼主要在 org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run() 方法中,步驟拆解如下:
第一步:獲取刷盤參數;
第二步:等待刷盤間隔;
第三步:執行刷盤;
第四步:記錄 CheckPoint 和耗時日志。這里主要記錄最后刷盤成功過時間和刷盤耗時超過 500ms 的情況。