RocketMQ(4.8.0)——Broker消息存儲機制


Broker消息存儲機制

  RocketMQ 使用 CommitLog 文件將消息存儲到磁盤上,那么 RocketMQ 存儲消息到磁盤的過程是怎么樣的呢?

  RocketMQ 首先將消息數據寫入操作系統 PageCache,然后定時將數據刷入磁盤。

一、Broker 消息存儲的流程是什么?

  下面主要介紹 RocketMQ 是如何接收發送消息請求並將消息寫入 PageCache 的,整個過程如下:

(1)Broker 接收客戶端發送消息的請求並做預處理。

  SendMessageProcessor.processRequest()方法會自動被調用者接收、解析客戶端請求為消息實例。

    • 解析請求參數
    • 執行發送處理前的 Hook
    • 調用保存方法存儲消息
    • 執行發送處理后的 Hook

(2)Broker存儲前預處理消息。

  預處理方法:org.apache.rocketmq.broker.processor.SendMessageProcessor.sendMessage(),

  首先,設置請求處理返回對象標志,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java,代碼如下:

1     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
2     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
3 
4     response.setOpaque(request.getOpaque());

  Netty 是異步執行的,請求發送到 Broker 被處理后,返回結果時,在客戶端的處理線程已經不再是發送請求的線程,那么客戶端如何確定返回結果對應哪個請求呢? 通過返回標志來判斷。

  其次,做一些列存儲前發送請求的數據檢查,比如死信消息處理、Broker 是否拒絕事務消息處理、消息基本檢查等。消息基本檢查方法代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\AbstractSendMessageProcessor.java 中 msgCheck(),該方法的主要功能如下:

    • 校驗 Broker 是否配置可寫
    • 校驗 Topic 名字是否為莫認證
    • 校驗 Topic 配置是否存在
    • 校驗 queueId 與讀寫隊列數是否匹配
    • 校驗 Broker 是否支持事務消息(msgCheck之后進行的校驗)

 (3)執行 DefaultMessageStore.putMessage() 方法進行消息校驗和存儲模塊檢查

  在真正保存消息前,會對消息數據做基本檢查、對存儲服務做可用性檢查、對 Broker 做是否 Slave 的檢查等,總結如下:

    • 校驗存儲模塊是否已經關閉
    • 校驗 Broker 是否是Slave
    • 校驗存儲模塊運行標記
    • 校驗 Topic 長度
    • 校驗擴展信息的長度
    • 校驗操作系統 PageCache 是否繁忙。

  (4)執行 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java putMessage() 方法,將消息寫入 CommitLog。

  存儲消息的核心處理過程如下:

    • 設置消息保存時間為當前時間戳,設置消息完整性校驗碼 CRC(循環冗余碼)。
    • 延遲消息處理。如果發送的消息是延遲消息,這里會單獨設置延遲消息的數據字段,比如修改 Topic 為延遲消息特有的 Topic —— SCHEDULE_TOPIC_XXXX,並且備份原來的 Topic 和 queueId,以便延遲消息在投遞后被消費者消費。

延遲消息的處理代碼如下:

 1         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 2         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
 3                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 4             // Delay Delivery
 5             if (msg.getDelayTimeLevel() > 0) {
 6                 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 7                     msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 8                 }
 9 
10                 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
11                 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
12 
13                 // Backup real topic, queueId
14                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
15                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
16                 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
17 
18                 msg.setTopic(topic);
19                 msg.setQueueId(queueId);
20             }
21         }
  • 獲取最后一個 CommitLog 文件實例 MappedFile,鎖住該 MappedFile。默認為自旋鎖,也可以通過 useReentrantLockWhenPutMessage 進行配置、修改和使用 ReentrantLock。
  • 校驗最后一個 MappedFile,如果結果為空或已寫滿,則新創建一個 MappedFile 返回。
  • 調用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final AppendMessageCallback cb),將消息寫入 MappedFile。

根據消息是單個消息還是批量消息來調用 AppendMessageCallback.doAppend()方法,並將消息寫入 Page Cache,該方法的功能包含以下幾點:

  1. 查找即將寫入的消息物理機 Offset
  2. 事務消息單獨處理。這里主要處理 Prepared 類型和 Rollback 類型的消息,設置消息 queueOffset 為 0 。
  3. 序列化消息,並將序列化結果保存到 ByteBuffer 中(文件內存映射的 Page Cache 或 Direct Memory,簡稱 DM)。特別地,如果將刷盤設置為異步刷盤,那么當ransientStorePoolEnablTrue時,會先寫入DM,DM中的數據再異步寫入文件內存映射的Page Cache 中,因為消費者始終是從 Page Cache 中讀取消息消費的,所以這個機制也稱為 "讀寫分離"。
  4. 更新消息所在 Queue 的位點。

  在消息存儲完成后,會處理刷盤邏輯和主從同步邏輯,分別調用 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush() 方法,代碼如下

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5             if (messageExt.isWaitStoreMsgOK()) {
 6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7                 service.putRequest(request);
 8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
 9                 PutMessageStatus flushStatus = null;
10                 try {
11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
12                             TimeUnit.MILLISECONDS);
13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
14                     //flushOK=false;
15                 }
16                 if (flushStatus != PutMessageStatus.PUT_OK) {
17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
18                         + " client address: " + messageExt.getBornHostString());
19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
20                 }
21             } else {
22                 service.wakeup();
23             }
24         }
25         // Asynchronous flush
26         else {
27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
28                 flushCommitLogService.wakeup();
29             } else {
30                 commitLogService.wakeup();
31             }
32         }
33     }

和handleHA()方法,代碼如下:

 1     public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
 3             HAService service = this.defaultMessageStore.getHaService();
 4             if (messageExt.isWaitStoreMsgOK()) {
 5                 // Determine whether to wait
 6                 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
 7                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 8                     service.putRequest(request);
 9                     service.getWaitNotifyObject().wakeupAll();
10                     PutMessageStatus replicaStatus = null;
11                     try {
12                         replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
13                                 TimeUnit.MILLISECONDS);
14                     } catch (InterruptedException | ExecutionException | TimeoutException e) {
15                     }
16                     if (replicaStatus != PutMessageStatus.PUT_OK) {
17                         log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
18                             + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
19                         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
20                     }
21                 }
22                 // Slave problem
23                 else {
24                     // Tell the producer, slave not available
25                     putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
26                 }
27             }
28         }
29 
30     }

  在 Broker 處理發送消息請求時,由於處理器 SendMessageProcessor 本身是一個線程池服務,所以設計了快速失敗邏輯,方便在高峰時自我保護,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\latency\BrokerFastFailure.java中的cleanExpiredRequest()方法,代碼如下:

 1     private void cleanExpiredRequest() {
 2         while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
 3             try {
 4                 if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
 5                     final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
 6                     if (null == runnable) {
 7                         break;
 8                     }
 9 
10                     final RequestTask rt = castRunnable(runnable);
11                     rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
12                 } else {
13                     break;
14                 }
15             } catch (Throwable ignored) {
16             }
17         }
18 
19         cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
20             this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
21 
22         cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
23             this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
24 
25         cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
26             this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
27 
28         cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
29             .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
30     }

  在 BrokerController 啟動 BrokerFastFailure 服務時,會啟動一個定時任務處理快速失敗的異常,啟動及掃描代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\latency\BrokerFastFailure.java,具體代碼如下:

    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                    cleanExpiredRequest(); #每間隔 10ms 執行一次該方法,清理非法、過期的請求。
                }
            }
        }, 1000, 10, TimeUnit.MILLISECONDS);
    }

cleanExpiredRequest()方法處理方式有3種:

  1. 系統繁忙時發送消息請求快速失敗處理。當操作系統 PageCache繁忙時,會將發送消息請求從發送消息請求線程池工作隊列中取出來,直接返回SYSTEM_BUSY。如果此種情況發生說明系統已經不堪重負,需要增加系統資源或者擴容來減輕當前 Broker 的壓力。
  2. 發送請求超時處理。
  3. 拉取消息請求超時處理。

  第2種和第3種的代碼邏輯和第1種代碼邏輯處理類似,如果出現了,說明請求在線程池的工作隊列中的排隊時間超過預期配置的時間,那么增加排隊等待時間即可。如果請求持續超時,說明系統可能達到瓶頸,那么需要增加系統資源或者擴容。

二、Broker如何保證高效存儲?——內存映射機制與高效寫磁盤

  RocketMQ 在存儲設計中通過內存映射、順序寫文件等方式實現了高吞吐。

  那么這些怎么實現的呢?

  RocketMQ 的基本數據結構:

  org.apache.rocketmq.store.CommitLog:RocketMQ 對存儲消息的物理文件的抽象實現,也就是物理 CommitLog 文件的具體實現。

  org.apache.rocketmq.store.MappedFile:CommitLog 文件在內存中的映射文件,映射文件同時具有內存的寫入速度和磁盤一樣可靠的持久化方式。

  org.apache.rocketmq.store.MappedFileQueue:映射文件隊列中有全部的 CommitLog 映射文件,第一個映射文件為最先過期的文件,最后一個文件是最后過期的文件,最新的消息總是寫入最后一個映射文件中。

  CommitLog、MappedFile、MappedFileQueue 與物理 CommitLog 文件的關系如下:

  每個 MappedFileQueue 包含多個 MappedFile,就是真實的物理 CommitLog文件,Java 通過 java.nio.MappedByteBuffer 來實現文件的內存映射,即文件讀寫都是通過 MappedByteBuffer(其實是 Page Cache)來操作的。

  寫入數據時先加鎖,然后通過 Append 方式寫入最新 MappedFile。對於讀取消息,大部分情況下用戶只關心最新數據,而這些數據都在 Page Cache 中,也就是說,讀寫文件就是在 Page Cache 中進行的,其速度幾乎等於志傑操作內存的速度。

