消息偏移量 Offset
queue0 offset 0 0-20 offset 1 20-40
糾錯:每條消息的tag對應的HashCode.
queue1 offset 0 0-20 offset 1 20-40
queue2 offset 0 0-20 offset 1 20-40
queue3 offset 0 0-20 offset 1 20-40
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每條消息偏移量,以十六進制表示消息長度,轉換成十進制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(隊列偏移量,當前隊列增加1條就自增1)]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六進制表示消息長度,轉換成十進制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六進制表示消息長度,轉換成十進制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六進制表示消息長度,轉換成十進制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
概念
- message queue 是無限長的數組,一條消息進來下標就會漲1,下標就是 offset,消息在某個 MessageQueue 里的位置,通過 offset 的值可以定位到這條消息,或者指示 Consumer 從這條消息開始向后處理。
- message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 並不是最新的那條消息的 offset,而是最新消息的 offset+1,minOffset 則是現存在的最小 offset。
- fileReserveTime=48 默認消息存儲48小時后,消費會被物理地從磁盤刪除,message queue 的 minOffset 也就對應增長。所以比 minOffset 還要小的那些消息已經不在 broker上了,就無法被消費
類型(父類是OffsetStore):
- 本地文件類型
- DefaultMQPushConsumer 的 BROADCASTING 廣播模式,各個 Consumer 沒有互相干擾,使用 LocalFileOffsetStore,把 Offset 存儲在本地
- Broker 代存儲類型
- DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存儲和控制 Offset 的值,使用 RemoteBrokerOffsetStore
作用
- 主要是記錄消息的偏移量,有多個消費者進行消費
- 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
- 廣播模式下采用 LocalFileOffsetStore,消費端存儲
建議采用 pushConsumer,RocketMQ 自動維護 OffsetStore,如果用另外一種 pullConsumer 需要自己進行維護 OffsetStore
消息存儲 CommitLog
消息存儲是由 ConsumeQueue 和 CommitLog 配合完成
- ConsumeQueue 是邏輯隊列,CommitLog 是真正存儲消息文件的,ConsumeQueue 存儲的是指向物理存儲的地址。Topic 下的每個 message queue 都有對應的 ConsumeQueue 文件,內容也會被持久化到磁盤。默認地址:store/consumequeue/{topicName}/{queueid}/fileName
- CommitLog:存儲消息真正內容的文件。
-
- 生成規則:
- 每個文件的默認1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如 00000000000000000000 代表了第一個文件,起始偏移量為0,文件大小為1G=1 073 741 824 Byte;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,消息存儲的時候會順序寫入文件,當文件滿了則寫入下一個文件。
- 判斷消息存儲在哪個 CommitLog 上
- 例如 1073742827 為物理偏移量,則其對應的相對偏移量為 1003 = 1073742827 - 1073741824,並且該偏移量位於第二個 CommitLog。
- 生成規則:
Broker 里面一個 Topic 里面有多個 MesssageQueue,每個 MessageQueue 對應一個 ConsumeQueue,ConsumeQueue 里面記錄的是消息在 CommitLog 里面的物理存儲地址。
IndexFile 消息索引文件
ConsumerQueue是通過偏移量offset去CommitLog文件中查找消息,但實際工作應用中,我們想查找某條具體的消息,並不知道offset值,那該怎么辦呢?那IndexFile作用就來了。
IndexFile是消息索引文件,如果一個生產者發送的消息包含key值的話,會使用IndexFile存儲消息索引,主要用於使用key來查詢消息。文件的內容結構如圖
在Broker端,通過Key來計算Hash槽的位置,從而找到Index索引數據。從Index索引中拿到消息的物理偏移量,最后根據這個物理偏移量,直接到CommitLog文件中去找就可以了。另外說明下,通過IndexFile來查找消息的方法不影響RocketMQ的正常生產-消費流程,它只是查詢定位消息的方法而已。