一、MQ存儲分類
MQ存儲主要分為以下三類:
文件系統:RocketMQ/Kafka/RabbitMQ
關系型數據庫DB:ActiveMQ(默認采用的KahaDB做消息存儲)可選用JDBC的方式來做消息持久化
分布式KV存儲:ZeroMQ
對比:
存儲效率, 文件系統>分布式KV存儲>關系型數據庫DB
易於實現和快速集成,關系型數據庫DB>分布式KV存儲>文件系統,但是性能會下降很多
二、RocketMQ存儲概要
(一)存儲文件
RocketMQ文件存儲在rocketmq文件夾下的store文件夾內,里面包含commitlog、config、consumerqueue、index這四個文件夾和abort、checkpoint兩個文件。
其中,commitlog內存儲的是消息內容
config內存儲的是一些配置信息
consumerqueue存儲的是topic信息
index存儲的是消息隊列的索引文件
abort主要是標記mq是正常退出還是異常退出
checkpoint文件存儲的是commitlog、consumerqueue、index文件的刷盤時間
對於以上內容,下文會詳細說明。
文件層級如下(一個層級代表一個文件夾):
rocketmq
|--store
|-commitlog
| |-00000000000000000000
| |-00000000001073741824
|-config
| |-consumerFilter.json
| |-consumerOffset.json
| |-delayOffset.json
| |-subscriptionGroup.json
| |-topics.json
|-consumequeue
| |-SCHEDULE_TOPIX_XXX
| |-topicA
| |-topicB
| |-0
| |-1
| |-2
| |-3
| |-00000000000000000000
| |-00000000001073741824
|-index
| |-00000000000000000000
| |-00000000001073741824
|-abort
|-checkpoint
(二)對象封裝
對於磁盤上的對劍,在程序內都有對應的封裝對象,實際操作的時,啟動時,加載磁盤內容到封裝對象,處理時,處理的是封裝的對象,最后再刷盤到磁盤中。
(1)CommitLog: 對應commitlog文件
(2)ConsumeQueue:對應consumerqueue文件
(3)IndexFile:對應index文件
(4)MappedFile:文件存儲的直接內存映射業務抽象封裝類,源碼中通過操作該類,可以把消息字節寫入內存映射緩存區(commit),或者原子性地將消息持久化的刷盤(flush);
(5)MapedFileQueue:對連續物理存儲的抽象封裝類,源碼中可以通過消息存儲的物理偏移量位置快速定位該offset所在MappedFile(具體物理存儲位置的抽象)、創建、刪除MappedFile等操作;
(6)MappedFileBuff:堆外內存
三、文件存儲
(一)存儲對象關系
消息存儲的主要流程:
生產者發送消息后,消息存儲:
(1)將消息內容存入commitlog文件
(2)將topic信息存入consumerqueue文件,里面存入包括topic、起始偏移量、消息長度等內容
(3)將一些索引信息存入index文件
消費者消費信息時,根據topic查詢consumerqueue文件,找到對應的topic,開始讀取消息,此時讀取到的數據是消息的起始偏移量和消息長度,根據消息的起始偏移量從commitlog中查找對應的偏移量位置,然后根據消息長度取commitlog中的數據,即取到了指定的消息內容。
如果消費者獲取消息使用了tag標簽,會使用index文件,獲取消息內容方式與上面類似。
具體流程如下圖所示:
(二) 文件存儲對象間流程
RocketMQ使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。
此處有一個關鍵參數:reputFromOffset
消息允許重復:reputFromOffset = commitlog的提交指針
消息不允許重復:reputFromOffset = commitlog中內存的最大偏移量
commitDispacherBuildConsumeQueue(handler):構建消息消費隊列
1、 根據消息主題和消息ID獲取消息消費隊列ConsumeQueue
2、 依次將消息的偏移量、消息長度、taghash寫入ByteBufff,然后根據ConsumeQueueOffset計算出ConsumeQueue的物理地址,將內容追加到內存映射文件中
commitDispacherBuildIndex(handler):構建索引文件
這里有個配置項:messageIndexEnable,如果位true,則會構建索引文件
1、 創建或獲取indexFile的最大物理偏移量,如果該消息的物理偏移量小於索引文件的物理偏移量,說明是重復消息,則忽略本次構建
2、 如果索引唯一鍵不為空,則添加到hash索引中
3、 構建索引列
(三)文件存儲流程(以commitlog為例)
對於文件的存儲,以commitlog為例,當一次寫動作完成時,提交指針(commitedPosition)和寫指針(wrotePosition)位置一致,當新的消息到來時,寫指針會向后移動,如果發生commit動作時,會將寫指針和提交指針之間的內容提交到堆外緩存中(MapprByteBuffer)。
堆外緩存有提交指針(commitPosition)、寫指針(wrotePosition)和刷盤指針(flushPosition);當一次提交后,提交指針和寫指針是在同一位置,當一次刷盤操作完成時,刷盤指針和提交指針在同一位置。以三個指針都在同一位置為例,當消息被提交到堆外緩存時,堆外緩存的寫指針向后移動;當到達需要提交時,提交之后,提交指針和寫指針在同一位置;當需要刷盤時,將提交指針與刷盤指針之間的內容,寫入磁盤(此使寫指針一直在寫,一直在向后移動),因此,通常情況下,寫指針的位置在最前,刷盤指針的位置在最后。
對於刷盤,有同步刷盤和異步刷盤兩種方式,在源碼中,存在一個參 transientStorePoolEable ,是否開啟堆外內存,如果開啟,則是異步刷盤,否則,則是同步刷盤。
流程:
同步刷盤:內存映射文件直接寫入磁盤
這里需要特殊說一個機制:為了避免同步刷盤消費任務與其他消息生產者提交的任務直接產生競爭鎖,因此GroupCommitService提供了寫容器和讀容器,每次刷盤完畢后,兩者會做身份交換。因此后面說到consumerqueue中默認有4個隊列,實際上一讀一寫,默認是8個隊列。
異步刷盤:內存映射文件寫入堆外內存后異步刷盤
Commitlog存儲消息流程:
1、 消息寫入,寫指針往后移動
2、 異步提交commit(commitRealTimeService)
(1)異步Commit的條件(commitRealTimeService):
commit條件主要有三個維度:
a、執行間隔時間
b、最小提交頁數
c、兩次執行最大實際間隔
執行條件:
a、 到執行時間(每200ms執行一次),如果提交頁數大於最小提交頁數,則提交
b、距上次提交時間間隔超過了兩次執行的最大執行間隔
(2)Commit流程
a、校驗broker狀態、角色、消息大小
b、延遲隊列的特殊處理
c、獲取當前可以寫入的commitlog文件
d、獲取putMessageLock,准備寫入(由此可見,寫入時串行寫入)
e、設置消息的存儲時間(如果沒有文件,則創建一個新文件)
f、 將消息加載到MappedFile中
g、創建全局唯一的消息ID
h、獲取消息在隊列的偏移量計算消息
i、機選消息總長度
j、如果消息總長度+8>commitlog的空閑長度,則新建一個commitlog文件(8個長度表示文件剩余長度+魔數)
k、將消息存到buff中(內存映射文件)
l、更新消息隊列偏移量
m、釋放putMessageLock
3、內存映射更新寫指針位置
4、 移動提交指針到上次提交時的寫指針
5、 異步flush(FlushRealTimeService)
(1)異步執行條件
可執行維度:
a、等待方式(await/sleep)
b、線程運行的時間間隔
c、一次刷寫最小頁數
d、兩次執行的最大間隔(10s)
可譯性條件與異步提交commit一致
(2)執行步驟
a、異步commit執行成功,喚醒刷盤線程,flushRealTimeService
b、執行條件通過,提交線程
c、刷盤完成,更新checkpoint中刷盤時間點(將內存中數據寫入磁盤(FileChannel中的force),更新checkpoint中commitlog文件刷盤時間戳)
說明:checkpoint中commitlog文件刷盤時間戳刷盤在更新消息消費隊列時觸發。
四、文件恢復
(一)consumeQueue和Index恢復
1、判斷上次退出是否時異常,如果時異常退出
2、加載延遲隊列
3、判斷commitlog文件大小是否與配置文件大小一致,如果不一致,刪除commitlog文件,創建MappedFile對象
4、加載消息消費隊列,構建consumeQueue對象
5、加載checkpoint
6、加載索引文件
如果上次異常退出且索引文件的上次刷盤時間小於索引文件的最大的消息時間戳,則立即銷毀該文件
7、執行恢復策略
8、consumeQueue恢復后,在commitlog存儲消息的邏輯偏移量
(二)正常退出文件恢復
1、從倒數第三個文件開始恢復,如果不足三個文件,則從第一個文件開始恢復
2、校驗消息。
mappedFileOffset:校驗通過的偏移量
processOffset:文件已確認的偏移量
(1)消息查找校驗為true,且消息大小大於0,說明是正常消息存儲,繼續校驗下一個消息
(2)消息查找校驗為true,消息大小為0,說明是到了文件尾部,繼續下一個文件
(3)消息查找校驗為false,說明該文件未填滿,結束循環處理(此處即為消息的偏移量)
3、更新MappedFileQueue中的刷盤指針和提交指針到offset
4、刪除offset之后的所有文件
(1)offset > 文件尾部offse,說明是正常文件,忽略
(2)文件頭部offset < offset < 文件尾部offset,說明offset在該文件偏移量內,設置MappedFile的commitPosition和flushPosition
(3)offset < 文件頭部offset,說明是在有效文件之后創建的,刪除(清理MappedFile占用的資源,刪除物理文件)
(三)異常退出文件恢復
異常退出恢復的流程和正常退出文件恢復的流程基本一致,有兩點差異:
1、 文件讀取順序
正常恢復:從倒數第三個文件開始,向后遍歷
異常恢復:從最后一個開始,向前遍歷到第一個正確存儲的文件
2、 空文件夾處理
正常:無需處理
異常恢復:如果commitlog文件夾是空的,則刪除消息消費隊列下的所有文件
判斷是否是正確文件:
1、 魔數判斷
2、 文件的第一條消息長度為0,說明未存儲消息
3、 對比文件第一條消息的offset,與checkpoint中(commitlog/consumeQueue/index)的刷盤時間對比
第一條消息offset < checkpoint中刷盤時間,說明是正確文件
4、 驗證合法性,轉發到MappedFile
5、 如果未找到MappedFile,重置commitPosition和flushPosition,銷毀消息消費隊列文件
五、文件刪除
(一)觸發條件(任意滿足一條即可觸發):
1、 每天凌晨執行定時任務(4點)
2、 磁盤不足
頻率:每10ms查詢一次磁盤是否充足,不充足,則調用文件刪除
磁盤不足維度:
(1)文件所在磁盤的最大使用量
(2)磁盤使用率
(3)磁盤使用率閾值
(4)磁盤使用率預警值
磁盤不足條件:
(1)磁盤使用率大於預警閾值,建議立即清除文件
(2)磁盤使用率大於磁盤使用率閾值
3、 手動觸發(未封裝)
(二)判斷刪除哪些文件
觸發后是否要刪除文件,需要滿足以下條件的文件才會被刪除:
1、文件保存時間
2、刪除物理文件時間間隔
3、距第一次刪除被拒絕可保留時間
(三)刪除過程:
從倒數第二個文件開始
1、 刪除MappedFile所占用的資源
2、 刪除MappedFile對應的文件