三、文件刷盤機制

  消息存儲完成后,會被操作系統持久化到磁盤,也就是刷盤。

  RocketMQ 支持2種刷盤方式,在 Broker 啟動時:

    • 配置 flushDiskType = SYNC_FLUSH 表示同步刷盤
    • 配置 flushDiskType = ASYNC_FLUSH 表示異步刷盤

                                                 GroupCommitService 就是 org.apahce.rocketmq.store.CommitLog.GroupCommitServie —— 同步刷盤服務。在 Broker 存儲消息到 Page Cache 后,同步將 Page Cache 刷到磁盤,再返回客戶端消息並寫入結果,具體過程如下所示: 

  FlushRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.FlushRealTimeService —— 異步刷盤服務。在 Broker 存儲消息到 Page Cache 后,立即返回客戶端寫入結果,然后異步刷盤服務將 Page Cache 異步刷盤到磁盤。

  CommitRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.CommitRealTimeService —— 異步轉存服務。Broker 通過配置讀寫分離將消息寫入直接內存(Direct Memory),簡稱 DM),然后通過異步轉存服務,將 DM 中的數據再次存儲到 Page Cache 中,以供異步刷盤服務將 Page Cache 刷到磁盤中,轉存服務過程如下:

  將消息成功保存到 CommitLog 映射文件后,調用 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush() 方法處理刷盤邏輯,代碼如下:

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {  2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {  4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;  5             if (messageExt.isWaitStoreMsgOK()) {  6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());  7  service.putRequest(request);  8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();  9                 PutMessageStatus flushStatus = null; 10                 try { 11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), 12  TimeUnit.MILLISECONDS); 13                 } catch (InterruptedException | ExecutionException | TimeoutException e) { 14                     //flushOK=false;
15  } 16                 if (flushStatus != PutMessageStatus.PUT_OK) { 17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() 18                         + " client address: " + messageExt.getBornHostString()); 19  putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 20  } 21             } else { 22  service.wakeup(); 23  } 24  } 25         // Asynchronous flush
26         else { 27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 28  flushCommitLogService.wakeup(); 29             } else { 30  commitLogService.wakeup(); 31  } 32  } 33     }

  通過以上代碼可知,同步刷盤、異步刷盤都是在這里發起的。異步刷盤的實現根據是否配置讀寫分離機制而稍有不同。

  接下來我們介紹兩種刷盤方式:

  (1)同步刷盤:

  同步刷盤是一個后台線程服務 ,消息進行同步刷盤的流程如下圖:

  存儲消息線程:主要負責將消息存儲到 Page Cache 或者 DM 中,存儲成功后通過調用 handleDiskFlush() 方法將同步刷盤請求 "發送" 給 GroupCommitService 服務,並在該刷盤請求上執行鎖等待,代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java 中 handleDiskFlush(),具體代碼如下:

 1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2         // Synchronization flush
 3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5             if (messageExt.isWaitStoreMsgOK()) { #客戶端可以設置,默認為True  6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7                 service.putRequest(request); #保存同步磁盤請求  8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); #請求同步鎖等待  9                 PutMessageStatus flushStatus = null;
10                 try {
11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
12                             TimeUnit.MILLISECONDS);
13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
14                     //flushOK=false;
15                 }
16                 if (flushStatus != PutMessageStatus.PUT_OK) {
17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() #記錄刷盤超時設置 18                         + " client address: " + messageExt.getBornHostString());
19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
20                 }
21             } else {
22                 service.wakeup();    #異步刷盤,不用同步返回 23             }
24         }
25         // Asynchronous flush
26         else {
27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
28                 flushCommitLogService.wakeup();
29             } else {
30                 commitLogService.wakeup();
31             }
32         }
33     }

  同步刷盤服務線程:通過 GroupCommitService 類實現的同步刷盤服務。

  具體同步刷盤是怎么執行的,執行完成后又是如何將刷盤結果通知存儲數據線程的呢?

  正常同步刷盤線程會間隔 10ms 執行一次 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法,該方法循環每一個同步刷盤請求,如果刷盤成功,那么喚醒等待刷盤請求鎖的存儲消息線程,並告知刷盤成功。

  由於操作系統刷盤耗時及每次刷多少字節數據到磁盤等,都不是 RocketMQ 進程能掌控的,所以在每次刷盤前都需要做必要的檢查,以確認當前同步刷盤請求對應位點的消息是否已經被刷盤,如果已經被刷盤,當前刷盤請求就不需要執行。

  在 RocketMQ 進程正常關閉時,如果有同步刷盤請求未執行完,那么數據會丟失嗎?

  答案是:不會的。在上圖,我們得知,關閉刷盤服務時,會執行 Thread.sleep(10) 等待所有的同步刷盤請求保存到刷盤請求隊列中后,交換保存刷盤請求的隊列,再執行 doCommit() 方法。

(2)異步刷盤:

  如果 Broker 配置讀寫分離,則異步刷盤過程包含異步轉存數據和真正的異步刷盤操作。

  異步轉存數據是通過 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法實現的。

  下面將介紹異步轉存數據服務的核心的執行過程。

  (1)獲取轉存參數。整個轉存過程的參數都是可配置的。

  (2)執行轉存數據。

  (3)轉存失敗,喚醒異步刷盤線程。轉存數據失敗,並不代表沒有數據被轉存到 Page Cache 中,而是說明有部分數據轉存成功,部分數據轉存失敗。所以可以喚醒刷盤線程執行刷盤操作。而如果轉存成功,則正常執行異步刷盤即可。

  在異步轉存服務和存儲服務把消息寫入 Page Cache 后,由異步刷盤將消息刷入磁盤中。異步刷盤服務的主要功能是將 Page Cache 中的數據異步刷入磁盤,並記錄 Checkpoint 信息。異步刷盤的實現代碼主要在 org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run() 方法中,步驟拆解如下:

  第一步:獲取刷盤參數;

  第二步:等待刷盤間隔;

  第三步:執行刷盤;

  第四步:記錄 CheckPoint 和耗時日志。這里主要記錄最后刷盤成功過時間和刷盤耗時超過 500ms 的情況。

總結:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM