kafka消息格式演變


主要基於下面博文進行學習與驗證
一文看懂kafka消息格式演變

概述

Kafka根據topic(主題)對消息進行分類,發布到Kafka集群的每條消息都需要指定一個topic,每個topic將被分為多個partition(分區)。每個partition在存儲層面是追加log(日志)文件,任何發布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型的數值,它唯一標記一條消息。

1、v0版消息格式(kafka 0.10之前的版本)

crc32(4B):crc32校驗值。校驗范圍為magic至value之間。
magic(1B):消息格式版本號,此版本的magic值為0。
attributes(1B):消息的屬性。總共占1個字節,低3位表示壓縮類型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
key length(4B):表示消息的key的長度。如果為-1,則表示沒有設置key,即key=null。
key:可選,如果沒有key則無此字段。
value length(4B):實際消息體的長度。如果為-1,則表示消息為空。
value:消息體。可以為空,比如tomnstone消息。

2、v1版本(從0.10.0版本開始到0.11.0版本之前的版本)


v1版本比v0版本多一個8B的timestamp字段;

timestamp字段作用:
內部而言:影響日志保存、切分策略;
外部而言:影響消息審計、端到端延遲等功能的擴展

2.1 消息壓縮

常見的壓縮算法是數據量越大壓縮效果越好,一條消息通常不會太大,這就導致壓縮效果並不太好。而kafka實現的壓縮方式是將多條消息一起進行壓縮,這樣可以保證較好的壓縮效果。而且在一般情況下,生產者發送的壓縮數據在kafka broker中也是保持壓縮狀態進行存儲,消費者從服務端獲取也是壓縮的消息,消費者在處理消息之前才會解壓消息,這樣保持了端到端的壓縮。

壓縮后的消息格式

3、v2版本(0.11.0版本及之后的版本)

相對v0和v1改動較大,引入了變長整形Varints和ZigZag編碼。

Varints作用:根據數值的大小,調整占用的字節數,數值越小,占用的字節數就越小
      0-63之間的數字占1個字節,64-8191之間的數字占2個字節,8192-1048575之間的數字占3個字節
      kafka broker的配置message.max.bytes的默認大小為1000012(Varints編碼占3個字節)

ZigZag編碼:使絕對值較小的負數仍然享有較小的Varints編碼值

V2版本消息集稱為Record Batch(v0和v1稱為Message Set),相較於V0、V1版本

    (1)將多個消息(Record)打包存放到單個RecordBatch中,v2版本的單個Record Batch Header相較於v0、v1版本的多個Log_OVERHEAD(每個Record都會有1個LOG_OVERHEARD),會節省空間;
    (2)引入變長整形Varints和ZigZag編碼,能夠靈活的節省空間

附錄

1、日志消息格式驗證

(1)新建一個分區為1、副本為1的topic,名稱為msg_format_v0

bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0

(2)向topic中寫入key="key", value="value"消息

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("msg_format_v0", "key", "value"));

        producer.close();

    }
}

pom依賴

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <!-- 根據測試的kafka版本,需用不同版本的依賴 -->
      <version>0.8.2.1</version>
    </dependency>

(3)日志消息大小預期與驗證

預期

TotalSzie = LOG_OVERHEAD + RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B

驗證

root@hadoop2 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3

(4)向topic中寫入key=null, value="value"消息

        Producer<Object, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("msg_format_v0", "value"));

(5)日志消息大小預期與驗證

預期

TotalSize = pre_size + LOG_OVERHEAD + RECORD_OVERHEAD_V0 + 0B的key + 5B的value = 34 + 12 + 14 + 5 = 65B

驗證

[root@hadoop2 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /data/kafka-logs/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
offset: 1 position: 34 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 2898297856

參考:

(1)一文看懂kafka消息格式演變


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM