I recently ran into a case where the Kafka producer's asynchronous send blocked the main thread. While tracking the problem down, I found that this arguably counts as a spot where Kafka's documentation is misleading.
Problem Description
In many scenarios we send Kafka messages asynchronously, using the following method of KafkaProducer:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
The documentation describes it as an asynchronous send method, so in principle it should never block the calling thread. In practice, however, it does block under certain conditions, for example when the broker is not running properly or the topic has not been created. Sometimes we need no guarantee about the send result at all, yet if the call blocks, it drags down unrelated business logic.
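To make the symptom concrete, here is a minimal sketch that measures how long the "asynchronous" call holds the calling thread; the bootstrap address and topic name are placeholders, and the sketch assumes no broker is actually listening at that address:

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object BlockingSendDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder; assume nothing is listening here
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  val start = System.currentTimeMillis()
  try {
    // Looks fire-and-forget, but internally waits for topic metadata first
    producer.send(new ProducerRecord[String, String]("demo-topic", "value"), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = ()
    })
  } catch {
    case e: Exception => println(s"send threw: ${e.getMessage}")
  }
  // With the broker unreachable this prints roughly max.block.ms (60000 ms by default)
  println(s"send() held the calling thread for ${System.currentTimeMillis() - start} ms")
}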
Where the Problem Arises
Nothing in the declaration of KafkaProducer's send looks problematic, so let's look at its actual implementation:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); // this is where the problem occurs
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        ...
    } catch (ApiException e) {
        ...
    }
}
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    Cluster cluster = metadata.fetch();
    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);
    metadata.add(topic);

    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    // Keep requesting the topic's metadata until it becomes available;
    // if the total wait exceeds maxWaitMs, throw a TimeoutException
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        try {
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) { // check whether the elapsed time has exceeded maxWaitMs
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        metadata.maybeThrowException();
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}
From this implementation we can see that the blocking is caused by the following method:
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException
When KafkaProducer executes send, it must first obtain the metadata for the topic, and this is a retry loop that keeps running until the metadata is fetched successfully or an exception is thrown.
Kafka's intent here is reasonable: the precondition for sending a message is being able to obtain the broker and topic information. The problem is that send exposes a Future-returning API while its implementation can block internally, bounded only by max.block.ms (60 seconds by default). If callers do not anticipate this, then as soon as the broker or topic misbehaves, their threads block one after another, the system slows down, and eventually it becomes unresponsive or crashes.
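Note that putting a timeout on the returned Future does not protect the caller, because the blocking happens inside send() itself, before any Future exists. A sketch of the ineffective approach, where producer and record are hypothetical names for a configured producer and a record to send:

import java.util.concurrent.TimeUnit

// Ineffective: by the time a Future is returned, send() may already have
// blocked in waitOnMetadata for up to max.block.ms (60000 ms by default)
val future = producer.send(record)
try {
  // This timeout only bounds the wait *after* send() has returned
  future.get(100, TimeUnit.MILLISECONDS)
} catch {
  case _: java.util.concurrent.TimeoutException => // delivery still pending
}
// Lowering ProducerConfig.MAX_BLOCK_MS_CONFIG ("max.block.ms") shortens the
// internal block but does not remove it, hence the approach below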
Solution
The fix is straightforward: dedicate a few threads to message sending. Then even in the failure scenarios above, only those few threads block, instead of threads across the whole system piling up and becoming unavailable. A concrete implementation:
import java.util.concurrent.{Callable, ExecutorService, Executors}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

class ProducerF[K, V](kafkaProducer: KafkaProducer[K, V]) {

  // A dedicated pool for sends: if fetching metadata blocks, only this pool's
  // threads are held up, not the threads running the business logic
  val executor: ExecutorService = Executors.newScheduledThreadPool(1)

  def sendAsync(producerRecord: ProducerRecord[K, V], callback: Callback) = {
    executor.submit(new Callable[RecordMetadata]() {
      // Any blocking inside send() (i.e. waitOnMetadata) now happens on the
      // pool thread; the caller of sendAsync returns immediately
      def call = kafkaProducer.send(producerRecord, callback).get()
    })
  }
}
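A minimal usage sketch, where producer stands for a KafkaProducer configured elsewhere and the topic name is a placeholder:

val safeSender = new ProducerF[String, String](producer)

// Returns immediately: any metadata blocking now happens on the pool thread
safeSender.sendAsync(new ProducerRecord[String, String]("demo-topic", "value"), new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) println(s"send failed: ${exception.getMessage}")
})

Keeping the pool small and fixed is what contains the failure: during a broker outage, at most that many threads sit in waitOnMetadata while the rest of the application keeps running.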
