kafka設計的目標之一就是高吞吐量。除了最基礎的將一個topic划分為多個partition外,還從以下各個方面優化。
kafka broker端為了提高吞吐量:實現順序讀寫磁盤、利用page cache,將文件數據映射到內存,利用sendfile網傳時socket通信時直接讀取內存區域(減少操作系統上下文切換、零拷貝提速);
producer端,將消息buffer起來,當消息的條數達到一定閥值時(一定數量或時間),批量發送給broker;
consumer,批量fetch多條消息.通過配置到達一定閾值時(一定數量或時間),批量從broker拉取信息;
對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制減少網傳數據量;壓縮需要消耗少量的CPU資源,可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式。
順序寫磁盤
根據《一些場景下順序寫磁盤快於隨機寫內存》所述,將寫磁盤的過程變為順序寫,可極大提高對磁盤的利用率。
Kafka的整個設計中,Partition相當於一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些數據,並且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。
由於磁盤有限,不可能保存所有數據,實際上作為消息系統Kafka也沒必要保存所有數據,需要刪除舊的數據。而這個刪除過程,並非通過使用“讀-寫”模式去修改文件,而是將Partition分為多個Segment,每個Segment對應一個物理文件,通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。
通過如下代碼可知,Kafka刪除Segment的方式,是直接刪除Segment對應的整個log文件和整個index文件而非刪除文件中的部分內容。
1 |
/** |
充分利用Page Cache
使用Page Cache的好處如下
- I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能
- I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
- 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔
- 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過Page Cache)交換數據
- 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用
Broker收到數據后,寫磁盤時只是將數據寫入Page Cache,並不保證數據一定完全寫入磁盤。從這一點看,可能會造成機器宕機時,Page Cache內的數據未寫入磁盤從而造成數據丟失。但是這種丟失只發生在機器斷電等造成操作系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下數據不丟失而強制將Page Cache中的數據Flush到磁盤,反而會降低性能。也正因如此,Kafka雖然提供了flush.messages
和flush.ms
兩個參數將Page Cache中的數據強制Flush到磁盤,但是Kafka並不建議使用。
如果數據消費速度與生產速度相當,甚至不需要通過物理磁盤交換數據,而是直接通過Page Cache交換數據。同時,Follower從Leader Fetch數據時,也可通過Page Cache完成。下圖為某Partition的Leader節點的網絡/磁盤讀寫信息。
從上圖可以看到,該Broker每秒通過網絡從Producer接收約35MB數據,雖然有Follower從該Broker Fetch數據,但是該Broker基本無讀磁盤。這是因為該Broker直接從Page Cache中將數據取出返回給了Follower。
支持多Disk Drive
Broker的log.dirs
配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs
里。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。
零拷貝
Kafka中存在大量的網絡數據持久化到磁盤(Producer到Broker)和磁盤文件通過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。
傳統模式下的四次拷貝與四次上下文切換
以將磁盤文件通過網絡發送為例。傳統模式下,一般使用如下偽代碼所示的方法先將文件數據讀入內存,然后通過Socket將內存中的數據發送出去。
1 |
buffer = File.read |
這一過程實際上發生了四次數據拷貝。首先通過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),然后應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接着用戶程序通過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最后通過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨着四次上下文切換,如下圖所示。
sendfile和transferTo實現零拷貝
Linux 2.4+內核通過sendfile
系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer后,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile
調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。零拷貝過程如下圖所示。
從具體實現來看,Kafka的數據傳輸通過TransportLayer來完成,其子類PlaintextTransportLayer
通過Java NIO的FileChannel的transferTo
和transferFrom
方法實現零拷貝,如下所示。
1 |
@Override |
注: transferTo
和transferFrom
並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile
這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。
減少網絡開銷
批處理
批處理是一種常用的用於提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。
Kafka 0.8.1及以前的Producer區分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage作為參數,一次發送一條消息。另一種是接受一批KeyedMessage作為參數,一次性發送多條消息。而對於異步發送而言,無論是使用哪個send方法,實現上都不會立即將消息發送給Broker,而是先存到內部的隊列中,直到消息條數達到閾值或者達到指定的Timeout才真正的將消息發送出去,從而實現了消息的批量發送。
Kafka 0.8.2開始支持新的Producer API,將同步Producer和異步Producer結合。雖然從send接口來看,一次只能發送一個ProducerRecord,而不能像之前版本的send方法一樣接受消息列表,但是send方法並非立即將消息發送出去,而是通過batch.size
和linger.ms
控制實際發送頻率,從而實現批量發送。
由於每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協議本身的一些內容(稱為Overhead),所以將多條消息合並到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。
從零拷貝章節的圖中可以看到,雖然Broker持續從網絡接收數據,但是寫磁盤並非每秒都在發生,而是間隔一段時間寫一次磁盤,並且每次寫磁盤的數據量都非常大(最高達到718MB/S)。
數據壓縮降低網絡負載
Kafka從0.7開始,即支持將數據壓縮后再傳輸給Broker。除了可以將每條消息單獨壓縮然后傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一起壓縮后傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。
Broker接收消息后,並不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch到數據后再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網絡傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了Consumer與Broker間的網絡傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
高效的序列化方式
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。Kafka設計解析(六)- Kafka高性能架構之道