RocketMQ存儲機制與確認重傳機制


引子

消息隊列之前就聽說過,但一直沒有學習和接觸,直到最近的工作流引擎項目用到,需要了解學習一下。本文主要從一個初學者的角度針對RocketMQ的存儲機制和確認重傳機制做一個淺顯的總結。

存儲機制

我們知道,Broker(消息服務器)是消息存儲中心,主要作用是接收來自 Producer 的消息並存儲, Consumer 從這里取得消息。因此,RocketMQ的所有消息數據都是存放在Broker上的,我們先看看RocketMQ官方文檔中的Broker消息存儲架構圖,然后再來詳細講解。

CommitLog、ConsumeQueue、IndexFile

CommitLog:消息存放的物理文件,是消息主體以及元數據的存儲主體。每台broker上的commitlog被本機所有的queue共享,不做任何區分。用於存儲Producer端寫入的消息主體內容,消息內容不是定長的,文件順序寫,隨機讀。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;如下為Commit Log存儲單元結構圖

ConsumeQueue:ConsumeQueue是消息的邏輯消費隊列,相當於字典的目錄引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的,Consumer即可根據ConsumeQueue來查找待消費的消息。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,每個topic下的每個queue都有一個對應的consumequeue文件,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

consumequeue文件存儲單元格式

  1. CommitLogOffset:是指這條消息在Commit Log文件中的起始物理偏移量。
  2. msgSize:存儲中消息的大小。
  3. tagsCode:消息Tag的HashCode值。主要用於訂閱時消息過濾(訂閱時如果指定了Tag,會根據HashCode來快速查找到訂閱的消息)。

同樣consumequeue文件采取定長設計,每一個條目共20個字節,分別為8字節的commitlog物理偏移量、4字節的消息長度、8字節tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

我們來看一看具體的存儲文件是怎么樣的

如上圖所示:

  1. 根據topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
  2. 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA。
  3. 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,並重試指定次數后,仍然失敗,則發往死信隊列,比如圖中的%DLQ%ConsumerGroupA。(死信隊列(Dead Letter Queue)一般用於存放由於某種原因無法傳遞的消息,比如處理失敗或者已經過期的消息。)

IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法,如果一個消息包含key值的話,會使用IndexFile存儲消息索引。Index文件的存儲位置是:$HOME \store\index${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。如下圖所示為IndexFile文件結構:

索引文件根據key查找對應消息主要流程

  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引文件里面包含的最大槽的數目,例如圖中所示 slotNum=5000000)
  2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue 總是指向最新的一個索引項)
  3. 遍歷索引項列表返回查詢時間范圍內的結果集(默認一次最大返回的 32 條記錄)

混合型存儲結構

在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲於一個CommitLog中)針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。(引自RocketMQ官方文檔)

以上便是RocketMQ的存儲機制,看到這,有的讀者可能會問到,架構圖中的ConsumeOffset,minOffSet這些參數是干什么的你還沒有說呢?別急,我們今天要討論的不光是存儲機制,還有確認重傳機制。

消息ACK機制及消費進度管理

關於ACK和確保消費成功相關內容,我們只討論RocketMQ中的PushConsumer即Java客戶端中的DefaultPushConsumer,因為若要使用PullConsumer模式,類似的工作如何ack,如何保證消費等均需要使用方自己實現。

如何確保消費成功

PushConsumer為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功——即都會重新投遞。首先,消費的時候,我們需要注入一個消費回調,具體sample代碼如下:

consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//執行真正消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消費成功
        }
    });

業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批消息(默認是1條)是消費完成的。(具體如何ACK見后)

如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。

為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消費失敗的消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。

啟動的時候從哪里消費

當新實例啟動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset,見存儲架構圖),按照這個進度發起自己的第一次Pull請求。

如果這個消費進度在Broker並沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:

CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息
CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前

消息ACK機制

RocketMQ是以consumer group+queue為單位是管理消費進度的,以一個consumer offset標記這個這個消費組在這條queue上的消費進度。如果某已存在的消費組出現了新消費實例的時候,依靠這個組的消費進度,就可以判斷第一次是從哪里開始拉取的。

每次消息成功后,本地的消費進度會被更新,然后由定時器定時同步到broker,以此持久化消費進度。但是每次記錄消費進度的時候,只會把一批消息中最小的offset值為消費進度值,如下圖:

這鍾方式和傳統的一條message單獨ack的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重復問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。

在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了(注:consumerOffset=2201)。

在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。

對於這個場景,RocketMQ暫時無能為力,所以業務必須要保證消息消費的冪等性,這也是RocketMQ官方多次強調的態度。

參考資料

Apache RocketMQ開發者指南

RocketMQ——消息ACK機制及消費進度管理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM