前段時間在Kafka QQ群中有人問及此事——關於Java consumer如何動態修改topic訂閱的問題。仔細一想才發現這的確是個好問題,因為如果簡單地在另一個線程中直接持有consumer實例然后調用subscribe進行修改,consumer端必然會拋出異常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access
和KafkaProducer不同的是,KafkaConsumer不是線程安全的,所以我們不能直接在沒有同步保護的機制下直接啟用另一個線程調用consumer的任何方法(除了wakeup)。因此,實現這個需求有兩種途徑:
- 使用重量級的synchorinzed機制來實現線程安全
- 借助Java類庫已有的線程安全數據結構來實現
如果是第一種方式,則無論哪個線程訪問consumer都必須要配備必要的同步保護機制,代價相當大且極易出錯。本文選取第二種方式,我們可以借助Java提供的ConcurrentLinkedQueue來幫助我們實現。具體的步驟為:
- 構建ConcurrentLinkedQueue對象分別給兩個線程使用(這里並不限定於兩個線程,但這個需求最可能的實際場景是consumer主線程和一個后台管理類的用戶線程,而后者負責觸發“動態修改訂閱”邏輯)
- 調用KafkaConsumer.poll(timeout)來不斷消費消息。經常有人問這里的timeout到底是做什么用的?這里統一回答一下:這里的timeout賦予了用戶在consumer讀取消息后可以執行其他一些操作的能力,比如定期的記錄日志等。如果你的consumer沒有這樣的需求,那么調用KafkaConsumer.poll(1000)和KafkaConsumer.poll(Integer.MAX)沒有任何區別。事實上, 我們更加推薦用戶使用KafkaConsumer.poll(Integer.MAX) + wakeup的方式來響應后端其他邏輯!
- 每次poll之后嘗試去探查一下ConcurrentLinkedQueue有沒有新東西(如果有說明訂閱topic列表發生變化),響應之
- 使用另一個線程往ConcurrentLinkedQueue中插入新的訂閱信息
完整樣例代碼如下:
public class ConsumerTest {
public static void main(String[] args) {
final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();
// 創建另一個測試線程,啟動后首先暫停10秒然后變更topic訂閱
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// swallow it.
}
// 變更為訂閱topic: btopic, ctopic
subscribedTopics.addAll(Arrays.asList("btopic", "ctopic"));
}
};
new Thread(runnable).start();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group1");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 最開始的訂閱列表:atopic、btopic
consumer.subscribe(Arrays.asList("atopic", "btopic"));
while (true) {
consumer.poll(2000); //表示每2秒consumer就有機會去輪詢一下訂閱狀態是否需要變更
// 本例不關注消息消費,因此每次只是打印訂閱結果!
System.out.println(consumer.subscription());
if (!subscribedTopics.isEmpty()) {
Iterator<String> iter = subscribedTopics.iterator();
List<String> topics = new ArrayList<>();
while (iter.hasNext()) {
topics.add(iter.next());
}
subscribedTopics.clear();
consumer.subscribe(topics); // 重新訂閱topic
}
}
// 本例只是測試之用,使用了while(true),所以這里沒有顯式關閉consumer
// consumer.close();
}
}
輸出如下:
[atopic, btopic]
[atopic, btopic]
[atopic, btopic]
[ctopic, btopic]
[ctopic, btopic]
由此可見,本consumer在沒有關閉的情況下動態進行了topic的訂閱變更。另外需要說一下,動態變更時最好不要直接調用subscribe(topics),而是要顯式地定義ConsumerRebalanceListener以避免位移提交的混亂。
