CommitLog
消息內容原文的存儲文件,同Kafka一樣,消息是變長的,順序寫入,生成規則:
每個文件的默認1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1 073 741 824Byte;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824, 消息存儲的時候會順序寫入文件,當文件滿了則寫入下一個文件。
ConsumeQueue
ConsumeQueue中並不需要存儲消息的內容,而存儲的是消息在CommitLog中的offset。也就是說ConsumeQueue其實是CommitLog的一個索引文件。一個ConsumeQueue文件對應topic下的一個隊列。
ConsumeQueue是定長的結構,每1條記錄固定的20個字節。很顯然,Consumer消費消息的時候,要讀2次:先讀ConsumeQueue得到offset,再通過offset找到CommitLog對應的消息內容。
ConsumeQueue的作用
- 通過broker保存的offset(offsetTable.offset json文件中保存的ConsumerQueue的下標)可以在ConsumeQueue中獲取消息,從而快速的定位到commitLog的消息位置
- 過濾tag是也是通過遍歷ConsumeQueue來實現的(先比較hash(tag)符合條件的再到consumer比較tag原文)
- 並且ConsumeQueue還能保存於操作系統的PageCache進行緩存提升檢索性能
下面是我解析的ConsumeQueue
public static void main(String[] args) throws IOException { decodeCQ(new File("D:\\00000000000000000000")); } static void decodeCQ(File consumeQueue) throws IOException { FileInputStream fis = new FileInputStream(consumeQueue); DataInputStream dis = new DataInputStream(fis); long preTag = 0; long count = 1; while (true) { long offset = dis.readLong(); int size = dis.readInt(); long tag = dis.readLong(); if (size == 0) { break; } preTag = tag; System.out.printf(" %d %d %d\n", tag, size, offset); } fis.close(); }
hash(tag)|size|offset(commitLog) 3552231 191 180081 3552231 191 180654 3552231 191 180845 3552231 191 182182 3552231 192 182565 121074 201 182757 3552231 245 190411 3552231 245 190656 3552231 245 190901 3552231 245 191146 3552231 245 191391 3552231 245 191636 3552231 245 191881 99 197 219910 99 197 220107 99 197 220304
offsetTable.offset(json中保存)
和commitLog的offset不是一回事,這個offset是ConsumeQueue文件的(已經消費的)下標/行數,可以直接定位到ConsumeQueue並找到commitlogOffset從而找到消息體原文。這個offset是消息消費進度的核心
offset持久化 類型(父類是OffsetStore):
- 本地文件類型:DefaultMQPushConsumer的BROADCASTING模式,各個Consumer沒有互相干擾,使用LoclaFileOffsetStore,把Offset存儲在Consumer本地
- Broker代存儲類型:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存儲和控制Offset的值,使用RemoteBrokerOffsetStore
{ "offsetTable":{ "zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43 }, "TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250 }, "%RETRY%zxp_test_group2@zxp_test_group2":{0:3 }, "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0 }, "%RETRY%zxp_test_group3@zxp_test_group3":{0:0 }, "order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3 } } }
indexFile
如果我們需要根據消息ID,來查找消息,consumequeue 中沒有存儲消息ID,如果不采取其他措施,又得遍歷 commitlog文件了,indexFile就是為了解決這個問題的文件。
偏移量總結
由於出現了多個偏移量的概念,所以我總結一下:
1、CommitLog中的offset(消息體偏移量)
體現在commitlog文件名稱中,對應這個CommitLog文件所有消息在整個topic的隊列中起始偏移量(方便通過ConsumeQueue.commitlogOffset找到當前要消費的消息存在於哪個commitlog文件)
2、ConsumeQueue中的commitlogOffset(消息體偏移量)
定位了當前這條消息在commitlog中的偏移量
3、offsettable.offset(下標)
定位了當前已經消費的ConsumeQueue的下標是哪條消息