Kafka消息(存儲)格式及索引組織方式


要深入學習Kafka,理解Kafka的存儲機制是非常重要的。本文介紹Kafka存儲消息的格式以及數據文件和索引組織方式,以便更好的理解Kafka是如何工作的。

Kafka消息存儲格式

Kafka為了保證消息的可靠性,服務端會將接收的消息進行序列化並保存到磁盤上(Kafka的多副本存儲機制),這里涉及到消息的存儲格式,即消息編碼后落到磁盤文件上的二進制的數據格式。下圖是根據Kafka 3.0官方文檔整理的消息格式:

 

 

 

包含三個部分:BatchRecords、Record,以及Header的編碼格式。

BatchRecords是Kafka數據的存儲單元,一個BatchRecords中包含多個Record(即我們通常說的一條消息)。BatchRecords中各個字段的含義如下:

 

字段名 含義
baseOffset 這批消息的起始Offset
partitionLeaderEpoch 用於Partition的Recover時保護數據的一致性,具體場景可以見KIP101
batchLength BatchRecords的長度
magic 魔數字段,可以用於拓展存儲一些信息,當前3.0版本的magic是2
crc crc校驗碼,包含從attributes開始到BatchRecords結束的數據的校驗碼
attributes int16,其中bit0~2中包含了使用的壓縮算法,bit3是timestampType,bit4表示是否失誤,bit5表示是否是控制指令,bit6~15暫未使用
lastOffsetDelta BatchRecords中最后一個Offset,是相對baseOffset的值
firstTimestamp BatchRecords中最小的timestamp
maxTimestamp BatchRecords中最大的timestamp
producerId 發送端的唯一ID,用於做消息的冪等處理
producerEpoch 發送端的Epoch,用於做消息的冪等處理
baseSequence BatchRecords的序列號,用於做消息的冪等處理
records 具體的消息內容

一個BatchRecords中可以包含多條消息,即上圖中的Record,而每條消息又可以包含多個Header信息,Header是Key-Value形式的。Record和Header的格式都非常簡單,就不對其中的字段做解釋了。

Log Segment

在Kafka中,一個Topic會被分割成多個Partition,而Partition由多個更小的,稱作Segment的元素組成。

 

 

 

Kafka一個Partition下會包含類似上圖中的一些文件,由log、index、timeindex三個文件組成一個Segment,而文件名中的(0和35)表示的是一個Segment的起始Offset(Kafka會根據log.segment.bytes的配置來決定單個Segment文件(log)的大小,當寫入數據達到這個大小時就會創建新的Segment)。log、index、timeindex中存儲的都是二進制的數據(log中存儲的就是上一部分介紹的BatchRecords的內容,而index和timeindex分別是一些索引信息。)

下圖是log文件中數據解析后的示意圖(也就是本文第一部分BatchRecords格式)。其中16開頭的這一行表示一個第一條消息的Offset是16的BatchRecord,而24開頭的這一行表示的是一個第一條消息的Offset是24的BatchRecord。

 

 

 

索引

我們知道Kafka中每個Consumer消費一個Partition都會有一個關聯的Offset表示已經處理過的消息的位置。通常Consumer會根據Offset連續的處理消息。而通過Offset從存儲層中獲取消息大致分為兩步:

  • 第一步,根據Offset找到所屬的Segment文件

  • 第二步,從Segment中獲取對應Offset的消息數據

其中第一步可以直接根據Segment的文件名進行查找(上面已經介紹了Segment的文件面就是它包含的數據的起始Offset)。第二步則需要一些索引信息來快速定位目標數據在Segment中的位置,否則就要讀取整個Segment文件了,這里需要的索引信息就是上面的index文件存儲的內容。

 

 

index文件中存儲的是Offset和Position(Offset對應的消息在log文件中的偏移量)的對應關系,這樣當有Offset時可以快速定位到Position讀取BatchRecord,然后再從BatchRecord中獲取某一條消息。比如上述Offset25會被定位到24這個BatchRecord,然后再從這個BatchRecord中取出第二個Record(24這個BatchRecord包含了24、25兩個Record)。

注意,Kafka並不會為每個Record都保存一個索引,而是根據log.index.interval.bytes等配置構建稀疏的索引信息。

除了index索引文件保存Offset和Position的映射關系外,Kafka中還維護了timeindex,保存了Timestamp和Offset的關系,用於應對一些場景需要根據timestamp來定位消息。timeindex中的一個(timestampX,offsetY)元素的含義是所有創建時間大於timestampX的消息的Offset都大於offsetY。

 

 同樣的,timeindex也采用了稀疏索引的機制,使用和index相同的配置(log.index.interval.bytes),所以timeindex和index是一一對應的。

 

總結

本文首先介紹了Kafka消息的存儲格式,然后介紹了Kafka是如何索引(index & timeindex)存儲的數據的。看完索引部分后遺留了一個疑問:每次讀取消息都要先根據索引讀取Position信息,然后再根據Position去讀數據,而索引又是稀疏索引(查找索引也是要開銷的),這樣效率是否會比較低呢?如果利用Consumer總是順序讀取消息的特性,每次讀取數據時都帶上一些上下文信息(比如上一次Offset對應的Position信息)直接去讀Log數據效率是否會更高?歡迎留言交流!

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM