RocketMQ消息存儲原理總結(一)


1.RocketMQ的存儲架構

 

1.1存儲特點

  • 消息主體以及元數據都存儲在CommitLog當中
  • Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
  • 每次讀取消息隊列先讀取consumerQueue,然后再通過consumerQueue去commitLog中拿到消息主體。

1.2為什么要這樣設計?
rocketMQ的設計理念很大程度借鑒了kafka,所以有必要介紹下kafka的存儲結構設計:

存儲特點:和RocketMQ類似,每個Topic有多個partition(queue),kafka的每個partition都是一個獨立的物理文件,消息直接從里面讀寫。
根據之前阿里中間件團隊的測試,一旦kafka中Topic的partitoin數量過多,隊列文件會過多,會給磁盤的IO讀寫造成很大的壓力,造成tps迅速下降。所以RocketMQ進行了上述這樣設計,consumerQueue中只存儲很少的數據,消息主體都是通過CommitLog來進行讀寫。
那么RocketMQ這樣處理有什么優缺點?

  • 優點:1、隊列輕量化,單個隊列數據量非常少。對磁盤的訪問串行化,避免磁盤竟爭,不會因為隊列增加導致IOWAIT增高。
  • 缺點:寫雖然完全是順序寫,但是讀卻變成了完全的隨機讀。讀一條消息,會先讀ConsumeQueue,再讀CommitLog,增加了開銷。要保證CommitLog與ConsumeQueue完全的一致,增加了編程的復雜度。

以上缺點如何克服:

  • 隨機讀,盡可能讓讀命中page cache,減少IO讀操作,所以內存越大越好。如果系統中堆積的消息過多,讀數據要訪問磁盤會不會由於隨機讀導致系統性能急劇下降,答案是否定的。
  • 訪問page cache 時,即使只訪問1k的消息,系統也會提前預讀出更多數據,在下次讀時,就可能命中內存。
  • 隨機訪問Commit Log磁盤數據,系統IO調度算法設置為NOOP方式,會在一定程度上將完全的隨機讀變成順序跳躍方式,而順序跳躍方式讀較完全的隨機讀性能會高5倍以上。
  • 另外4k的消息在完全隨機訪問情況下,仍然可以達到8K次每秒以上的讀性能。
  • 由於Consume Queue存儲數據量極少,而且是順序讀,在PAGECACHE預讀作用下,Consume Queue的讀性能幾乎與內存一致,即使堆積情況下。所以可認為Consume Queue完全不會阻礙讀性能。
  • Commit Log中存儲了所有的元信息,包含消息體,類似於Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使數據丟失,仍然可以恢復出來。

2、CommitLog文件
要想知道RocketMQ如何存儲消息,我們先看看CommitLog。在RocketMQ中,所有topic的消息都存儲在一個稱為CommitLog的文件中,該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。通過CommitLog,RocketMQ將所有消息存儲在一起,以順序IO的方式寫入磁盤,充分利用了磁盤順序寫減少了IO爭用提高數據存儲的性能,消息在CommitLog中的存儲格式如下:

  • 4字節表示消息的長度,消息的長度是整個消息體所占用的字節數的大小
  • 4字節的魔數,是固定值,有MESSAGE_MAGIC_CODE和BLANK_MAGIC_CODE
  • 4字節的CRC,是消息體的校驗碼,用於防止網絡、硬件等故障導致數據與發送時不一樣帶來的問題
  • 4字節的queueId,表示消息發到了哪個MessageQueue(邏輯上相當於kakka的partition)
  • 4字節的flag,flag是創建Message對象時由生產者通過構造器設定的flag值
  • 8字節的queueOffset,表示在queue中的偏移量
  • 8字節的physicalPosition,表示在存儲文件中的偏移量
  • 4字節sysFlag,是生產者相關的信息標識,具體生產邏輯可以看相關代碼
  • 8字節消息創建時間
  • 8字節消息生產者的host
  • 8字節消息存儲時間
  • 8字節消息存儲的機器的host
  • 4字節表示重復消費次數
  • 8字節消息事務相關偏移量
  • 4字節表示消息體的長度
  • 消息休,不是固定長度,和前面的4字節的消息體長度值相等
  • 1字節表示topic的長度,因此topc的長度最多不能超過127個字節,超過的話存儲會出錯(有前置校驗)
  • Topic,存儲topic,因為topic不是固定長度,所以這里所占的字節是不固定的,和前一個表示topic長度的字節的值相等
  • 2字節properties的長度,properties是創建消息時添加到消息中的,因此,添加在消息中的poperties不能太多太大,所有的properties的kv對在拼接成string后,所占的字節數不能超過2^15-1
  • Properties的內容,也不是固定長度,和前面的2字節properties長度的值相同

3、ConsumeQueue文件
一個ConsumeQueue表示一個topic的一個queue,類似於kafka的一個partition,但是rocketmq在消息存儲上與kafka有着非常大的不同,RocketMQ的ConsumeQueue中不存儲具體的消息,具體的消息由CommitLog存儲,ConsumeQueue中只存儲路由到該queue中的消息在CommitLog中的offset,消息的大小以及消息所屬的tag的hash(tagCode),一共只占20個字節,整個數據包如下:

4、消息存儲方式
前文已經描述過,RocketMQ的消息存儲由CommitLog和ConsumeQueue兩部分組成,其中CommitLog用於存儲原始的消息,而ConsumeQueue用於存儲投遞到某一個queue中的消息的位置信息,消息的存儲如下圖所示:

消費者在讀取消息時,先讀取ConsumeQueue,再通過ConsumeQueue中的位置信息讀取CommitLog,得到原始的消息。

5、RocketMQ消息存儲結構類型及缺點
上圖即為RocketMQ的消息存儲整體架構,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。而Kafka采用的是獨立型的存儲結構,每個隊列一個文件。這里小編認為,RocketMQ采用混合型存儲結構的缺點在於,會存在較多的隨機讀操作,因此讀的效率偏低。同時消費消息需要依賴ConsumeQueue,構建該邏輯消費隊列需要一定開銷。
6、RocketMQ消息存儲架構深入分析
從上面的整體架構圖中可見,RocketMQ的混合型存儲結構針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息,至於消費的時間可以稍微滯后一些也沒有太大的關系。退一步地講,即使Consumer端第一次沒法拉取到待消費的消息,Broker服務端也能夠通過長輪詢機制等待一定時間延遲后再次發起拉取消息的請求。
這里,RocketMQ的具體做法是,使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據(ps:對於該服務線程在消息消費篇幅也有過介紹,不清楚的童鞋可以跳至消息消費篇幅再理解下)。然后,Consumer即可根據ConsumerQueue來查找待消費的消息了。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)則只是為了消息查詢提供了一種通過key或時間區間來查詢消息的方法(ps:這種通過IndexFile來查找消息的方法不影響發送與消費消息的主流程)。
7、PageCache與Mmap內存映射
這里有必要先稍微簡單地介紹下page cache的概念。系統的所有文件I/O請求,操作系統都是通過page cache機制實現的。對於操作系統來說,磁盤文件都是由一系列的數據塊順序組成,數據塊的大小由操作系統本身而決定,x86的linux中一個標准頁面大小是4KB。
操作系統內核在處理文件I/O請求時,首先到page cache中查找(page cache中的每一個數據塊都設置了文件以及偏移量地址信息),如果未命中,則啟動磁盤I/O,將磁盤文件中的數據塊加載到page cache中的一個空閑塊,然后再copy到用戶緩沖區中。
page cache本身也會對數據文件進行預讀取,對於每個文件的第一個讀請求操作,系統在讀入所請求頁面的同時會讀入緊隨其后的少數幾個頁面。因此,想要提高page cache的命中率(盡量讓訪問的頁在物理內存中),從硬件的角度來說肯定是物理內存越大越好。從操作系統層面來說,訪問page cache時,即使只訪問1k的消息,系統也會提前預讀取更多的數據,在下次讀取消息時, 就很可能可以命中內存。
在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue的讀性能會比較高近乎內存,即使在有消息堆積情況下也不會影響性能。而對於CommitLog消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統IO調度算法,比如設置調度算法為“Noop”(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。
另外,RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型直接將磁盤上的物理文件直接映射到用戶態的內存地址中(這種Mmap的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(這里需要注意的是,采用MappedByteBuffer這種內存映射的方式有幾個限制,其中之一是一次只能映射1.5~2G 的文件至用戶態的虛擬內存,這也是為何RocketMQ默認設置單個CommitLog日志數據文件為1G的原因了)。

8、RocketMQ文件存儲模型層次結構

9、RocketMQ文件存儲模型層次結構 RocketMQ文件存儲模型層次結構如上圖所示,根據類別和作用從概念模型上大致可以划分為5層,下面將從各個層次分別進行分析和闡述:
(1)RocketMQ業務處理器層:Broker端對消息進行讀取和寫入的業務邏輯入口,這一層主要包含了業務邏輯相關處理操作(根據解析RemotingCommand中的RequestCode來區分具體的業務操作類型,進而執行不同的業務處理流程),比如前置的檢查和校驗步驟、構造MessageExtBrokerInner對象、decode反序列化、構造Response返回對象等;
(2)RocketMQ數據存儲組件層;該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數據文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志數據文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該組件初始化時候,還會啟動很多存儲相關的后台服務線程,包括AllocateMappedFileService(MappedFile預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統計服務線程)、IndexService(索引文件服務線程)等;
(3)RocketMQ存儲邏輯對象層:該層主要包含了RocketMQ數據文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數據文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數據文件提供訪問服務。這三個模型類也是構成了RocketMQ存儲層的整體結構(對於這三個模型類的深入分析將放在后續篇幅中);
(4)封裝的文件內存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數據文件的讀寫。其中,采用MappedByteBuffer這種內存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。這里限制的問題在上面已經講過;對於每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的字節大小數+1,即為文件的起始偏移量,從而實現了整個大文件的串聯。這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、內存數據刷盤、內存清理等和文件相關的服務);
(5)磁盤存儲層:主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(如IOPS、吞吐量和訪問時延等指標)對順序寫/隨機讀操作帶來的影響(ps:小編建議在正式業務上線之前做好多輪的性能壓測,具體用壓測的結果來評測);
10、總結
RocketMQ消息存儲部分的內容與其他所有篇幅(RocketMQ的Remoting通信、普通消息發送和消息消費部分)相比是最為復雜的,需要讀者反復多看源碼並多次對消息讀和寫進行Debug(可以通過在Broker端的SendMessageProcessor/PullMessageProcesssor/QueryMessaageProcessor幾個業務處理器入口,在其重要方法中打印相關重要屬性值的方式或者一步步地Debug代碼,來仔細研究下其中的存儲過程),反復幾次后才可以對消息存儲這部分有一個較為深刻的理解,同時也有助於提高對RocketMQ的整體理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM