【原創】Kakfa message包源代碼分析


筆者最近在研究Kafka的message包代碼,有了一些心得,特此記錄一下。其實研究的目的從來都不是只是看源代碼,更多地是想借這個機會思考幾個問題:為什么是這么實現的?你自己實現方式是什么?比起人家的實現方式,你的方案有哪些優缺點?
任何消息引擎系統最重要的都是定義消息,使用什么數據結構來保存消息和消息隊列?剛剛思考這個問題的時候,我自己嘗試實現了一下Message的定義:
public class Message implements Serializable { private CRC32 crc; private short magic; private boolean codecEnabled; private short codecClassOrdinal; private String key; private String body; }

可以看出,實現的方式是非常朴素和簡單的,基本上就是由一個CRC32校驗碼、一個magic域、2個表明壓縮類的字段以及兩個表明消息的鍵值和消息本身的字符串組成。當與源代碼中的實現方式做了比較時才發現自己的實現方式實在是too simple, sometimes naive了。在Java的內存模型中,對象保存的開銷其實相當大,通常都要花費至少2倍以上的空間來保存數據(甚至更糟)。另外,隨着堆數據越來越大,GC的性能下降得很多,將會變得非常緩慢。

在上面的實現中JMM會為字段進行重排以減少內存使用:

 

1 public class Message implements Serializable { 2     private short magic; 3     private short codecKlassOrdinal; 4     private boolean codecEnabled; 5     private CRC32 crc; 6     private String key; 7     private String body; 8 }

 

即使是這樣,上面朴素的實現仍然需要40字節,而其中有7個字節只是為了補齊(padding)。Kafka實現的方式是使用nio.ByteBuffer來保存消息,同時依賴文件系統提供的頁緩存機制,而不是依靠堆緩存。畢竟通常情況下,堆上保存的對象很有可能在os的頁緩存中還保存一份,造成了資源的浪費。ByteBuffer是二進制的緊湊字節結構,而不是獨立的對象,因此我們至少能夠訪問多一倍的可用內存。按照Kafka官網的說法,在一台32GB內存的機器上,Kafka幾乎能用到28~30GB的物理內存同時還不比擔心GC的糟糕性能。如果使用ByteBuffer來保存同樣的消息,只需要24個字節,比起純Java堆的實現減少了40%的空間占用,好處不言而喻。這種設計的好處還有加入了擴展的可能性。下圖就是Kafka中Message的實現方式:

message包中的Message.scala中定義了Message,以伴生對象的方式實現了Message的定義。首先定義了object Message,里面有很多目前Message定義的常量。不過
基本思想就是為每個域提供2個字段:offset和長度,這樣就可以很容易地定位該域中的任何一個字節處。
具體的域有: crc + magic + attribute + key + value,其中key + value 稱為message overhead。雖然代碼中起的名字都是offset,位移,但其實你可以理解為對應域在bytebuffer中的起始位移位置。由於個人是不太主張通篇把源代碼貼上來的,畢竟大家都能下載到,所以這里就不全篇貼代碼了。
 
  值得注意的是,消息中的attributes字段。attributes用了一個字節來表示,總共有8位可以使用,目前使用了后三位作為codec類,其實從目前的代碼來看,完全可以采用2位來表示是否啟用壓縮,目前3位的做法可能也是以后擴展方便。
0:無壓縮
1:GZIP,也是默認的壓縮方法
2:Snappy
3:LZ4
 
在定義了message的一些常量之后,一個Message class被創建,它的主構造器函數接收一個BufferByte作為對象,即將這段BufferByte緩沖區中的數據封裝成一個Message對象,並提供了很多方法。比如:
computeCheckSum ——根據消息內容計算crc值
checksum——返回該消息頭部的crc值
isValid——比較前兩個方法中得到的crc值是否相同,
ensureValid——如果isValid不等的話報錯
size——消息總的字節數
keySize/hasKey——消息中key的長度.如果長度>=0,即視為有key
payloadSizeOffset ---- value size保存的起始位移
isNull --- 判斷payloadSize < 0
sliceDelimited——從給定位移處讀取4個字節的內容並創建一個新的bytebuffer返回
equals——過Bytebuffer的equals方法比較Message包含的ByteBuffer
 
目前Message的定義支持壓縮,attributes屬性字節中的最后三位被用作表示codec。而目前Kafka支持的codec是由CompressionCodec和CompressionFactory兩個scala文件定義的。CompressionCodec通過創建了sealed trait CompressionCodecde的方式,使得所有實現(因為trait類似於java中的接口,我們這里也就順着接口實現的方式稱之為實現)它的子類也必須在這個文件中。每個codec都有一個編號和一個名字。目前編號為0表示無壓縮,支持的壓縮格式有GZIP, Snappy和LZ4。
  
Kafka還提供了CompressionCodecFactory object提供工廠方法分別建立帶壓縮解壓縮功能的輸入流、輸出流。值得注意的是在clients工程中org.apache.kafka.common.message包中提供了2個v1.4.1 LZ4 Frame format的部分實現: KafkaLZ4BlockInputStream和KafkaLZ4BlockOutputStream。由於壓縮算法不在我們討論的范圍,故不做深入討論。
 
okay!說完了Message.scala和兩個codec支持類:CompressionFactory和CompressionCodec,我們再看看哪個文件還沒說過
先找軟柿子捏! InvalidMessageException和MessageLengthException就不說了,一個是crc校驗碼不匹配,一個是消息長度超過最大限度,不過貌似后者也沒用上。另外,我們先跳過ByteBufferBackedInputStream和ByteBufferMessageSet這兩個scala文件。先看MessageAndMetadata:這個case類是一個帶泛型類:接收一個topic,分區號和原始的消息Message,位移信息,並提供了2個方法:key()和value():分別返回消息的key和value。其實筆者這里並沒有弄明白為什么接收offset,而且貌似在代碼中也沒有用到。從調用這個類的方法給定的參數來看,這個offset應該是指某個分區log的位移。
 
MessageAndOffset就比較簡單了,給定一個Message和位移。它提供了nextOffset方法,另外因為是case class,也提供了2個構造器參數的reader方法。
 
說完MessageAndOffset,我們就可以說說MessageSet了。這個包第二重要的(至少我是這么認為的)就是MessageSet了:MessageSet就是消息的集合,以抽象類的方式實現。這個集合保存的就是消息的字節形式,類似於字節容器的作用,可能在內存中也有可能在磁盤上。kafka代碼中有不同的類繼承這個類,分別實現了on-disk和in-memory. 注意的是集合中的對象並不單純地是Message,而是offset field + message size field + Message field的組合。目前還沒有弄明白為什么需要中間的那個字段值,畢竟message.size也能獲得message的字節數,這樣豈不是能節省4個字節? 也許后面能告訴我答案吧。這個抽象類提供了三個抽象方法供它的子類實現:
writeTo——將消息集合寫入到指定的channel中,從offset開始寫,最多不能超過maxSize
iterator——迭代器方法,用於遍歷MessageSet
sizeInBytes——計算MessageSet中總的字節數
 
而ByteBufferMessageSet就比較復雜了,至少代碼很長。從全局來看,實現還是伴生對象的方式,一個object,一個class。ByteBufferMessageSet class接收一個ByteBuffer,創建一個message set。有2種方式創建方式:一種是從bytebuffer中創建(消費者進程使用的這種模式);還有一種方式是給定一個消息列表以及相對應的序列化格式——消費者進程使用這種方式。另外還提供了3個輔助構造函數,都是調用了ByteBufferMessageSet object的create方法創建一個ByteBuffer傳給ByteBufferMessageSet。其他方法包括: 
sizeInBytes方法:返回這個消息集合總的字節數,包括那些不完整的尾部消息,這也是實現了抽象類MessageSet的方法
writeTo方法:將消息集合中的消息寫入指定的channel,並返回寫入的字節數,這也是實現了抽象類MessageSet的方法
internalIterator方法:其實實現的是父類MessageSet的iterator方法,返回消息集合的迭代器,用於遍歷消息集合。該迭代器有個boolean類型的開關。如果設置為true,就是遍歷第一級的消息,也就是說不考慮其元素是壓縮過的消息集合的情況;如果是true,則外層遍歷時候還需要解壓壓縮過的消息集合並執行內部遍歷——即將包含消息集合的消息集合扁平化遍歷 
assignOffsets——更新位移信息。如果沒有壓縮情況,直接原地更新;如果存在壓縮的情況,使用internalIterator方法重新拷貝一份新的ByteBufferMessageSet並指定了offset返回
 
總之,Kafka的message定義了消息和消息隊列或消息集合的數據結構供其他的組件使用,在Kafka的其他核心組件的源代碼中我們會陸續看到它的使用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM