主要基於下面博文進行學習與驗證
一文看懂kafka消息格式演變
概述
Kafka根據topic(主題)對消息進行分類,發布到Kafka集群的每條消息都需要指定一個topic,每個topic將被分為多個partition(分區)。每個partition在存儲層面是追加log(日志)文件,任何發布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型的數值,它唯一標記一條消息。
1、v0版消息格式(kafka 0.10之前的版本)
crc32(4B):crc32校驗值。校驗范圍為magic至value之間。
magic(1B):消息格式版本號,此版本的magic值為0。
attributes(1B):消息的屬性。總共占1個字節,低3位表示壓縮類型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
key length(4B):表示消息的key的長度。如果為-1,則表示沒有設置key,即key=null。
key:可選,如果沒有key則無此字段。
value length(4B):實際消息體的長度。如果為-1,則表示消息為空。
value:消息體。可以為空,比如tomnstone消息。
2、v1版本(從0.10.0版本開始到0.11.0版本之前的版本)
v1版本比v0版本多一個8B的timestamp字段;
timestamp字段作用:
內部而言:影響日志保存、切分策略;
外部而言:影響消息審計、端到端延遲等功能的擴展
2.1 消息壓縮
常見的壓縮算法是數據量越大壓縮效果越好,一條消息通常不會太大,這就導致壓縮效果並不太好。而kafka實現的壓縮方式是將多條消息一起進行壓縮,這樣可以保證較好的壓縮效果。而且在一般情況下,生產者發送的壓縮數據在kafka broker中也是保持壓縮狀態進行存儲,消費者從服務端獲取也是壓縮的消息,消費者在處理消息之前才會解壓消息,這樣保持了端到端的壓縮。
3、v2版本(0.11.0版本及之后的版本)
相對v0和v1改動較大,引入了變長整形Varints和ZigZag編碼。
Varints作用:根據數值的大小,調整占用的字節數,數值越小,占用的字節數就越小
0-63之間的數字占1個字節,64-8191之間的數字占2個字節,8192-1048575之間的數字占3個字節
kafka broker的配置message.max.bytes的默認大小為1000012(Varints編碼占3個字節)
ZigZag編碼:使絕對值較小的負數仍然享有較小的Varints編碼值
V2版本消息集稱為Record Batch(v0和v1稱為Message Set),相較於V0、V1版本
(1)將多個消息(Record)打包存放到單個RecordBatch中,v2版本的單個Record Batch Header相較於v0、v1版本的多個Log_OVERHEAD(每個Record都會有1個LOG_OVERHEARD),會節省空間;
(2)引入變長整形Varints和ZigZag編碼,能夠靈活的節省空間
附錄
1、日志消息格式驗證
(1)新建一個分區為1、副本為1的topic,名稱為msg_format_v0
bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0
(2)向topic中寫入key="key", value="value"消息
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class TestProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("msg_format_v0", "key", "value"));
producer.close();
}
}
pom依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- 根據測試的kafka版本,需用不同版本的依賴 -->
<version>0.8.2.1</version>
</dependency>
(3)日志消息大小預期與驗證
預期
TotalSzie = LOG_OVERHEAD + RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B
驗證
root@hadoop2 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
(4)向topic中寫入key=null, value="value"消息
Producer<Object, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("msg_format_v0", "value"));
(5)日志消息大小預期與驗證
預期
TotalSize = pre_size + LOG_OVERHEAD + RECORD_OVERHEAD_V0 + 0B的key + 5B的value = 34 + 12 + 14 + 5 = 65B
驗證
[root@hadoop2 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
offset: 1 position: 34 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 2898297856