轉載自 huxihx,原文鏈接 【原創】Kafka 0.11消息設計
目錄
一、Kafka消息層次設計
1. v1格式
2. v2格式
二、v1消息格式
三、v2消息格式
四、測試對比
一、Kafka消息層次設計
不管是0.11版本還是之前的版本,Kafka的消息層次都是分為兩層:消息集合(message set)以及消息(message)。一個消息集合中包含若干多條日志項,而每個日志項封裝了消息以及其他一些元數據。Kafka底層的消息日志則由一系列消息集合日志項組成的。Kafka不會在消息這個層面上直接操作,它總是在消息集合這個層面上進行寫入操作。
新舊兩個版本對這兩個層次的設計都有很大區別,我們下面分開來說。不過在深入到具體版本之前,我們先要明確一些基本術語。
首先,我會遵循Kafka社區的規范,稱老版本消息格式為v1,新版本格式為v2。另外這里所指的老版本是指包含了時間戳(timestamp)字段的消息格式,更早之前的消息格式不在本文討論的范圍。其次,消息集合和消息在新舊版本對應的類名也有些許區別:
1. v1格式
在0.11版本之前,消息集合對應的類是org.apache.kafka.common.record.Records,消息是o.a.k.common.record.Record。消息集合中的每一項被稱為日志項(log entry),你可以理解每個日志項都是一個batch。
2. v2格式
0.11版本中消息集合對應的類是o.a.k.common.record.RecordBatch,消息依然是o.a.k.common.record.Record。特別注意這里的RecordBatch,如果你在之前的Kafka版本中搜尋RecordBatch類,你會發現在老版本中它指代的是Java producer端的消息batch——java producer將待發送消息收集起來,然后根據topic分區執行分組操作,其分組結果就保存在多個RecordBatch中。但是在新版本中,RecordBatch指的是普通的消息集合,producer端的分組batch由類o.a.k.clients.producer.internals.ProducerBatch來負責。各位千萬不要混淆!
okay,了解了基本的術語,我們分別討論下老版本、新版本的消息格式。
二、v1消息格式
在0.11版本之前,Kafka的消息格式如下圖所示:

圖中各個字段的含義很清晰,這里不再贅述。從上圖中我們可以計算出來一條普通的Kafka消息的頭部開銷——這里姑且稱為頭部,header,但不要和新版本的header混淆!下面會討論新版本的header。此版本的消息頭部開銷等於4 + 1 + 1 + 8 + 4 + 4 = 22字節,也就是說一條Kafka消息長度再小,也不可以小於22字節,否則會被Kafka視為corrupted。另外根據這張圖展示出來的格式,我們能夠很容易地計算每條Kafka消息的總長度。注意,這里我們討論的未壓縮消息。已壓縮消息的計算會復雜一些,故本文暫且不討論。
下面我們來做一些計算。假設有一條Kafka消息,key是“key”,value是“hello”,那么key的長度就是3,value的長度就是5,因此這條Kafka消息需要占用22 + 3 + 5 = 30字節;倘若另一條Kafka消息未指定key,而value依然是“hello”,那么Kafka會往key長度字段中寫入-1表明key是空,因而不再需要保存key信息,故總的消息長度= 22 + 5 = 27字節。當然value字段也可能是null——Kafka的log cleaner會定期地寫入這種被稱為tombstone消息,不過計算方法與key為空時是相同的。總之單條Kafka消息長度的計算是很簡單的,下面我們來說說消息集合日志項的計算。
老版本消息集合中的每一項的格式如下圖所示:
如圖所示,每個消息集合中的日志項由日志項頭部+一條“淺層”消息構成。
- 淺層(shallow)消息:如果是未壓縮消息,shallow消息就是消息本身;如果是壓縮消息,Kafka會將多條消息壓縮再一起封裝進這條淺層消息的value字段。這條淺層消息也被稱為wrapper消息,里面包含的消息被稱為內部消息,即inner message。由此可見,老版本的message batch中通常都只包含一條消息,即使是對於已壓縮消息而言,它也只是包含一條shallow消息。
- 日志項頭部(log entry header):8字節的offset字段 + 4個字節的size字段,共計12個字節。其中offset保存的是這條消息的位移。對於未壓縮消息,它就是消息的位移;如果是壓縮消息,它表示wrapper消息中最后一條inner消息的位移。由此可見,給定一個老版本的消息集合倘若要尋找該消息集合的起始位移(base offset或starting offset)是一件很困難的事情,因為這通常都需要深度遍歷整個inner消息,這也就是意味着broker端需要執行解壓縮的操作,因此代價非常高。
下面我們來看下如何計算消息集合大小,還是拿之前的兩條Kafka消息為例。第一條消息被封裝進一個消息集合,那么該消息集合總的長度 = 12 + 30 = 42字節,而包含第二條未指定key消息的消息集合總長度 = 12 + 27 = 39字節。我們做個試驗來驗證下:
1. 創建一個測試topic,1個分區,replication-factor = 1,然后使用console-producer腳本發送一條消息,key=“key”,value=“hello”,然后驗證下底層文件日志大小是42字節。
bogon:kafka_0.10.2.1 huxi$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic test Created topic "test". bogon:kafka_0.10.2.1 huxi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=: key:hello
輸出結果:
bogon:test-0 huxi$ pwd /Users/huxi/SourceCode/testenv/datalogs/kafka_1/test-0 bogon:test-0 huxi$ ll *.log -rw-r--r-- 1 huxi staff 42 Jul 6 11:36 00000000000000000000.log
可見,我們的計算是正確的。
2. 再使用console-producer腳本發送另一條消息,不指定key,value依然是“hello”,然后驗證下底層文件日志大小是42 + 39 = 81字節。
bogon:test-0 huxi$ ll *.log -rw-r--r-- 1 huxi staff 81 Jul 6 11:39 00000000000000000000.log
結果再次證明我們的計算方法是正確的。不過,老版本的消息集合在設計上有一些弊端,包括:
- 對空間的利用率不高,比如不論key和value的長度是多少,老版本消息都是用固定的4個字節來保存長度信息,比如value是100字節還是1000字節,v1消息都需要花費4個字節來保存整個值,但其實保存100這個值只需要7個比特就夠了,也就是說只用1個字節就可以,另外3個字節都是浪費的。如果你的系統中這種情況很常見的話,那么對於磁盤/內存空間的浪費是十分可觀的。
- 老版本設計中的offset是消息集合的最后一條消息的offset,如果用戶想要獲取第一條消息的位移,必須要把所有消息解壓全部裝入內存然后反向遍歷才能獲取到。顯然這個代價是很大的
- CRC的計算有些雞肋。老版本設計中每條消息都需要執行CRC校驗。但有些情況下我們不能想認為producer端發送的消息的CRC到consumer端消息時是不變的。比如如果用戶指定的時間戳類型是LOG_APPEND_TIME,那么在broker端會對消息時間戳字段進行更新,那么重新計算之后的CRC值就會發生變化;再比如broker端進行消息格式轉換也會帶來CRC的變化。鑒於這些情況,再對每條消息都執行CRC校驗就有點沒必要了,不僅浪費空間還耽誤CPU時間
- 每次需要單條消息的總長度信息時都需要計算而得出,沒有使用一個字段來保存下來,解序列化效率不高。
鑒於以上這些弊端以及對0.11版本新功能支持的需要, Kafka社區重新設計了v2版本的消息來解決以上的問題。
三、v2消息格式
v2版本依然分消息與消息集合兩個維度,只不過消息集合這個提法被消息batch所取代。v2版本的術語叫RecordBatch。我們先來看v2消息的格式,如下圖所示:

這里的"可變長度"表示Kafka會根據具體的值來確定到底需要幾個字節來保存。為了序列化時降低所需的字節數,0.11版本借鑒了Google PB的Zig-zag編碼方式,使得絕對值較小的整數占用比較少的字節。這是符合Kafka消息使用場景的,畢竟在實際使用過程中,key或value很大的可能性並不高。比如key如果是一個有業務含義的字符串(這是很常見的使用方法),那么這個字符串的長度通常都不會太長,這樣大部分情況下只需要1~2個字節就可以保存了。這比v1版本中固定使用4個字節來保存要節省得多。如果要深入了解pb的編碼方式請參考:https://developers.google.com/protocol-buffers/docs/encoding
總之v2版本的消息格式比起v1有很大的變化。除了可變長度這一點,v2版本的屬性字段被棄用了,CRC被移除了,另外增加了消息總長度、時間戳增量(timestamp delta)、位移增量(offset delta)和headers信息。我們分別說下:
- 消息總長度:直接計算出消息的總長度並保存在第一個字段中,而不需要像v1版本時每次需要重新計算。這樣做的好處在於提升解序列化的效率——拿到總長度后,Kafka可以直接new出一個等長度的ByteBuffer,然后裝填各個字段。同時有了這個總長度,在遍歷消息時可以實現快速地跳躍,省去了很多copy的工作。
- 時間戳增量:消息時間戳與所屬record batch起始時間戳的差值,保存差值可以進一步節省消息字節數
- 位移增量:消息位移與所屬record batch起始位移的差值,保存差值可以進一步節省消息字節數
- headers:這和本文之前提到的所有header都無關。這是0.11版本引入的新字段。它是一個數組,里面的Header只有兩個字段:key和value,分別是String和byte[]類型。
- v2版本不在對每條消息執行CRC校驗,而是針對整個batch
- v2版本不在使用屬性字節,原先保存在屬性字段中的諸如壓縮類型、時間戳類型等信息都統一保存在外層的batch中
下面我們依然拿上面的Kafka消息舉例計算下0.11版本的消息長度是多少。假設這條Kafka消息的key是“key”,value是“hello”,同時假設這是batch中的第一條消息,因此時間戳增量和位移增量都是0,另外我們還假設沒有指定任何header,因此header數組個數是0。結合上圖我們可以計算這條消息的長度 = 總長度值占用的字節數 + 1 + 1 + 1 + 1 + 3 + 1 + 5 + 1 = 總長度值占用的字節數 + 14,由於14小於64,因此總長度值只需1個字節,故消息總長度是15字節。同時消息的第一個字節保存的值就是15。這里提一句為什么是64? 先前提到的Zigzag編碼會將有符號32位整數編碼成一個無符號整數,大致的思想是:
0 ---> 0 -1 ---> 1 1 ---> 2 -2 ---> 3 2 ---> 4 ...
這樣做,我們不再需要為-1去保存32位的補碼,只需要1個字節就能保存-1,但是zigzag會將每個字節的第一個比特作為特殊之用,故每個字節只能有7位來做實際的編碼任務,也就是表示從0~127。上面的編碼表中我們可以發現正數都會被編碼成其2倍的數字,因此如果一旦上面例子中的長度超過了128/2=64,長度信息就需要用2個字節來保存。這就是上面64這個數字出現的原因。(希望我解釋清楚了。。。。) 我們再舉個例子,假設某條Kafka消息未指定key,value是“hello”,該消息在整個batch中的第100條且時間戳增量也是100,那么該消息總的字節數是=總長度值占用的字節數 + 1 + 2 + 2 + 1 + 1 + 5 + 1 = 總長度值占用的字節數 + 13 = 14字節。
談完了消息格式,我們終於可以說說record batch了。v2的batch格式非常復雜,不廢話了, 直接上圖:
顯然這比v1版本的batch要復雜得多,簡單解釋一下它和v1的主要區別:
- CRC被移動batch這一層,而非消息這一層
- 屬性字段被擴充為2個字節,而不是之前的一個字節,其中第一個字節的低3位比特保存壓縮類型,第4個比特保存時間戳類型,第5個比特保存消息的事務類型(事務型消息和非事務型消息),第6個比特指定batch是否是control batch(control batch以及control message用於支持事務)
- PID、producer epoch、序列號等信息都是為了實現冪等producer之用,故本文不做詳細展開(其實我也不會😭)
- 總的overhead是61個字節,看上去比v1大了不少。但v2版本支持batch中包含多條消息,所以引入新字段的開銷被攤薄到每條消息上,整體節省了空間
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1 Created topic "test". bin/kafka-console-producer.sh --topic test --broker-list localhost:9092 --property parse.key=true --property key.separator=: >key:hello ^C
bogon:test-0 huxi$ pwd /Users/huxi/SourceCode/newenv/datalogs/kafka_1/test-0 bogon:test-0 huxi$ ll *.log -rw-r--r-- 1 huxi staff 76 Jul 6 14:38 00000000000000000000.log
由此可見,我們的計算是正確的。
四、測試對比
如果拿v1和v2的測試結果來看,似乎v2版本占用的磁盤空間反而增加了,這是因為我們的試驗中每個batch只有一條消息,如果我們改用java API程序來批量發送消息的話,我們就會發現兩者的不同。下面簡單地做個試驗:
給v1和v2版本的topic各自發1千萬條消息,value平均大小是500字節,我們比較下兩個版本所占用的磁盤空間:
由此可見,在未有任何調優的情況下,v2版本消息格式確實可以節省磁盤空間。我們可以說0.11版本的Kafka消息在支持事務、冪等producer的同時還一定程度上減少了網絡IO和磁盤IO的開銷。