了解消息存儲部分首先需要關注的幾個方法,load()--Load previously stored messages、start()--Launch this message store、putMessage--Store a(or batch) message into store.
以及一些關鍵詞:
commitLog
: 消息的物理存儲相關
consumeQueue: 邏輯隊列存儲相關
IndexFile: 消息存儲索引
刷盤: 將寫入內存的消息持久化
主從同步(HAService): Master中的數據同步到Slave中
load()方法: 用於重啟時,加載數據
load()方法用於重啟時,加載數據,初始化boker時,boker中的initialize方法中會調用messageStore.load(),包括:commitLog.load()、 loadConsumeQueue()、 indexService.load、 recover(lastExitOK)
正文:
(一)消息存儲開啟基礎服務 -- 在后台運行,時刻准備為存儲服務
boker啟動時,會初始化DefaultMessageStore,調用DefaultMessageStore.start()服務。
I. 初始化DefaultMessageStore時開啟的服務
預分配MapedFile對象服務(線程):AllocateMapedFileService
分發消息索引服務(線程): DispatchMessageService --(注:rockemq4.0版本中拋棄了該服務,對應變成了CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex內部類)
消息索引服務(線程): IndexService
II. DefaultMessageStore.start()會開啟的服務(或服務線程)
邏輯隊列刷盤服務(線程): FlushConsumeQueueService
物理隊列刷盤服務(線程): FlushCommitLogService (該服務在初始化commitlog對象時開啟)
運行時數據統計服務(線程): StoreStatsService
從物理隊列解析消息重新發送到邏輯隊列服務(線程):ReputMessageService
HA服務: HAService
定時服務:ScheduleMessageService,如定時刪除過期文件--cleanFilesPeriodically等。
(注:這些服務對象基本都在初始化DefaultMessageStore實例對象時被創建)
(二) 存儲過程
rockemq4.0數據存儲的過程與之前的版本存入過程與有很大的不同:
如rocketmq 3.2.4中只有角色為SLAVE的boker會開啟ReputMessageService服務。
如rockemq4.0中將之前版本中廢除了處理分發消息索引服務DispatchMessageService服務,更改為這兩個類CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex。
如rocketmq 3.2.4可以通過是否開啟消息索引功能可以控制是否執行 Index索引。
2.1 rocketmq 3.2.4版本的數據存入過程:
I. commitLog數據存入
- 如果boker角色為MASTER
生產者每寫入一條數據,boker端接受到消息后,DefaultMessageStore.putMessage調用Commit.putMessage方法,PutMessage首先要檢查一些條件,比如:
1. 每條數據第一寫入的broker的屬性必須為master,否則回返回PutMessageStatus.SERVICE_NOT_AVAILABLE狀態,“message store is slave mode, so putMessage is forbidden ”.
2. 這條msg是否具有被寫入的權限,否則回返回PutMessageStatus.SERVICE_NOT_AVAILABLE狀態,"message store is not writeable, so putMessage is forbidden ".
3. message topic長度校驗
4. message properties長度校驗
Commit.putMessage 首先將數據寫入到commitlog對應的mapedFile中,每寫入一條消息,通過mapedFile.appendMessage追加到MapedFile文件中,當MapedFile寫滿后,生成一個新的MapedFile,然后向這個MapedFile中追加消息,如此不斷 ... ...,這些MapedFile裝在MapedFileQueue中。
commitLog中每向mapedFile中寫入一個消息后,會返回一個AppendMessageResult對象,根據AppendMessageResult與msg消息信息,生成一個DispatchRequest對象,調用commit的內部類DispatchMessageService.putRequest(dispatchRequest)方法,將寫入的消息對應dispatchRequest寫入到定義的List<DispatchRequest> requestsWrite列表中。
- 如果boker角色為SLAVE
沒有putMessage過程,數據加載通過HAService進行主從同步,同步MASTER中的邏輯隊列,向commitLog存入數據。(過程比較復雜,有機會以后單獨成文分析)
II. consumeQueue數據存入(indexFile數據存入可選)
- 如果boker角色為MASTER
DispatchMessageService線程在后台一直運行,不斷執行doDispatch()
while (!this.isStoped()) { try { this.waitForRunning(0); this.doDispatch(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }
doDispatch()會將requestsWrite列表中的dispatchRequest處理,將它們轉換成consumeQueue單元結構對應數據,這些數據追加到consumerQueue對應的MapedFile中。然后添加到consumerQueue的MapedFileQueue中。如果開啟了消息索引功能即:
isMessageIndexEnable==true,則將requestsWrite列表中的dispatchRequest傳給indexService服務,然后indexService將這些消息寫入IndexFile中。
- 如果boker角色為SLAVE
由於生產者產生消息不會直接到SLAVE,因此在SLAVE不會執行putMessage邏輯,它主要靠
ReputMessageService 服務線程,從物理隊列(commitlog)解析消息重新發送到邏輯隊列,大致過程為: 從物理隊列解析數據,生成dispatchRequest,如果數據正常,則將dispatchRequest傳入給DispatchMessageService的List<DispatchRequest> requestsWrite,之后DispatchMessageService處理dispatchRequest的過程與上文一樣。
2.2. rocketmq 4.0 版本的數據存入過程
I. commitLog數據存入過程基本不變
不同的是,commit.putMessage過程並不會根據AppendMessageResult與msg消息信息,生成一個DispatchRequest對象,該版本中DispatchRequest對象的生成過程
只放在了ReputMessageService中,通過ReputMessageService生成DispatchRequest對象。該版本中ReputMessageService服務線程不像rocketmq 3.2.4中那樣只為boker角色為SLAVE單獨開設。
II. consumeQueue與indexFile數據存入
rocketmq4.0中此過程的核心服務是
ReputMessageService,與之前版本不同的是在rocketmq4.0版本中,consumeQueue與indexFile數據存入的服務線程獨立出來了,分別使用CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex類處理,初始化DefaultMessageStore時,將這兩個類存放入dispatcherList列表中:
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
在ReputMessageService服務線程啟開后,不斷從commitLog中解析數據,生成dispatchRequest :
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
之后向dispatcherList中的所有分發器分發dispatcherList:
DefaultMessageStore.this.doDispatch(dispatchRequest);
即執行:
CommitLogDispatcherBuildConsumeQueue.doDispatch(dispatchRequest)生成consumeQueue數據
CommitLogDispatcherBuildIndex.doDispatch(dispatchRequest)生成IndexFile數據
(三) 數據寫入內存小結
即:內存映射
生成commitLog數據的核心接口:
this.commitLog.putMessage(msg)
將數據寫入到commitlog對應的MapedFiLe對象中。
生成consumeQueue數據的核心接口:
public void putMessagePositionInfo(DispatchRequest dispatchRequest) { ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); cq.putMessagePositionInfoWrapper(dispatchRequest); }
將數據寫入到consumeQueue對應的MapedFiLe對象中。
生成IndexFile數據的核心接口:
DefaultMessageStore.this.indexService.buildIndex(request);
將數據寫入到IndexFile對應的MapedFiLe對象中。
(四) 內存文件落地 -- 刷盤
上文介紹了數據如何寫入到邏輯隊列、物理隊列、索引的MapedFiLe中,這里介紹如何將邏輯隊列、物理隊列內存數據持久化到磁盤(索引文件的寫入可以在以后的文章中單獨分析)。
邏輯隊列、物理隊列內存文件刷盤方式相同,它們生成的MapedFile文件會放在各自對應的MapedFileQueue對象中,通過刷盤的方式,將MapedFileQueue持久化到物理磁盤上。
初始化DefaultMessageStore的時候會開啟: 邏輯隊列刷盤服務線程--FlushConsumeQueueService、將ConsumeQueue.mapedFileQueues刷入磁盤;
初始化commitlog對象時開啟:物理隊列刷盤服務線程--FlushCommitLogService,將commitlog.mapedFileQueues刷入磁盤。
這兩個線程會分別將MapedFileQueue持久化到物理磁盤上。
對於commitlog的刷盤策略:
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this.flushCommitLogService = new GroupCommitService(); } else { this.flushCommitLogService = new FlushRealTimeService(); }
異步刷盤使用的是FlushRealTimeService,
同步刷盤使用的是GroupCommitService
刷盤過程要涉及到MapedFile,MapedFile以及java NIO相關的知識如MappedByteBuffer、FileChannel,可以學到的到東西很多,具體的刷盤實現過程見(下一篇)。