kafka是如何壓縮消息的?要弄清楚這個問題,就要從kafka的消息格式說起。kafka的消息層次分為兩層:消息集合(message set)以及消息(message)。一個消息集合包含若干條日志項(record item),而日志項才是真正封裝消息的地方。kafka底層的消息日志由一系列消息集合日志項組成。kafka通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。
在kafka中,壓縮可能會發生在兩個地方:生產者端和broker端。
生產者程序中配置compression.type參數即表示啟動指定類型的壓縮算法。
public class KafkaProduce { public void kafkaProducer() throws Exception { Properties pro = new Properties(); ...//其他配置參數 pro.put("partitioner.class", "kafka.KafkaPartitioner"); // 啟用壓縮 pro.put("compression.type", "gzip"); KafkaProducer config = new KafkaProducer(pro); }
它表明producer的壓縮算法使用的是gzip。這樣producer啟動后生產的每個消息集合都是經過gzip壓縮過的,故而能很好地節省網絡傳輸帶寬已經broker端的磁盤占用。
何時壓縮?
大部分情況下,broker從producer端接收到消息后僅僅是原封不動地保存,而不會對其進行任何修改,但這個“大部分情況”也是要滿足條件。有兩種例外的情況會讓broker端重新壓縮消息。
情況一:broker端指定了和producer端不同的壓縮算法。
kafka broker端也有一個參數叫compression.type,和producer端的參數設置一樣。但是這個參數的默認值是producer,表示broker端會尊重producer端使用的壓縮算法,可一旦你在broker端設置了不同的compression.type值,就一定要小心了,因為可能會發生預料之外的壓縮/解壓縮操作,導致broker端CPU使用率飆升。比如broker端接收到gzip壓縮消息后,broker端指定了snappy壓縮算法,這樣broker只能解壓縮然后使用snappy重新壓縮一遍。
情況二:broker端發送了消息格式變化。
所謂的消息格式變化主要是為了兼容老版本的消費者程序。在一個生產環境中,kafka集群中同時保存多種版本的消息格式非常常見。為了兼容老版本的格式,broker端會對新版本消息執行向老版本格式的轉換。這個過程就會涉及到消息的解壓和重新壓縮。一般情況下這種消息格式的轉換對性能是有很大的影響的,除了,這里講的壓縮外,還會讓kafka喪失了引以為豪的zero copy特性。所以盡量保證消息格式的統一,這樣不僅可以避免不必要的解壓縮/重新壓縮,對提升其他方面的性能也很有裨益。
何時解壓縮?
通常來說解壓縮發生在消費者程序中。kafka會將啟用了哪種壓縮算法封裝進消息集合中,這樣當consumer讀取到集合時,它自然就知道了這些消息使用了哪種壓縮算法。用一句話總結:producer端壓縮,broker端保持,consumer端解壓縮。
broker端也會進行解壓縮,注意了,和前面提到的場景不一樣。每個壓縮過的消息集合在broker端寫入時都要發生解壓縮操作,目的就是為了對消息執行各種驗證,這種解壓縮對broker端性能是有一定的影響,特別是對CPU使用率而言。
各種壓縮算法對比
在kafka2.1.0版本之前,kafka支持3種壓縮算法:GZIP、Snappy和LZ4。從2.1.0開始,kafka正式支持Zstandard算法(簡寫zstd)。它是Facebook開源的一個壓縮算法,能夠提供超高的壓縮比。對於kafka測試而言,在吞吐方面:LZ4>Snappy> zstd、GZIP;在壓縮比方面:zstd>lz4>gzip>snappy。具體到物理資源,使用snappy算法占用的網絡帶寬資源最多,zstd最少,這是合理的,畢竟zstd就是要提供超高的壓縮比;在CPU使用率方面,各個算法表現得差不多,只是在壓縮時snappy使用的CPU較多一些,而在解壓縮時gzip算法則可能使用更多的CPU。

