RocketMQ中的CommitLog、ConsumeQueue、indexFile、offsetTable以及多種偏移量對比


CommitLog
消息內容原文的存儲文件,同Kafka一樣,消息是變長的,順序寫入,生成規則:
每個文件的默認1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1 073 741 824Byte;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824, 消息存儲的時候會順序寫入文件,當文件滿了則寫入下一個文件。

ConsumeQueue
ConsumeQueue中並不需要存儲消息的內容,而存儲的是消息在CommitLog中的offset。也就是說ConsumeQueue其實是CommitLog的一個索引文件。一個ConsumeQueue文件對應topic下的一個隊列。

 

ConsumeQueue是定長的結構,每1條記錄固定的20個字節。很顯然,Consumer消費消息的時候,要讀2次:先讀ConsumeQueue得到offset,再通過offset找到CommitLog對應的消息內容。

ConsumeQueue的作用

  • 通過broker保存的offset(offsetTable.offset json文件中保存的ConsumerQueue的下標)可以在ConsumeQueue中獲取消息,從而快速的定位到commitLog的消息位置
  • 過濾tag是也是通過遍歷ConsumeQueue來實現的(先比較hash(tag)符合條件的再到consumer比較tag原文)
  • 並且ConsumeQueue還能保存於操作系統的PageCache進行緩存提升檢索性能

下面是我解析的ConsumeQueue

public static void main(String[] args) throws IOException {
        decodeCQ(new File("D:\\00000000000000000000"));
    }

    static void decodeCQ(File consumeQueue) throws IOException {
        FileInputStream fis = new FileInputStream(consumeQueue);
        DataInputStream dis = new DataInputStream(fis);

        long preTag = 0;
        long count = 1;
        while (true) {
            long offset = dis.readLong();
            int size = dis.readInt();
            long tag = dis.readLong();

            if (size == 0) {
                break;
            }
            preTag = tag;
            System.out.printf(" %d %d %d\n",   tag, size, offset);
        }
        fis.close();
  }
hash(tag)|size|offset(commitLog) 3552231 191 180081
 3552231 191 180654
 3552231 191 180845
 3552231 191 182182
 3552231 192 182565
 121074 201 182757
 3552231 245 190411
 3552231 245 190656
 3552231 245 190901
 3552231 245 191146
 3552231 245 191391
 3552231 245 191636
 3552231 245 191881
 99 197 219910
 99 197 220107
 99 197 220304

offsetTable.offset(json中保存)
和commitLog的offset不是一回事,這個offset是ConsumeQueue文件的(已經消費的)下標/行數,可以直接定位到ConsumeQueue並找到commitlogOffset從而找到消息體原文。這個offset是消息消費進度的核心
offset持久化 類型(父類是OffsetStore):

  • 本地文件類型:DefaultMQPushConsumer的BROADCASTING模式,各個Consumer沒有互相干擾,使用LoclaFileOffsetStore,把Offset存儲在Consumer本地
  • Broker代存儲類型:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存儲和控制Offset的值,使用RemoteBrokerOffsetStore

{
    "offsetTable":{
        "zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43
        },
        "TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250
        },
        "%RETRY%zxp_test_group2@zxp_test_group2":{0:3
        },
        "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
        },
        "%RETRY%zxp_test_group3@zxp_test_group3":{0:0
        },
        "order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3
        }
    }
}

indexFile
如果我們需要根據消息ID,來查找消息,consumequeue 中沒有存儲消息ID,如果不采取其他措施,又得遍歷 commitlog文件了,indexFile就是為了解決這個問題的文件。
偏移量總結
由於出現了多個偏移量的概念,所以我總結一下:
1、CommitLog中的offset(消息體偏移量)
  體現在commitlog文件名稱中,對應這個CommitLog文件所有消息在整個topic的隊列中起始偏移量(方便通過ConsumeQueue.commitlogOffset找到當前要消費的消息存在於哪個commitlog文件)
2、ConsumeQueue中的commitlogOffset(消息體偏移量)
  定位了當前這條消息在commitlog中的偏移量
3、offsettable.offset(下標)
  定位了當前已經消費的ConsumeQueue的下標是哪條消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM