RocketMQ原理


此篇幅將從以下幾個方面來分析RocketMQ其中運作原理:

消息的生產
消息的存儲
indexFile(客戶端指定key)
消息的消費
訂閱關系的一致性
offset管理
消費冪等
消息堆積與消費延遲
消息的清理
一、消息的生產

Producer可以將消息寫入到某Broker中的某Queue中,其經歷了如下過程:

Producer發送消息之前,會先向NameServer發出獲取消息Topic的路由信息的請求
NameServer返回該Topic的路由表及Broker列表
Producer根據代碼中指定的Queue選擇策略,從Queue列表中選出一個隊列,用於后續存儲消息
Produer對消息做一些特殊處理,例如,消息本身超過4M,則會對其進行壓縮
Producer向選擇出的Queue所在的Broker發出RPC請求,將消息發送到選擇出的Queue
二、消息的存儲

RocketMQ中的消息存儲在本地文件系統中,這些相關文件默認在當前用戶主目錄下的store目錄中。

abort:該文件在Broker啟動后會自動創建,正常關閉Broker,該文件會自動消失。若在沒有啟動Broker的情況下,發現這個文件是存在的,則說明之前Broker的關閉是非正常關閉

checkpoint:其中存儲着commitlog、consumequeue、index文件的最后刷盤時間戳

commitlog:其中存放着commitlog文件,而消息是寫在commitlog文件中的

conæg:存放着Broker運行期間的一些配置數據

consumequeue:其中存放着consumequeue文件,隊列就存放在這個目錄中

index:其中存放着消息索引文件indexFile

lock:運行期間使用到的全局資源鎖

1、commitlog文件

commitlog目錄中存放着很多的mappedFile文件(源碼真實命名),當前Broker中的所有消息都是落盤到這些mappedFile文件中的。mappedFile文件大小為1G(小於等於1G),文件名由20位十進制數構成,表示當前文件的第一條消息的起始位移偏移量。 需要注意的是,一個Broker中僅包含一個commitlog目錄,所有的mappedFile文件都是存放在該目錄中的。即無論當前Broker中存放着多少Topic的消息,這些消息都是被順序寫入到了mappedFile文件中的。也就是說,這些消息在Broker中存放時並沒有被按照Topic進行分類存放。

消息單元

mappedFile文件內容由一個個的消息單元構成。每個消息單元中包含消息總長度MsgLen、消息的物理位置physicalOffset、消息體內容Body、消息體長度BodyLength、消息主題Topic、Topic長度
TopicLength、消息生產者BornHost、消息發送時間戳BornTimestamp、消息所在的隊列QueueId、消息在Queue中存儲的偏移量QueueOffset等近20余項消息相關屬性。

2、consumequeue

為了提高效率,會為每個Topic在~/store/consumequeue中創建一個目錄,目錄名為Topic名稱。在該Topic目錄下,會再為每個該Topic的Queue建立一個目錄,目錄名為queueId。每個目錄中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根據consumequeue定位到具體的消息。

consumequeue文件名也由20位數字構成,表示當前文件的第一個索引條目的起始位移偏移量。與mappedFile文件名不同的是,其后續文件名是固定的。因為consumequeue文件大小是固定不變的。

每個consumequeue文件可以包含30w個索引條目,每個索引條目包含了三個消息重要屬性:消息在mappedFile文件中的偏移量CommitLog Offset、消息長度、消息Tag的hashcode值。這三個屬性占20個字節,所以每個文件的大小是固定的30w * 20字節。

3、對文件的讀寫

消息寫入

一條消息進入到Broker后經歷了以下幾個過程才最終被持久化。

Broker根據queueId,獲取到該消息對應索引條目要在consumequeue目錄中的寫入偏移量,即QueueOffset
將queueId、queueOffset等數據,與消息一起封裝為消息單元
將消息單元寫入到commitlog
同時,形成消息索引條目
將消息索引條目分發到相應的consumequeue

消息拉取

當Consumer來拉取消息時會經歷以下幾個步驟:

Consumer獲取到其要消費消息所在Queue的消費偏移量offset,計算出其要消費消息的消息offset
Consumer向Broker發送拉取請求,其中會包含其要拉取消息的Queue、消息offset及消息Tag

Broker計算在該consumequeue中的queueOffset

從該queueOffset處開始向后查找第一個指定Tag的索引條目

解析該索引條目的前8個字節,即可定位到該消息在commitlog中的commitlog offset

從對應commitlog offset中讀取消息單元,並發送給Consumer

三、indexFile

除了通過通常的指定Topic進行消息消費外,RocketMQ還提供了根據key進行消息查詢的功能。該查詢是通過store目錄中的index子目錄中的indexFile進行索引實現的快速查詢。當然,這個indexFile中的索引數據是在包含了key的消息被發送到Broker時寫入的。如果消息中沒有包含key,則不會寫入。

1、索引目錄結構

每個Broker中會包含一組indexFile,每個indexFile都是以一個時間戳命名的(這個indexFile被創建時的時間戳)。每個indexFile文件由三部分構成:indexHeader,slots槽位,indexes索引數據。每個indexFile文件中包含500w個slot槽。而每個slot槽又可能會掛載很多的index索引單元。

indexHeader固定40個字節,其中存放着如下數據:

beginTimestamp:該indexFile中第一條消息的存儲時間
endTimestamp:該indexFile中最后一條消息存儲時間
beginPhyoffset:該indexFile中第一條消息在commitlog中的偏移量commitlog offset
endPhyoffset:該indexFile中最后一條消息在commitlog中的偏移量commitlog offset
hashSlotCount:已經填充有index的slot數量(並不是每個slot槽下都掛載有index索引單元,這里統計的是所有掛載了index索引單元的slot槽的數量)

indexCount:該indexFile中包含的索引單元個數(統計出當前indexFile中所有slot槽下掛載的所有index索引單元的數量之和)

indexFile中最復雜的是Slots與Indexes間的關系。在實際存儲時,Indexes是在Slots后面的,但為了便於理解,將它們的關系展示為如下形式:

key的hash值 % 500w的結果即為slot槽位,然后將該slot值修改為該index索引單元的indexNo,根據這個indexNo可以計算出該index單元在indexFile中的位置。不過,該取模結果的重復率是很高的,為了解決該問題,在每個index索引單元中增加了preIndexNo,用於指定該slot中當前index索引單元的前一個index索引單元。而slot中始終存放的是其下最新的index索引單元的indexNo,這樣的話,只要找到了slot就可以找到其最新的index索引單元,而通過這個index索引單元就可以找到其之前的所有index索引單元。

index索引單元默寫20個字節,其中存放着以下四個屬性:

keyHash:消息中指定的業務key的hash值

phyOffset:當前key對應的消息在commitlog中的偏移量commitlog offset

timeDiff:當前key對應消息的存儲時間與當前indexFile創建時間的時間差

preIndexNo:當前slot下當前index索引單元的前一個index索引單元的indexNo

2、indexFile的創建

indexFile的文件名為當前文件被創建時的時間戳。 根據業務key進行查詢時,查詢條件除了key之外,還需要指定一個要查詢的時間戳,表示要查詢不大於該時間戳的最新的消息,即查詢指定時間戳之前存儲的最新消息。這個時間戳文件名可以簡化查詢,提高查詢效率。

indexFile文件是何時創建的?其創建的條件(時機)有兩個:

當第一條帶key的消息發送來后,系統發現沒有indexFile,此時會創建第一個indexFile文件。

當一個indexFile中掛載的index索引單元數量超出2000w個時,會創建新的indexFile。當帶key的消息發送到來后,系統會找到最新的indexFile,並從其indexHeader的最后4字節中讀取到indexCount。若indexCount >= 2000w時,會創建新的indexFile。

3、查詢流程

當消費者通過業務key來查詢相應的消息時,其需要經過一個相對較復雜的查詢流程。不過,在分析查詢流程之前,首先要清楚幾個定位計算式子:

計算指定消息key的slot槽位序號:
slot槽位序號 = key的hash % 500w

計算槽位序號為n的slot在indexFile中的起始位置:
slot(n)位置 = 40 + (n - 1) * 4

計算indexNo為m的index在indexFile中的位置:
index(m)位置 = 40 + 500w * 4 + (m - 1) * 20

四、消息的消費

消費者從Broker中獲取消息的方式有兩種:pull拉取方式和push推動方式。消費者組對於消息消費的模式又分為兩種:集群消費Clustering和廣播消費Broadcasting。

1、獲取消費類型

拉取式消費

Consumer主動從Broker中拉取消息,主動權由Consumer控制。一旦獲取了批量消息,就會啟動消費過程。不過,該方式的實時性較弱,即Broker中有了新的消息時消費者並不能及時發現並消費。

推送式消費

該模式下Broker收到數據后會主動推送給Consumer。該獲取方式一般實時性較高。該獲取方式是典型的發布-訂閱模式,即Consumer向其關聯的Queue注冊了監聽器,一旦發現有新的消息到來就會觸發回調的執行,回調方法是Consumer去Queue中拉取消息。而這些都是基於Consumer與Broker間的長連接的。長連接的維護是需要消耗系統資源的。

2、消費模式

廣播消費模式

廣播消費模式下,相同Consumer Group的每個Consumer實例都接收同一個Topic的全量消息。即每條消息都會被發送到Consumer Group中的每個Consumer。

集群消費模式

集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤同一個Topic的消息。即每條消息只會被發送到Consumer Group中的某個Consumer。

3、消費進度保存
廣播模式:消費進度保存在consumer端
集群模式:消費進度保存在broker端
4、Rebalance機制

Rebalance產生的前提條件是集群消費,產生的本意是為了提升消息的並行消費能力。

Rebalance限制:

由於⼀個隊列最多分配給⼀個消費者,因此當某個消費者組下的消費者實例數量大於隊列的數量時,多余的消費者實例將分配不到任何隊列。

Rebalance危害:

消費暫停、消費重復、消費突刺

Rebalance產生的原因:

消費者所訂閱Topic的Queue數量發生變化
消費者組中消費者的數量發生變化

Rebalance過程:

在Broker中維護着多個Map集合,這些集合中動態存放着當前Topic中Queue的信息、Consumer Group中Consumer實例的信息。一旦發現消費者所訂閱的Queue數量發生變化,或消費者組中消費者的數量發生變化,立即向Consumer Group中的每個實例發出Rebalance通知。

TopicConågManager:key是topic名稱,value是TopicConåg。TopicConåg中維護着該Topic中所有Queue的數據。

ConsumerManager:key是Consumser Group Id,value是ConsumerGroupInfo。ConsumerGroupInfo中維護着該Group中所有Consumer實例數據。

ConsumerOffsetManager:key為 Topic與訂閱該Topic的Group的組合,即topic@group,value是一個內層Map。內層Map的key為QueueId,內層Map的value為該Queue的消費進度offset。

Consumer實例在接收到通知后會采用Queue分配算法自己獲取到相應的Queue,即由Consumer實例自主進行Rebalance。

5、Queue分配模式
平均分配策略:根據avg = QueueCount / ConsumerCount 的計算結果進行分配的。如果能夠整除,則按順序將avg個Queue逐個分配Consumer;如果不能整除,則將多余出的Queue按照Consumer順序逐個分配
環形分配策略: 根據消費者的順序,依次在由queue隊列組成的環形圖中逐個分配
一致性hash策略: 將consumer的hash值作為Node節點存放到hash環上,然后將queue的hash值也放到hash環上,通過順時針方向,距離queue最近的那個consumer就是該queue要分配的consumer
同機房策略: 根據queue的部署機房位置和consumer的位置,過濾出當前consumer相同機房的queue。然后按照平均分配策略或環形平均策略對同機房queue進行分配。如果沒有同機房queue,則按照平均分配策略或環形平均策略對所有queue進行分配

6、至少一次原則

RocketMQ有一個原則:每條消息必須要被成功消費一次。Consumer在消費完消息后會向其消費進度記錄器提交其消費消息的offset,offset被成功記錄到記錄器中,那么這條消費就被成功消費了。

五、訂閱關系的一致性

訂閱關系的一致性指的是,同一個消費者組(Group ID相同)下所有Consumer實例所訂閱的Topic與Tag及對消息的處理邏輯必須完全一致。否則,消息消費的邏輯就會混亂,甚至導致消息丟失。

六、offset管理

消費進度offset是用來記錄每個Queue的不同消費組的消費進度的。根據消費進度記錄器的不同,可以分為兩種模式:本地模式和遠程模式。

1、offset本地管理模式

當消費模式為廣播消費時,offset使用本地模式存儲。因為每條消息會被所有的消費者消費,每個消費者管理自己的消費進度,各個消費者之間不存在消費進度的交集。

Consumer在廣播消費模式下offset相關數據以json的形式持久化到Consumer本地磁盤文件中,默認文件路徑為當前用戶主目錄下的.rocketmq_offsets/${clientId}/${group}/Offsets.json 。其中${clientId}為當前消費者id,默認為ip@DEFAULT;${group}為消費者組名稱。

2、offset遠程管理模式

當消費模式為集群消費時,offset使用遠程模式管理。因為所有Cosnumer實例對消息采用的是均衡消費,所有Consumer共享Queue的消費進度。

Consumer在集群消費模式下offset相關數據以json的形式持久化到Broker磁盤文件中,文件路徑為當前用戶主目錄下的store/config/consumerOffset.json。

Broker啟動時會加載這個文件,並寫入到一個雙層Map(ConsumerOffsetManager)。外層map的key為topic@group,value為內層map。內層map的key為queueId,value為offset。當發生Rebalance時,新的Consumer會從該Map中獲取到相應的數據來繼續消費。

集群模式下offset采用遠程管理模式,主要是為了保證Rebalance機制。

3、offset用途

消費者是如何從最開始持續消費消息的?消費者要消費的第一條消息的起始位置是用戶自己通過consumer.setConsumeFromWhere()方法指定的。

當消費完一批消息后,Consumer會提交其消費進度offset給Broker,Broker在收到消費進度后會將其更新到那個雙層Map(ConsumerOffsetManager)及consumerOffset.json文件中,然后向該Consumer進行ACK,而ACK內容中包含三項數據:當前消費隊列的最小offset(minOffset)、最大offset(maxOffset)、及下次消費的起始offset(nextBeginOffset)。

4、重試隊列

當rocketMQ對消息的消費出現異常時,會將發生異常的消息的offset提交到Broker中的重試隊列。系統在發生消息消費異常時會為當前的topic@group創建一個重試隊列,該隊列以%RETRY%開頭,到達重試時間后進行消費重試。

5、offset的同步提交與異步提交

同步提交:消費者在消費完一批消息后會向broker提交這些消息的offset,然后等待broker的成功響應。若在等待超時之前收到了成功響應,則繼續讀取下一批消息進行消費(從ACK中獲取nextBeginOffset)。若沒有收到響應,則會重新提交,直到獲取到響應。而在這個等待過程中,消費者是阻塞的。其嚴重影響了消費者的吞吐量。

異步提交:消費者在消費完一批消息后向broker提交offset,但無需等待Broker的成功響應,可以繼續讀取並消費下一批消息。這種方式增加了消費者的吞吐量。但需要注意,broker在收到提交的offset后,還是會向消費者進行響應的。可能還沒有收到ACK,此時Consumer會從Broker中直接獲取nextBeginOffset。

七、消費冪等

冪等:若某操作執行多次與執行一次對系統產生的影響是相同的,則稱該操作是冪等的。

1、消息重復的場景分析
發送時消息重復
消費時消息重復
Rebalance時消息重復
2、通用解決方案

兩要素

冪等解決方案的設計中涉及到兩項要素:冪等令牌,與唯一性處理。只要充分利用好這兩要素,就可以設計出好的冪等解決方案。

冪等令牌:是生產者和消費者兩者中的既定協議,通常指具備唯⼀業務標識的字符串。例如,訂單號、流水號。一般由Producer隨着消息一同發送來的。

唯一性處理:服務端通過采用⼀定的算法策略,保證同⼀個業務邏輯不會被重復執行成功多次。例如,對同一筆訂單的多次支付操作,只會成功一次。

解決方案

對於常見的系統,冪等性操作的通用性解決方案是:

首先通過緩存去重。在緩存中如果已經存在了某冪等令牌,則說明本次操作是重復性操作;若緩存沒有命中,則進入下一步。
在唯一性處理之前,先在數據庫中查詢冪等令牌作為索引的數據是否存在。若存在,則說明本次操作為重復性操作;若不存在,則進入下一步。
在同一事務中完成三項操作:唯一性處理后,將冪等令牌寫入到緩存,並將冪等令牌作為唯一索引的數據寫入到DB中。
八、消息堆積與消費延遲

消息處理流程中,如果Consumer的消費速度跟不上Producer的發送速度,MQ中未處理的消息會越來越多(進的多出的少),這部分消息就被稱為堆積消息。消息出現堆積進而會造成消息的消費延遲。

1、產生原因分析

消息拉取:

Consumer通過長輪詢Pull模式批量拉取的方式從服務端獲取消息,將拉取到的消息緩存到本地緩沖隊列中。對於拉取式消費,在內網環境下會有很高的吞吐量,所以這一階段一般不會成為消息堆積的瓶頸。

消息消費:

Consumer將本地緩存的消息提交到消費線程中,使用業務消費邏輯對消息進行處理,處理完畢后獲取到一個結果。這是真正的消息消費過程。此時Consumer的消費能力就完全依賴於消息的消費耗時和消
費並發度了。如果由於業務處理邏輯復雜等原因,導致處理單條消息的耗時較長,則整體的消息吞吐量肯定不會高,此時就會導致Consumer本地緩沖隊列達到上限,停止從服務端拉取消息。

結論: 消息堆積的主要瓶頸在於客戶端的消費能力,而消費能力由消費耗時和消費並發度決定

2、消費耗時

影響消息處理時長的主要因素是代碼邏輯。而代碼邏輯中可能會影響處理時長代碼主要有兩種類型:CPU內部計算型代碼和外部I/O操作型代碼

3、消費並發度

一般情況下,消費者端的消費並發度由單節點線程數和節點數量共同決定,其值為單節點線程數*節點數量。不過,通常需要優先調整單節點的線程數,若單機硬件資源達到了上限,則需要通過橫向擴展來提高消費並發度

4、如何避免

為了避免在業務使用時出現非預期的消息堆積和消費延遲問題,需要在前期設計階段對整個業務邏輯進行完善的排查和梳理。其中最重要的就是梳理消息的消費耗時和設置消息消費的並發度。

九、消息的清理

消息是被順序存儲在commitlog文件的,且消息大小不定長,所以消息的清理是不可能以消息為單位進行清理的,而是以commitlog文件為單位進行清理的。否則會急劇下降清理效率,並實現邏輯復雜。

commitlog文件存在一個過期時間,默認為72小時,即三天。除了用戶手動清理外,在以下情況下也會被自動清理,無論文件中的消息是否被消費過:

文件過期,且到達清理時間點(默認為凌晨4點)后,自動清理過期文件

文件過期,且磁盤空間占用率已達過期清理警戒線(默認75%)后,無論是否達到清理時間點,都會自動清理過期文件

磁盤占用率達到清理警戒線(默認85%)后,開始按照設定好的規則清理文件,無論是否過期。默認會從最老的文件開始清理

磁盤占用率達到系統危險警戒線(默認90%)后,Broker將拒絕消息寫入


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM