RocketMQ:(2) Broker


  Broker是RocketMQ的核心,大部分“重量級”工作都是由Broker完成的,包括接收Producer發過來的信息、處理Consumer的消費消息請求、消息的持久化存儲、消息的HA機制以及服務端過濾功能等。

一、消息存儲文件

  分布式隊列因為有高可靠性的要求,所以數據要通過磁盤進行持久化存儲。用磁盤存儲消息,速度會不會很慢呢?能滿足實時性和高吞吐量的要求嗎?

  實際上,磁盤有時候會比你想象的快很多,有時候也會比你想象的慢很多,關鍵在如何使用,使用得當,磁盤的速度完全可以匹配上網絡的數據傳輸速度。目前的高性能磁盤,順序寫速度可以達到600MB/s,超過了一般網卡的傳輸速度,這是磁盤比想象的快的地方。但是磁盤隨機寫的速度只有大概100KB/s,和順序寫的性能相差6000倍!

  RocketMQ主要存儲的文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。RocketMQ將所有主題的消息存儲在同一個文件中,確保消息發送時順序寫文件,盡最大的能力確保消息發送的高性能與高吞吐量。但由於消息中間件一般是基於消息主題的訂閱機制,這樣便給按照消息主題檢索消息帶來了極大的不便。為了提高消息消費的效率,RocketMQ引入了ConsumeQueue消息隊列文件,每個消息主題包含多個消息消費隊列,每一個消息隊列有一個消息文件。IndexFile索引文件,其主要設計理念就是為了加速消息的檢索性能,根據消息的屬性快速從Commitlog文件中檢索消息。RocketMQ是一款高性能的消息中間件,存儲部分的設計是核心,存儲的核心是IO訪問性能。

1)CommitLog:消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中。
2)config:運行期間一些配置信息,主要包括:
  consumerFilter.json:主題消息過濾信息
  consumerOffset.json:集群消費模式消息消費進度
  delayOffset.json:延時消息隊列拉取進度
  subscriptionGroup.json:消息消費組配置信息
  topics.json:topic配置屬性
3)ConsumeQueue:消息消費隊列,消息到達CommitLog文件后,將異步轉發到消息消費隊列,供消息消費者消費。
4)IndexFile:消息索引文件,主要存儲消息Key與Offset的對應關系。
5)abort:如果存在abort文件說明Broker非正常關閉,該文件默認啟動時創建,正常退出之前刪除。
6)checkpoint:文件檢測點,存儲commitlog文件最后一次刷盤時間戳,comsumequeue最后一次刷盤時間,Index索引文件最后一次刷盤時間戳。

Commitlog文件

  該目錄下的文件主要存儲消息,commitlog文件默認大小為1G,可在broker配置文件中設置mappedFileSizeCommitlog屬性來修改。

ConsumeQueue文件

  RocketMQ基於主題訂閱模式實現消息消費,消費者關心的是一個主題下的所有消息,但由於同一主題的消息不連續地存儲在commitlog文件中,試想一下如果消息消費者直接從消息存儲文件(CommitLog)中去遍歷查找訂閱主題下的消息,效率將極其低下,RocketMQ為了適應消息消費的檢索需求,設計了消息消費隊列文件(ConsumeQueue),該文件可以看成是CommitLog關於消息消費的“索引”文件,consumeQueue的第一級目錄為消息主題,第二級目錄為主題的消息隊列。

  每一個Consumequeue條目不會存儲消息的全量信息,單個ConsumeQueue文件中默認包含30萬個條目,單個文件的長度為30w*20字節,單個ConsumeQueue文件可以看作是一個ConsumeQueue條目的數組,其下標為ConsumeQueue的邏輯偏移量,消息消費進度存儲的偏移量即邏輯偏移量。ConsumeQueue即為Commitlog文件的索引文件,其構建機制是當消息到達Commitlog文件后,由專門的線程產生消息轉發任務,從而構建消息消費隊列文件與下文提到的索引文件。 

Index索引文件

  消息消費隊列是RocketMQ專門為消息訂閱構建的索引文件,提高根據主題與消息隊列檢索消息的速度。另外RocketMQ引入了Hash索引機制為消息建立索引,HashMap的設計包含兩個基本點:Hash槽與Hash沖突的鏈表結構。RocketMQ索引文件布局如圖,IndexFile總共包含IndexHeader、Hash槽、Hash條目(數據):

  • Hash槽:一個IndexFile默認包含500萬個Hash槽,每個Hash槽存儲的是落在該Hash槽的hashcode最新的 Index 索引。
  • Index條目列表:默認一個索引文件包含2000w個條目,每一個Index條目會存儲key的hashcode及消息對應的物理偏移量。RocketMQ將消息索引鍵與消息偏移量映射關系寫入到IndexFile。

  值得關注的是,IndexFile條目中存儲的不是消息索引key而是消息屬性key的HashCode,在根據key查找時需要根據消息物理偏移量找到消息進而再驗證消息key的值,之所以只存儲消息HashCode而不存儲具體的key,是為了將Index條目設計為定長結構,才能方便地檢索與定位條目。

CheckPoint文件

  checkpoint的作用是記錄commitlog、ConsumeQueue、index文件的刷盤時間點,文件固定長度為4k,其中只用該文件的前面24個字節。

 

二、消息發送存儲流程

  Step1:如果當前Broker停止工作或Broker為SLAVE角色或當前Rocket不支持寫入則拒絕消息寫入;如果消息主題長度超過127個字符、消息屬性長度超過32767個字符將拒絕該消息寫入。
  Step2:如果消息的延遲級別大於0,將消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主題SCHEDULE_TOPIC、消息隊列ID更新原先消息的主題與隊列,這是並發消息消費重試關鍵的一步。
  Step3:獲取當前可以寫入的Commitlog文件。Commitlog文件存儲目錄為${ROCKET_HOME}/store/commitlog目錄,每一個文件默認1G,一個文件寫滿后再創建另外一個,以該文件中第一個偏移量為文件名,偏移量小於20位用0補齊。這樣根據物理偏移量能快速定位到消息。MappedFileQueue可以看作是${ROCKET_HOME}/store/commitlog文件夾,而MappedFile則對應該文件夾下一個個的文件。
  Step4:在寫入CommitLog之前,先申請putMessageLock,也就是將消息存儲到CommitLog文件中是串行的
  Step5:設置消息的存儲時間,如果mappedFile為空,表明${ROCKET_HOME}/store/commitlog目錄下不存在任何文件,說明本次消息是第一次消息發送,用偏移量0創建第一個commit文件,文件為00000000000000000000,如果文件創建失敗,拋出CREATE_MAPEDFILE_FAILED,很有可能是磁盤空間不足或權限不夠。
  Step6:將消息追加到MappedFile中。
  Step7:創建全局唯一消息ID。
  Step8:獲取該消息在消息隊列的偏移量。CommitLog中保存了當前所有消息隊列的當前待寫入偏移量。
  Step9 :根據消息體的長度、主題的長度、屬性的長度結合消息存儲格式計算消息的總長度。如果消息長度+END_FILE_MIN_BLANK_LENGTH大於CommitLog文件的空閑空間,則返回AppendMessageStatus.END_OF_FILE, Broker會重新創建一個新的CommitLog文件來存儲該消息。
  Step10:將消息內容存儲到ByteBuffer中,然后創建AppendMessageResult。這里只是將消息存儲在MappedFile對應的內存映射buffer中,並沒有刷寫到磁盤。
  Step11 :更新消息隊列邏輯偏移量。處理完消息追加邏輯后將釋放putMessageLock鎖。
  Step12:DefaultAppendMessageCallback#doAppend只是將消息追加在內存中,需要根據是同步刷盤還是異步刷盤方式,將內存中的數據持久化到磁盤。然后執行HA主從同步復制。

  RocketMQ通過使用內存映射文件來提高IO訪問性能,不論是Commitlog,ConsumerQueue,還是IndexFile,單個文件都被設計為固定長度,如果一個文件寫滿以后再創建一個新文件,文件名就為該文件第一條消息對應的全局物理偏移量。由於使用了內存映射,只要存在於存儲目錄下的文件,都需要對應創建內存映射文件,如果不定時將已消費的消息從存儲文件中刪除,會造成極大的內存壓力與資源浪費,所以RocketMQ采取定時刪除存儲文件的策略,也就是說在存儲文件中,第一個文件不一定是00000000000000000000,因為該文件在某一時刻會被刪除。

  TransientStorePool:短暫的存儲池。RocketMQ單獨創建一個MappedByteBuffer內存緩存池,用來臨時存儲數據,數據先寫入該內存映射中,然后由commit線程定時將數據從該內存復制到與目的物理文件對應的內存映射中。RokcetMQ引入該機制主要的原因是提供一種內存鎖定,將當前堆外內存一直鎖定在內存中,避免被進程將內存交換到磁盤。

 

三、實時更新ConsumeQueue、IndexFile文件

  消息消費隊列文件、消息屬性索引文件都是基於CommitLog文件構建的,當消息生產者提交的消息存儲在Commitlog文件中,ConsumeQueue、IndexFile需要及時更新,否則消息無法及時被消費,根據消息屬性查找消息也會出現較大延遲。RocketMQ通過開啟一個線程ReputMessageServcie來准實時轉發CommitLog文件更新事件,相應的任務處理器根據轉發的消息及時更新ConsumeQueue、IndexFile文件。

  Broker服務器在啟動時會啟動ReputMessageService線程,並初始化一個非常關鍵的參數reputFfomOffset,該參數的含義是ReputMessageService從哪個物理偏移量開始轉發消息給ConsumeQueue和IndexFile。如果允許重復轉發,reputFromOffset設置為CommitLog的提交指針;如果不允許重復轉發,reputFromOffset設置為Commitlog的內存中最大偏移量。ReputMessageService線程每執行一次任務推送休息1毫秒就繼續嘗試推送消息到消息消費隊列和索引文件。

  Step1:返回reputFromOffset偏移量開始的全部有效數據(commitlog 文件)。然后循環讀取每一條消息。
  Step2:從result返回的ByteBuffer中循環讀取消息,一次讀取一條,創建DispatchRequest對象。如果消息長度大於0,則調用doDispatch方法。最終將分別調用 CommitLogDispatcherBuildConsumeQueue (構建消息消費隊 )、CommitLogDispatcherBuildlndex (構建索引文件)

根據消息更新ConumeQueue

  消息消費隊列轉發任務實現類為:CommitLogDispatcherBuildConsumeQueue。  

  Step1:根據消息主題與隊列ID,先獲取對應的ConumeQueue文件,其邏輯比較簡單,因為每一個消息主題對應一個消息消費隊列目錄,然后主題下每一個消息隊列對應一個文件夾,然后取出該文件夾最后的ConsumeQueue文件即可。
  Step2:根據consumeQueueOffset計算ConumeQueue中的物理地址,將內容追加到ConsumeQueue的內存映射文件中(本操作只追加並不刷盤), ConumeQueue的刷盤方式固定為異步刷盤模式。

根據消息更新Index索引文件

  Hash索引文件轉發任務實現類:CommitLogDispatcherBuildIndex。

  獲取或創建IndexFile文件並獲取所有文件最大的物理偏移量。如果該消息的物理偏移量小於索引文件中的物理偏移,則說明是重復數據,忽略本次索引構建。如果消息的唯一鍵不為空,則添加到Hash索引中,以便加速根據唯一鍵檢索消息。

 

四、消息隊列與索引文件恢復

  由於 RocketMQ 存儲首先將消息全量存儲在 CommitLog 文件中,然后異步生成轉發任務更新 ConsumeQueue、Index 文件。如果消息成功存儲到 CommitLog 文件中,轉發任務未成功執行,此時消息服務器 Broker 由於某個原因宕機,導致 CommitLog、ConsumeQueue、IndexFile文件數據不一致。如果不加以人工修復的話,會有一部分消息即便在 CommitLog 文件中存在,但由於並沒有轉發到 ConsumeQueue,這部分消息將永遠不會被消費者消費。

  RocketMQ是如何使CommitLog、ConsumeQueue達到最終一致性的呢?

RocketMQ關於存儲文件的加載流程

  Step1:判斷上一次退出是否正常。Broker在啟動時創建abort文件,在退出時通過注冊 JVM 鈎子函數刪除 abort 文件。如果下一次啟動時存在 abort 文件。 說明 Broker 是異常退出的,CommitLog 與 ConsumeQueue 數據有可能不一致,需要進行修復。
  Step2:加載延遲隊列,RocketMQ定時消息相關。
  Step3:加載Commitlog文件,加載 ${ROCKET_HOME}/store/commitlog 目錄下所有文件並按照文件名排序。
  Step4:加載消息消費隊列,其思路與Commitlog大體一致,遍歷消息消費隊列根目錄,獲取該Broker存儲的所有主題,然后遍歷每個主題目錄,獲取該主題下的所有消息消費隊列,然后分別加載每個消息消費隊列下的文件,構建ConumeQueue對象。
  Step5:加載存儲檢測點,檢測點主要記錄 commitLog 文件、ConsumeQueue 文件、Index 索引文件的刷盤點。
  Step6:加載索引文件,如果上次異常退出,而且索引文件上次刷盤時間小於該索引文件最大的消息時間戳該文件將立即銷毀。
  Step7:根據 Broker 是否是正常停止執行不同的恢復策略。
  Step8:恢復 ConsumeQueue 文件后,將在 CommitLog 實例中保存每個消息消費隊列當前的存儲邏輯偏移量,這也是消息中不僅存儲主題、消息隊列 ID 還存儲了消息隊列偏移量的關鍵所在。

Broker 正常停止文件恢復

  Step1:Broker正常停止再重啟時,從倒數第三個文件開始進行恢復,如果不足 3 個文件,則從第一個文件開始恢復。
  Step2:遍歷 CommitLog 文件,每次取出一條消息,如果查找結果為 true 並且消息的長度大於 0 表示消息正確,校驗指針向前移動到本消息的長度;如果查找結果為 true 並且消息的長度等於 0,表示已到該文件的末尾,如果還有下一個文件,則循環此步驟,否則跳出循環;如果查找結構為 false,表明該文件未填滿所有消息,跳出循環,結束遍歷文件。
  Step3:更新MappedFileQueue的flushedWhere與committedPosition指針。
  Step4:刪除offset之后的所有文件。遍歷目錄下的文件,如果offset小於文件的起始偏移量,說明該文件是有效文件后面創建的,加入到待刪除文件列表。

  正常停止的時,Broker 會將 IndexFile 和 ConsumeQueue 都更新好,所以如果 Broker 正常停止的話,恢復過程只是修正commit 指針和 flush 指針。

Broker 異常停止文件恢復

  異常文件恢復的步驟與正常停止文件恢復的流程基本相同,其主要差別有兩個。首先,正常停止默認從倒數第三個文件開始進行恢復,而異常停止則需要從最后一個文件往前走,找到第一個消息存儲正常的文件。其次,如果 CommitLog 目錄沒有消息文件,如果在消息消費隊列 ConsumeQueue 目錄下存在文件,則需要銷毀。 

  Step1:首先判斷文件的魔數。如果文件中第一條消息的存儲時間等於 0,說明該消息存儲文件中未存儲任何消息。
  Step2:對比文件第一條消息的時間戳與檢測點,文件第一條消息的時間戳小於文件檢測點 checkpoint 說明該文件部分消息是可靠的,則從該文件開始恢復。
  Step3:如果找到MappedFile,則遍歷 MappedFile 中的消息,驗證消息的合法性,並將消息重新轉發到消息消費隊列與索引文件。
  Step4:如果未找到有效MappedFile,則設置 CommitLog 目錄的 flushedWhere、 committedWhere指針都為 0,並銷毀消息消費隊列文件。

  存儲啟動時所謂的文件恢復主要完成flushedPosition、committedWhere指針的設置、消息消費隊列最大偏移量加載到內存,並刪除flushedPosition之后所有的文件。如果Broker異常啟動,在文件恢復過程中,RocketMQ會將最后一個有效文件中的所有消息重新轉發到消息消費隊列與索引文件,確保不丟失消息,但同時會帶來消息重復的問題,縱觀RocktMQ的整體設計思想,RocketMQ保證消息不丟失但不保證消息不會重復消費,故消息消費業務方需要實現消息消費的冪等設計。

 

五、文件刷盤機制

  RocketMQ 的存儲與讀寫是基於JDK NIO 的內存映射機制(MappedByteBuffer)的,消息存儲時首先將消息追加到內存,再根據配置的刷盤策略在不同時間進行刷寫磁盤,默認為異步刷盤。索引文件的刷盤並不是采取定時刷盤機制,而是每更新一次索引文件就會將上一次的改動刷寫到磁盤。

  SYNC_FLUSH (同步刷盤):消息追加到內存映射文件的內存中后,立即將數據從內存刷寫到磁盤文件。
  ASYNC_FLUSH (異步刷盤):在消息追加到內存后立刻返回給消息發送端。RocketMQ 使用一個單獨的線程按照某一個設定的頻率執行刷盤操作。

 

六、過期文件刪除機制

  由於 RocketMQ 操作 CommitLog、ConsumeQueue文件是基於內存映射機制並在啟動的時候會加載 CommitLog、ConsumeQueue 目錄下的所有文件,為了避免內存與磁盤的浪費,不可能將消息永久存儲在消息服務器上,所以需要引人一種機制來刪除己過期的文件。RocketMQ 順序寫 CommitLog 文件、ConsumeQueue 文件,所有寫操作全部落在最后一個 CommitLog 或 ConsumeQueue 文件上,之前的文件在下一個文件創建后將不會再被更新。

  RocketMQ 清除過期文件的方法是: 如果非當前寫文件在一定時間間隔內沒有再次被更新,則認為是過期文件,可以被刪除,RocketMQ 不會關注這個文件上的消息是否全部被消費。默認每個文件的過期時間為 72 小時 ,通過在 Broker 配置文件中設置 fileReservedTime 來改變過期時間,單位為小時。

  RocketMQ 會每隔 10s 調度一次清除過程,檢測是否需要清除過期文件。RocketMQ會每隔10s調度一次cleanFilesPeriodically,檢測是否需要清除過期文件。執行頻率可以通過設置cleanResourceInterval,默認為10s。 

  RocketMQ在如下三種情況任意之一滿足的情況下將繼續執行刪除文件操作。

  • 指定刪除文件的時間點,RocketMQ通過deleteWhen設置一天的固定時間執行一次刪除過期文件操作,默認為凌晨4點。
  • 磁盤空間是否充足,如果磁盤空間不充足,則返回true,表示應該觸發過期文件刪除操作。
  • 預留,手工觸發,可以通過調用excuteDeleteFilesManualy方法手工觸發過期文件刪除,目前RocketMQ暫未封裝手工觸發文件刪除的命令。

​​  

七、總結

  Commitlog, 消息存儲文件,RocketMQ為了保證消息發送的高吞吐量,采用單一文件存儲所有主題的消息,保證消息存儲是完全的順序寫,但這樣給文件讀取同樣帶來了不便,為此,RocketMQ為了方便消息消費構建了消息消費隊列文件,基於主題與隊列進行組織,同時RocketMQ為消息實現了Hash索引,可以為消息設置索引鍵,根據索引能快速從Commitlog文件中檢索消息。
  當消息達到Commitlog文件后,會通過ReputMessageService線程接近實時地將消息轉發給消息消費隊列文件與索引文件。為了安全起見,RocketMQ引入了abort文件,記錄Broker的停機是正常關閉還是異常關閉,在重啟Broker時為了保證Commitlog文件、消息消費隊列文件與Hash索引文件的正確性,分別采取不同的策略來恢復文件。
  RocketMQ不會永久存儲消息文件,消息消費隊列文件,而是啟用文件過期機制並在磁盤空間不足或默認在凌晨4點刪除過期文件,文件默認保存72小時,並且在刪除文件時並不會判斷該消息文件上的消息是否被消費。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM