Kafka 一直以來都以高吞吐量的特性而家喻戶曉,就在上周,在一個性能監控項目中,需要使用到 Kafka 傳輸海量消息,在這過程中遇到了一個 Kafka Producer 異步發送消息會被阻塞的問題,導致生產端發送耗時很大。
是的,你沒聽錯,Kafka Producer 異步發送消息也會發生阻塞現象,那究竟是怎么回事呢?
在新版的 Kafka Producer 中,設計了一個消息緩沖池,客戶端發送的消息都會被存儲到緩沖池中,同時 Producer 啟動后還會開啟一個 Sender 線程,不斷地從緩沖池獲取消息並將其發送到 Broker,如下圖所示:
這么看來,Kafka 的所有發送,都可以看作是異步發送了,因此在新版的 Kafka Producer 中廢棄掉異步發送的方法了,僅保留了一個 send 方法,同時返回一個 Futrue 對象,需要同步等待發送結果,就使用 Futrue#get 方法阻塞獲取發送結果。而我在項目中直接調用 send 方法,為何還會發送阻塞呢?
我們在構建 Kafka Producer 時,會有一個自定義緩沖池大小的參數 buffer.memory
,默認大小為 32M,因此緩沖池的大小是有限制的,我們不妨想一下,緩沖池內存資源耗盡了會怎么樣?
Kafka 源碼的注釋是非常詳細的,RecordAccumulator 類是 Kafka Producer 緩沖池的核心類,而 RecordAccumulator 類就有那么一段注釋:
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
大概的意思是:
當緩沖池的內存塊用完后,消息追加調用將會被阻塞,直到有空閑的內存塊。
由於性能監控項目每分鍾需要發送幾百萬條消息,只要 Kafka 集群負載很高或者網絡稍有波動,Sender 線程從緩沖池撈取消息的速度趕不上客戶端發送的速度,就會造成客戶端發送被阻塞。
我寫個例子讓大家直觀感受一下被阻塞的現象:
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.ACKS_CONFIG, "0");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
List<byte[]> bytesList = new ArrayList<>();
Random random = new Random();
for (int j = 0; j < 1024; j++) {
int i1 = random.nextInt(10);
if (i1 == 0) {
i1 = 1;
}
byte[] bytes = new byte[1024 * i1];
for (int i = 0; i < bytes.length - 1; i++) {
bytes[i] = (byte) str.charAt(random.nextInt(62));
}
bytesList.add(bytes);
}
while (true) {
long start = System.currentTimeMillis();
producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(1023))));
long end = System.currentTimeMillis() - start;
if (end > 100) {
System.out.println("發送耗時:" + end);
}
// Thread.sleep(10);
}
}
以上例子構建了一個 Kafka Producer 對象,同時使用死循環不斷地發送消息,這時如果把 Thread.sleep(10);
注釋掉,則會出現發送耗時很長的現象:
使用 JProfiler 可以查看到分配內存的地方出現了阻塞:
跟蹤到源碼:
發現在 org.apache.kafka.clients.producer.internals.BufferPool#allocate
方法中,如果判斷緩沖池沒有空閑的內存了,則會阻塞內存分配,直到有空閑內存為止。
如果不注釋 Thread.sleep(10);
這段代碼則不會發生阻塞現象,打斷點到阻塞的地方,也不會被 Debug 到,從現象能夠得知,Thread.sleep(10);
使得發送消息的頻率變低了,此時 Sender 線程發送的速度超過了客戶端的發送速度,緩沖池一直處於未滿狀態,因此不會產生阻塞現象。
除了以上緩沖池內存滿了會發生阻塞之外,Kafka Produer 其它情況都不會發生阻塞了嗎?非也,其實還有一個地方,也會發生阻塞!
Kafka Producer 通常在第一次發送消息之前,需要獲取該主題的元數據 Metadata,Metadata 內容包括了主題相關分區 Leader 所在節點信息、副本所在節點信息、ISR 列表等,Kafka Producer 獲取 Metadata 后,便會根據 Metadata 內容將消息發送到指定的分區 Leader 上,整個獲取流程大致如下:
如上圖所示,Kafka Producer 在發送消息之前,會檢查主題的 Metadata 是否需要更新,如果需要更新,則會喚醒 Sender 線程並發送 Metatadata 更新請求,此時 Kafka Producer 主線程則會阻塞等待 Metadata 的更新。
如果 Metadata 一直無法更新,則會導致客戶端一直阻塞在那里。
作者簡介
作者張乘輝,擅長消息中間件技能,負責公司百萬 TPS 級別 Kafka 集群的維護,作者維護的公號「后端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的源碼分析;同時作者也是阿里開源分布式事務框架 Seata Contributor,因此也會分享關於 Seata 的相關知識;當然公號也會分享 WEB 相關知識比如 Spring 全家桶等。內容不一定面面俱到,但一定讓你感受到作者對於技術的追求是認真的!
公眾號:后端進階
GitHub:https://github.com/objcoding/