rocketmq消息存儲概述


了解消息存儲部分首先需要關注的幾個方法,load()--Load previously stored messages、start()--Launch this message store、putMessage--Store a(or batch) message into store.
以及一些關鍵詞:
  commitLog :      消息的物理存儲相關
   consumeQueue:    邏輯隊列存儲相關
  IndexFile:           消息存儲索引
  刷盤:             將寫入內存的消息持久化
  主從同步(HAService):       Master中的數據同步到Slave中
  load()方法:         用於重啟時,加載數據
load()方法用於重啟時,加載數據,初始化boker時,boker中的initialize方法中會調用messageStore.load(),包括:commitLog.load()、 loadConsumeQueue()、 indexService.load、 recover(lastExitOK)
 
正文:
(一)消息存儲開啟基礎服務 -- 在后台運行,時刻准備為存儲服務
boker啟動時,會初始化DefaultMessageStore,調用DefaultMessageStore.start()服務。
I. 初始化DefaultMessageStore時開啟的服務
  預分配MapedFile對象服務(線程):AllocateMapedFileService
  分發消息索引服務(線程):   DispatchMessageService --(注:rockemq4.0版本中拋棄了該服務,對應變成了CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex內部類)
  消息索引服務(線程):     IndexService
II. DefaultMessageStore.start()會開啟的服務(或服務線程)
  邏輯隊列刷盤服務(線程):  FlushConsumeQueueService
  物理隊列刷盤服務(線程):  FlushCommitLogService (該服務在初始化commitlog對象時開啟)
  運行時數據統計服務(線程):    StoreStatsService
  從物理隊列解析消息重新發送到邏輯隊列服務(線程):ReputMessageService
  HA服務: HAService
  定時服務:ScheduleMessageService,如定時刪除過期文件--cleanFilesPeriodically等。
(注:這些服務對象基本都在初始化DefaultMessageStore實例對象時被創建)
 
 
(二) 存儲過程
rockemq4.0數據存儲的過程與之前的版本存入過程與有很大的不同:
  如rocketmq 3.2.4中只有角色為SLAVE的boker會開啟ReputMessageService服務。
  如rockemq4.0中將之前版本中廢除了處理分發消息索引服務DispatchMessageService服務,更改為這兩個類CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex。
  如rocketmq 3.2.4可以通過是否開啟消息索引功能可以控制是否執行 Index索引。
 
2.1 rocketmq 3.2.4版本的數據存入過程:
I. commitLog數據存入
  • 如果boker角色為MASTER
  生產者每寫入一條數據,boker端接受到消息后,DefaultMessageStore.putMessage調用Commit.putMessage方法,PutMessage首先要檢查一些條件,比如:
    1. 每條數據第一寫入的broker的屬性必須為master,否則回返回PutMessageStatus.SERVICE_NOT_AVAILABLE狀態,“message store is slave mode, so putMessage is forbidden ”. 
    2. 這條msg是否具有被寫入的權限,否則回返回PutMessageStatus.SERVICE_NOT_AVAILABLE狀態,"message store is not writeable, so putMessage is forbidden ".
    3. message topic長度校驗
    4. message properties長度校驗
  Commit.putMessage 首先將數據寫入到commitlog對應的mapedFile中,每寫入一條消息,通過mapedFile.appendMessage追加到MapedFile文件中,當MapedFile寫滿后,生成一個新的MapedFile,然后向這個MapedFile中追加消息,如此不斷 ... ...,這些MapedFile裝在MapedFileQueue中。
  commitLog中每向mapedFile中寫入一個消息后,會返回一個AppendMessageResult對象,根據AppendMessageResult與msg消息信息,生成一個DispatchRequest對象,調用commit的內部類DispatchMessageService.putRequest(dispatchRequest)方法,將寫入的消息對應dispatchRequest寫入到定義的List<DispatchRequest> requestsWrite列表中。
  • 如果boker角色為SLAVE
  沒有putMessage過程,數據加載通過HAService進行主從同步,同步MASTER中的邏輯隊列,向commitLog存入數據。(過程比較復雜,有機會以后單獨成文分析)
II. consumeQueue數據存入(indexFile數據存入可選)
  • 如果boker角色為MASTER
DispatchMessageService線程在后台一直運行,不斷執行doDispatch()
while (!this.isStoped()) {
  try {
    this.waitForRunning(0);
    this.doDispatch();
  } catch (Exception e) {
    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
  }
}
doDispatch()會將requestsWrite列表中的dispatchRequest處理,將它們轉換成consumeQueue單元結構對應數據,這些數據追加到consumerQueue對應的MapedFile中。然后添加到consumerQueue的MapedFileQueue中。如果開啟了消息索引功能即: isMessageIndexEnable==true,則將requestsWrite列表中的dispatchRequest傳給indexService服務,然后indexService將這些消息寫入IndexFile中。
  • 如果boker角色為SLAVE
  由於生產者產生消息不會直接到SLAVE,因此在SLAVE不會執行putMessage邏輯,它主要靠 ReputMessageService 服務線程,從物理隊列(commitlog)解析消息重新發送到邏輯隊列,大致過程為: 從物理隊列解析數據,生成dispatchRequest,如果數據正常,則將dispatchRequest傳入給DispatchMessageService的List<DispatchRequest> requestsWrite,之后DispatchMessageService處理dispatchRequest的過程與上文一樣。
 
2.2. rocketmq 4.0 版本的數據存入過程
I. commitLog數據存入過程基本不變
  不同的是,commit.putMessage過程並不會根據AppendMessageResult與msg消息信息,生成一個DispatchRequest對象,該版本中DispatchRequest對象的生成過程 放在了ReputMessageService中,通過ReputMessageService生成DispatchRequest對象。該版本中ReputMessageService服務線程不像rocketmq 3.2.4中那樣只為boker角色為SLAVE單獨開設。
II. consumeQueue與indexFile數據存入
  rocketmq4.0中此過程的核心服務是 ReputMessageService,與之前版本不同的是在rocketmq4.0版本中,consumeQueue與indexFile數據存入的服務線程獨立出來了,分別使用CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex類處理,初始化DefaultMessageStore時,將這兩個類存放入dispatcherList列表中:
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
在ReputMessageService服務線程啟開后,不斷從commitLog中解析數據,生成dispatchRequest :
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
 之后向dispatcherList中的所有分發器分發dispatcherList:
DefaultMessageStore.this.doDispatch(dispatchRequest);
即執行:
CommitLogDispatcherBuildConsumeQueue.doDispatch(dispatchRequest)生成consumeQueue數據
CommitLogDispatcherBuildIndex.doDispatch(dispatchRequest)生成IndexFile數據
 
 
(三) 數據寫入內存小結
即:內存映射
生成commitLog數據的核心接口:
this.commitLog.putMessage(msg)
將數據寫入到commitlog對應的MapedFiLe對象中。
生成consumeQueue數據的核心接口:
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
  ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
  cq.putMessagePositionInfoWrapper(dispatchRequest);
}
將數據寫入到consumeQueue對應的MapedFiLe對象中。
生成IndexFile數據的核心接口:
DefaultMessageStore.this.indexService.buildIndex(request);
將數據寫入到IndexFile對應的MapedFiLe對象中。
 
 
(四)  內存文件落地 -- 刷盤
上文介紹了數據如何寫入到邏輯隊列、物理隊列、索引的MapedFiLe中,這里介紹如何將邏輯隊列、物理隊列內存數據持久化到磁盤(索引文件的寫入可以在以后的文章中單獨分析)。
 
邏輯隊列、物理隊列內存文件刷盤方式相同,它們生成的MapedFile文件會放在各自對應的MapedFileQueue對象中,通過刷盤的方式,將MapedFileQueue持久化到物理磁盤上。
初始化DefaultMessageStore的時候會開啟: 邏輯隊列刷盤服務線程--FlushConsumeQueueService、將ConsumeQueue.mapedFileQueues刷入磁盤;
初始化commitlog對象時開啟:物理隊列刷盤服務線程--FlushCommitLogService,將commitlog.mapedFileQueues刷入磁盤。
這兩個線程會分別將MapedFileQueue持久化到物理磁盤上。
對於commitlog的刷盤策略:
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
  this.flushCommitLogService = new GroupCommitService();
} else {
  this.flushCommitLogService = new FlushRealTimeService();
}
異步刷盤使用的是FlushRealTimeService, 同步刷盤使用的是GroupCommitService
刷盤過程要涉及到MapedFile,MapedFile以及java NIO相關的知識如MappedByteBuffer、FileChannel,可以學到的到東西很多,具體的刷盤實現過程見(下一篇)。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM