Kafka Producer 異步發送消息居然也會阻塞?


Kafka 一直以來都以高吞吐量的特性而家喻戶曉,就在上周,在一個性能監控項目中,需要使用到 Kafka 傳輸海量消息,在這過程中遇到了一個 Kafka Producer 異步發送消息會被阻塞的問題,導致生產端發送耗時很大。

是的,你沒聽錯,Kafka Producer 異步發送消息也會發生阻塞現象,那究竟是怎么回事呢?

在新版的 Kafka Producer 中,設計了一個消息緩沖池,客戶端發送的消息都會被存儲到緩沖池中,同時 Producer 啟動后還會開啟一個 Sender 線程,不斷地從緩沖池獲取消息並將其發送到 Broker,如下圖所示:

這么看來,Kafka 的所有發送,都可以看作是異步發送了,因此在新版的 Kafka Producer 中廢棄掉異步發送的方法了,僅保留了一個 send 方法,同時返回一個 Futrue 對象,需要同步等待發送結果,就使用 Futrue#get 方法阻塞獲取發送結果。而我在項目中直接調用 send 方法,為何還會發送阻塞呢?

我們在構建 Kafka Producer 時,會有一個自定義緩沖池大小的參數 buffer.memory,默認大小為 32M,因此緩沖池的大小是有限制的,我們不妨想一下,緩沖池內存資源耗盡了會怎么樣?

Kafka 源碼的注釋是非常詳細的,RecordAccumulator 類是 Kafka Producer 緩沖池的核心類,而 RecordAccumulator 類就有那么一段注釋:

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

大概的意思是:

當緩沖池的內存塊用完后,消息追加調用將會被阻塞,直到有空閑的內存塊。

由於性能監控項目每分鍾需要發送幾百萬條消息,只要 Kafka 集群負載很高或者網絡稍有波動,Sender 線程從緩沖池撈取消息的速度趕不上客戶端發送的速度,就會造成客戶端發送被阻塞。

我寫個例子讓大家直觀感受一下被阻塞的現象:

public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put(ProducerConfig.ACKS_CONFIG, "0");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
  properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);
  properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
  KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
  String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  List<byte[]> bytesList = new ArrayList<>();
  Random random = new Random();
  for (int j = 0; j < 1024; j++) {
    int i1 = random.nextInt(10);
    if (i1 == 0) {
      i1 = 1;
    }
    byte[] bytes = new byte[1024 * i1];
    for (int i = 0; i < bytes.length - 1; i++) {
      bytes[i] = (byte) str.charAt(random.nextInt(62));
    }
    bytesList.add(bytes);
  }

  while (true) {
    long start = System.currentTimeMillis();
    producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(1023))));
    long end = System.currentTimeMillis() - start;
    if (end > 100) {
      System.out.println("發送耗時:" + end);
    }
    // Thread.sleep(10);
  }
}

以上例子構建了一個 Kafka Producer 對象,同時使用死循環不斷地發送消息,這時如果把 Thread.sleep(10);注釋掉,則會出現發送耗時很長的現象:

使用 JProfiler 可以查看到分配內存的地方出現了阻塞:

跟蹤到源碼:

發現在 org.apache.kafka.clients.producer.internals.BufferPool#allocate 方法中,如果判斷緩沖池沒有空閑的內存了,則會阻塞內存分配,直到有空閑內存為止。

如果不注釋 Thread.sleep(10);這段代碼則不會發生阻塞現象,打斷點到阻塞的地方,也不會被 Debug 到,從現象能夠得知,Thread.sleep(10);使得發送消息的頻率變低了,此時 Sender 線程發送的速度超過了客戶端的發送速度,緩沖池一直處於未滿狀態,因此不會產生阻塞現象。

除了以上緩沖池內存滿了會發生阻塞之外,Kafka Produer 其它情況都不會發生阻塞了嗎?非也,其實還有一個地方,也會發生阻塞!

Kafka Producer 通常在第一次發送消息之前,需要獲取該主題的元數據 Metadata,Metadata 內容包括了主題相關分區 Leader 所在節點信息、副本所在節點信息、ISR 列表等,Kafka Producer 獲取 Metadata 后,便會根據 Metadata 內容將消息發送到指定的分區 Leader 上,整個獲取流程大致如下:

如上圖所示,Kafka Producer 在發送消息之前,會檢查主題的 Metadata 是否需要更新,如果需要更新,則會喚醒 Sender 線程並發送 Metatadata 更新請求,此時 Kafka Producer 主線程則會阻塞等待 Metadata 的更新。

如果 Metadata 一直無法更新,則會導致客戶端一直阻塞在那里。

作者簡介

作者張乘輝,擅長消息中間件技能,負責公司百萬 TPS 級別 Kafka 集群的維護,作者維護的公號「后端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的源碼分析;同時作者也是阿里開源分布式事務框架 Seata Contributor,因此也會分享關於 Seata 的相關知識;當然公號也會分享 WEB 相關知識比如 Spring 全家桶等。內容不一定面面俱到,但一定讓你感受到作者對於技術的追求是認真的!

公眾號:后端進階

技術博客:https://objcoding.com/

GitHub:https://github.com/objcoding/

公眾號「后端進階」,專注后端技術分享!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM