Kafka設計解析(十六)Kafka 0.11消息設計


轉載自 huxihx,原文鏈接 【原創】Kafka 0.11消息設計

 

目錄

一、Kafka消息層次設計

1. v1格式

2. v2格式

二、v1消息格式

三、v2消息格式

四、測試對比

 

Kafka 0.11版本增加了很多新功能,包括支持事務、精確一次處理語義和冪等producer等,而實現這些新功能的前提就是要提供支持這些功能的新版本消息格式,同時也要維護與老版本的兼容性。本文將詳細探討Kafka 0.11新版本消息格式的設計,其中會着重比較新舊兩版本消息格式在設計上的異同。畢竟只有深入理解了Kafka的消息設計,我們才能更好地學習Kafka所提供的各種功能。    

一、Kafka消息層次設計

不管是0.11版本還是之前的版本,Kafka的消息層次都是分為兩層:消息集合(message set)以及消息(message)。一個消息集合中包含若干多條日志項,而每個日志項封裝了消息以及其他一些元數據。Kafka底層的消息日志則由一系列消息集合日志項組成的。Kafka不會在消息這個層面上直接操作,它總是在消息集合這個層面上進行寫入操作。

新舊兩個版本對這兩個層次的設計都有很大區別,我們下面分開來說。不過在深入到具體版本之前,我們先要明確一些基本術語。

首先,我會遵循Kafka社區的規范,稱老版本消息格式為v1,新版本格式為v2。另外這里所指的老版本是指包含了時間戳(timestamp)字段的消息格式,更早之前的消息格式不在本文討論的范圍。其次,消息集合和消息在新舊版本對應的類名也有些許區別:

1. v1格式

在0.11版本之前,消息集合對應的類是org.apache.kafka.common.record.Records,消息是o.a.k.common.record.Record。消息集合中的每一項被稱為日志項(log entry),你可以理解每個日志項都是一個batch。

2. v2格式

0.11版本中消息集合對應的類是o.a.k.common.record.RecordBatch,消息依然是o.a.k.common.record.Record。特別注意這里的RecordBatch,如果你在之前的Kafka版本中搜尋RecordBatch類,你會發現在老版本中它指代的是Java producer端的消息batch——java producer將待發送消息收集起來,然后根據topic分區執行分組操作,其分組結果就保存在多個RecordBatch中。但是在新版本中,RecordBatch指的是普通的消息集合,producer端的分組batch由類o.a.k.clients.producer.internals.ProducerBatch來負責。各位千萬不要混淆!

okay,了解了基本的術語,我們分別討論下老版本、新版本的消息格式。

二、v1消息格式

在0.11版本之前,Kafka的消息格式如下圖所示: 

圖中各個字段的含義很清晰,這里不再贅述。從上圖中我們可以計算出來一條普通的Kafka消息的頭部開銷——這里姑且稱為頭部,header,但不要和新版本的header混淆!下面會討論新版本的header。此版本的消息頭部開銷等於4 + 1 + 1 + 8 + 4 + 4 = 22字節,也就是說一條Kafka消息長度再小,也不可以小於22字節,否則會被Kafka視為corrupted。另外根據這張圖展示出來的格式,我們能夠很容易地計算每條Kafka消息的總長度。注意,這里我們討論的未壓縮消息。已壓縮消息的計算會復雜一些,故本文暫且不討論。

下面我們來做一些計算。假設有一條Kafka消息,key是“key”,value是“hello”,那么key的長度就是3,value的長度就是5,因此這條Kafka消息需要占用22 + 3 + 5 = 30字節;倘若另一條Kafka消息未指定key,而value依然是“hello”,那么Kafka會往key長度字段中寫入-1表明key是空,因而不再需要保存key信息,故總的消息長度= 22 + 5 = 27字節。當然value字段也可能是null——Kafka的log cleaner會定期地寫入這種被稱為tombstone消息,不過計算方法與key為空時是相同的。總之單條Kafka消息長度的計算是很簡單的,下面我們來說說消息集合日志項的計算。

老版本消息集合中的每一項的格式如下圖所示: 

如圖所示,每個消息集合中的日志項由日志項頭部+一條“淺層”消息構成。

  •  淺層(shallow)消息:如果是未壓縮消息,shallow消息就是消息本身;如果是壓縮消息,Kafka會將多條消息壓縮再一起封裝進這條淺層消息的value字段。這條淺層消息也被稱為wrapper消息,里面包含的消息被稱為內部消息,即inner message。由此可見,老版本的message batch中通常都只包含一條消息,即使是對於已壓縮消息而言,它也只是包含一條shallow消息。
  • 日志項頭部(log entry header):8字節的offset字段 + 4個字節的size字段,共計12個字節。其中offset保存的是這條消息的位移。對於未壓縮消息,它就是消息的位移;如果是壓縮消息,它表示wrapper消息中最后一條inner消息的位移。由此可見,給定一個老版本的消息集合倘若要尋找該消息集合的起始位移(base offset或starting offset)是一件很困難的事情,因為這通常都需要深度遍歷整個inner消息,這也就是意味着broker端需要執行解壓縮的操作,因此代價非常高。

下面我們來看下如何計算消息集合大小,還是拿之前的兩條Kafka消息為例。第一條消息被封裝進一個消息集合,那么該消息集合總的長度 = 12 + 30 = 42字節,而包含第二條未指定key消息的消息集合總長度 = 12 + 27 = 39字節。我們做個試驗來驗證下:

1. 創建一個測試topic,1個分區,replication-factor = 1,然后使用console-producer腳本發送一條消息,key=“key”,value=“hello”,然后驗證下底層文件日志大小是42字節。

bogon:kafka_0.10.2.1 huxi$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic test
Created topic "test".
bogon:kafka_0.10.2.1 huxi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=:
key:hello

輸出結果:

bogon:test-0 huxi$ pwd
/Users/huxi/SourceCode/testenv/datalogs/kafka_1/test-0
bogon:test-0 huxi$ ll *.log
-rw-r--r--  1 huxi  staff  42 Jul  6 11:36 00000000000000000000.log

可見,我們的計算是正確的。

2. 再使用console-producer腳本發送另一條消息,不指定key,value依然是“hello”,然后驗證下底層文件日志大小是42 + 39 = 81字節。

bogon:test-0 huxi$ ll *.log
-rw-r--r--  1 huxi  staff  81 Jul  6 11:39 00000000000000000000.log

結果再次證明我們的計算方法是正確的。不過,老版本的消息集合在設計上有一些弊端,包括:

  • 對空間的利用率不高,比如不論key和value的長度是多少,老版本消息都是用固定的4個字節來保存長度信息,比如value是100字節還是1000字節,v1消息都需要花費4個字節來保存整個值,但其實保存100這個值只需要7個比特就夠了,也就是說只用1個字節就可以,另外3個字節都是浪費的。如果你的系統中這種情況很常見的話,那么對於磁盤/內存空間的浪費是十分可觀的。
  • 老版本設計中的offset是消息集合的最后一條消息的offset,如果用戶想要獲取第一條消息的位移,必須要把所有消息解壓全部裝入內存然后反向遍歷才能獲取到。顯然這個代價是很大的
  • CRC的計算有些雞肋。老版本設計中每條消息都需要執行CRC校驗。但有些情況下我們不能想認為producer端發送的消息的CRC到consumer端消息時是不變的。比如如果用戶指定的時間戳類型是LOG_APPEND_TIME,那么在broker端會對消息時間戳字段進行更新,那么重新計算之后的CRC值就會發生變化;再比如broker端進行消息格式轉換也會帶來CRC的變化。鑒於這些情況,再對每條消息都執行CRC校驗就有點沒必要了,不僅浪費空間還耽誤CPU時間
  • 每次需要單條消息的總長度信息時都需要計算而得出,沒有使用一個字段來保存下來,解序列化效率不高。

鑒於以上這些弊端以及對0.11版本新功能支持的需要, Kafka社區重新設計了v2版本的消息來解決以上的問題。 

三、v2消息格式

v2版本依然分消息與消息集合兩個維度,只不過消息集合這個提法被消息batch所取代。v2版本的術語叫RecordBatch。我們先來看v2消息的格式,如下圖所示: 

這里的"可變長度"表示Kafka會根據具體的值來確定到底需要幾個字節來保存。為了序列化時降低所需的字節數,0.11版本借鑒了Google PB的Zig-zag編碼方式,使得絕對值較小的整數占用比較少的字節。這是符合Kafka消息使用場景的,畢竟在實際使用過程中,key或value很大的可能性並不高。比如key如果是一個有業務含義的字符串(這是很常見的使用方法),那么這個字符串的長度通常都不會太長,這樣大部分情況下只需要1~2個字節就可以保存了。這比v1版本中固定使用4個字節來保存要節省得多。如果要深入了解pb的編碼方式請參考:https://developers.google.com/protocol-buffers/docs/encoding

總之v2版本的消息格式比起v1有很大的變化。除了可變長度這一點,v2版本的屬性字段被棄用了,CRC被移除了,另外增加了消息總長度、時間戳增量(timestamp delta)、位移增量(offset delta)和headers信息。我們分別說下:

 

  • 消息總長度:直接計算出消息的總長度並保存在第一個字段中,而不需要像v1版本時每次需要重新計算。這樣做的好處在於提升解序列化的效率——拿到總長度后,Kafka可以直接new出一個等長度的ByteBuffer,然后裝填各個字段。同時有了這個總長度,在遍歷消息時可以實現快速地跳躍,省去了很多copy的工作。
  • 時間戳增量:消息時間戳與所屬record batch起始時間戳的差值,保存差值可以進一步節省消息字節數
  • 位移增量:消息位移與所屬record batch起始位移的差值,保存差值可以進一步節省消息字節數
  • headers:這和本文之前提到的所有header都無關。這是0.11版本引入的新字段。它是一個數組,里面的Header只有兩個字段:key和value,分別是String和byte[]類型。
  • v2版本不在對每條消息執行CRC校驗,而是針對整個batch
  • v2版本不在使用屬性字節,原先保存在屬性字段中的諸如壓縮類型、時間戳類型等信息都統一保存在外層的batch中

下面我們依然拿上面的Kafka消息舉例計算下0.11版本的消息長度是多少。假設這條Kafka消息的key是“key”,value是“hello”,同時假設這是batch中的第一條消息,因此時間戳增量和位移增量都是0,另外我們還假設沒有指定任何header,因此header數組個數是0。結合上圖我們可以計算這條消息的長度 = 總長度值占用的字節數 + 1 + 1 + 1 + 1 + 3 + 1 + 5 + 1 =  總長度值占用的字節數 + 14,由於14小於64,因此總長度值只需1個字節,故消息總長度是15字節。同時消息的第一個字節保存的值就是15。這里提一句為什么是64? 先前提到的Zigzag編碼會將有符號32位整數編碼成一個無符號整數,大致的思想是:

0   ---> 0
-1  ---> 1
1   ---> 2
-2    ---> 3
2   ---> 4
...

這樣做,我們不再需要為-1去保存32位的補碼,只需要1個字節就能保存-1,但是zigzag會將每個字節的第一個比特作為特殊之用,故每個字節只能有7位來做實際的編碼任務,也就是表示從0~127。上面的編碼表中我們可以發現正數都會被編碼成其2倍的數字,因此如果一旦上面例子中的長度超過了128/2=64,長度信息就需要用2個字節來保存。這就是上面64這個數字出現的原因。(希望我解釋清楚了。。。。) 我們再舉個例子,假設某條Kafka消息未指定key,value是“hello”,該消息在整個batch中的第100條且時間戳增量也是100,那么該消息總的字節數是=總長度值占用的字節數 + 1 + 2 + 2 + 1 + 1 + 5 + 1 = 總長度值占用的字節數 + 13 = 14字節。

談完了消息格式,我們終於可以說說record batch了。v2的batch格式非常復雜,不廢話了, 直接上圖: 

顯然這比v1版本的batch要復雜得多,簡單解釋一下它和v1的主要區別:

  • CRC被移動batch這一層,而非消息這一層
  • 屬性字段被擴充為2個字節,而不是之前的一個字節,其中第一個字節的低3位比特保存壓縮類型,第4個比特保存時間戳類型,第5個比特保存消息的事務類型(事務型消息和非事務型消息),第6個比特指定batch是否是control batch(control batch以及control message用於支持事務)
  • PID、producer epoch、序列號等信息都是為了實現冪等producer之用,故本文不做詳細展開(其實我也不會😭)
  • 總的overhead是61個字節,看上去比v1大了不少。但v2版本支持batch中包含多條消息,所以引入新字段的開銷被攤薄到每條消息上,整體節省了空間
和v2版本一樣,我們來看下如何計算消息集合大小,還是以上面的兩條Kafka消息為例。第一條消息被封裝進一個batch,那么該batch總的長度 = 61 + 15 = 76字節。我們做個試驗來驗證下:
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
Created topic "test".
 
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092 --property parse.key=true --property key.separator=:
>key:hello
^C
bogon:test-0 huxi$ pwd
/Users/huxi/SourceCode/newenv/datalogs/kafka_1/test-0
bogon:test-0 huxi$ ll *.log
-rw-r--r--  1 huxi  staff  76 Jul  6 14:38 00000000000000000000.log

 由此可見,我們的計算是正確的。

四、測試對比

如果拿v1和v2的測試結果來看,似乎v2版本占用的磁盤空間反而增加了,這是因為我們的試驗中每個batch只有一條消息,如果我們改用java API程序來批量發送消息的話,我們就會發現兩者的不同。下面簡單地做個試驗:

給v1和v2版本的topic各自發1千萬條消息,value平均大小是500字節,我們比較下兩個版本所占用的磁盤空間: 

由此可見,在未有任何調優的情況下,v2版本消息格式確實可以節省磁盤空間。我們可以說0.11版本的Kafka消息在支持事務、冪等producer的同時還一定程度上減少了網絡IO和磁盤IO的開銷。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM