rocketMq 消息偏移量 Offset


消息偏移量 Offset

queue0  offset 0   0-20  offset 1  20-40

糾錯:每條消息的tag對應的HashCode.

queue1  offset 0  0-20  offset 1  20-40

queue2  offset 0  0-20  offset 1  20-40

queue3  offset 0 0-20  offset 1  20-40

SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每條消息偏移量,以十六進制表示消息長度,轉換成十進制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(隊列偏移量,當前隊列增加1條就自增1)]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六進制表示消息長度,轉換成十進制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六進制表示消息長度,轉換成十進制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六進制表示消息長度,轉換成十進制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]

概念

  • message queue 是無限長的數組,一條消息進來下標就會漲1,下標就是 offset,消息在某個 MessageQueue 里的位置,通過 offset 的值可以定位到這條消息,或者指示 Consumer 從這條消息開始向后處理。
  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 並不是最新的那條消息的 offset,而是最新消息的 offset+1,minOffset 則是現存在的最小 offset。
  • fileReserveTime=48 默認消息存儲48小時后,消費會被物理地從磁盤刪除,message queue 的 minOffset 也就對應增長。所以比 minOffset 還要小的那些消息已經不在 broker上了,就無法被消費

類型(父類是OffsetStore):

  • 本地文件類型
    • DefaultMQPushConsumer 的 BROADCASTING 廣播模式,各個 Consumer 沒有互相干擾,使用 LocalFileOffsetStore,把 Offset 存儲在本地
  • Broker 代存儲類型
    • DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存儲和控制 Offset 的值,使用 RemoteBrokerOffsetStore

作用

  • 主要是記錄消息的偏移量,有多個消費者進行消費
  • 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
  • 廣播模式下采用 LocalFileOffsetStore,消費端存儲

建議采用 pushConsumer,RocketMQ 自動維護 OffsetStore,如果用另外一種 pullConsumer 需要自己進行維護 OffsetStore

消息存儲 CommitLog

消息存儲是由 ConsumeQueue 和 CommitLog 配合完成

  • ConsumeQueue 是邏輯隊列,CommitLog 是真正存儲消息文件的,ConsumeQueue 存儲的是指向物理存儲的地址。Topic 下的每個 message queue 都有對應的 ConsumeQueue 文件,內容也會被持久化到磁盤。默認地址:store/consumequeue/{topicName}/{queueid}/fileName
  • CommitLog:存儲消息真正內容的文件。
    • 生成規則:
      • 每個文件的默認1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如 00000000000000000000 代表了第一個文件,起始偏移量為0,文件大小為1G=1 073 741 824 Byte;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,消息存儲的時候會順序寫入文件,當文件滿了則寫入下一個文件。
    • 判斷消息存儲在哪個 CommitLog 上
      • 例如 1073742827 為物理偏移量,則其對應的相對偏移量為 1003 = 1073742827 - 1073741824,並且該偏移量位於第二個 CommitLog。

Broker 里面一個 Topic 里面有多個 MesssageQueue,每個 MessageQueue 對應一個 ConsumeQueue,ConsumeQueue 里面記錄的是消息在 CommitLog 里面的物理存儲地址。

IndexFile 消息索引文件

    ConsumerQueue是通過偏移量offset去CommitLog文件中查找消息,但實際工作應用中,我們想查找某條具體的消息,並不知道offset值,那該怎么辦呢?那IndexFile作用就來了。
    IndexFile是消息索引文件,如果一個生產者發送的消息包含key值的話,會使用IndexFile存儲消息索引,主要用於使用key來查詢消息。文件的內容結構如圖

 

 在Broker端,通過Key來計算Hash槽的位置,從而找到Index索引數據。從Index索引中拿到消息的物理偏移量,最后根據這個物理偏移量,直接到CommitLog文件中去找就可以了。另外說明下,通過IndexFile來查找消息的方法不影響RocketMQ的正常生產-消費流程,它只是查詢定位消息的方法而已。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM