Starting with version 0.9, Kafka ships a Java consumer that reworks the coordinator design and removes the dependency on ZooKeeper. The community has also been discussing a plan to officially replace the Scala consumer with this consumer API. Since material on this topic is still scarce, this post walks through a multi-threaded consumer example built on KafkaConsumer, which I hope will be useful.
The main entry point of this API is KafkaConsumer (o.a.k.clients.consumer.KafkaConsumer). Basic single-threaded usage is already covered in the official API docs, so I will not repeat it here and will go straight to the topic: how to use KafkaConsumer from multiple threads. KafkaConsumer differs from KafkaProducer in this respect. The producer is thread-safe, so users are encouraged to share a single KafkaProducer instance across threads, which is usually more efficient than maintaining one instance per thread. KafkaConsumer, however, is not thread-safe, so there are two common ways to implement multi-threaded consumption:
1. Maintain one KafkaConsumer per thread.

2. Maintain one or more KafkaConsumers plus a pool of event-processing threads (worker threads).
Of course, the second method allows several variants: for example, each worker thread can own its own processing queue, with the consumer routing messages into the different queues according to some rule or logic. The overall idea stays the same, so I will not discuss every variant at length; the sketch below illustrates this particular one.
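As a rough illustration of that variant, here is a minimal sketch. The class name QueueRoutingConsumer and the partition-hash routing rule are my own assumptions, not something prescribed by the Kafka API. The consumer thread routes each record to a per-worker BlockingQueue; routing by partition has the useful side effect of preserving per-partition ordering:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the "per-worker queue" variant: one consumer thread routes each record
// into a worker-owned queue. Routing by partition keeps per-partition ordering intact.
public class QueueRoutingConsumer {

    private final KafkaConsumer<String, String> consumer; // configured and subscribed elsewhere
    private final List<BlockingQueue<ConsumerRecord<String, String>>> queues;

    public QueueRoutingConsumer(KafkaConsumer<String, String> consumer, int workerNum) {
        this.consumer = consumer;
        this.queues = new ArrayList<>(workerNum);
        for (int i = 0; i < workerNum; i++) {
            queues.add(new LinkedBlockingQueue<ConsumerRecord<String, String>>(1000));
        }
        // each worker thread would loop on queues.get(i).take() and process the records it receives
    }

    public void pollLoop() throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // the routing rule is an assumption: hashing on the partition number means all
                // records from one partition always land in the same worker's queue
                queues.get(record.partition() % queues.size()).put(record);
            }
        }
    }
}
```

Each worker then takes from its own queue in a loop; the trade-off is the extra hand-off plus the bookkeeping needed to commit offsets correctly, as the table below also notes.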
The table below summarizes the pros and cons of the two methods:
| | Pros | Cons |
| --- | --- | --- |
| Method 1 (one KafkaConsumer per thread) | Easy to implement; relatively fast, since no inter-thread coordination is needed; per-partition message order is easy to preserve | Higher TCP connection overhead (each thread maintains several TCP connections); the number of consumers is capped by the number of topic partitions, so scalability is poor; frequent requests can reduce throughput; processing messages in the consumer thread itself can cause timeouts and trigger a rebalance |
| Method 2 (one or more consumers, multiple worker threads) | Consumer count and worker count can be scaled independently, so it scales well | Harder to implement; per-partition message order is usually hard to preserve; the longer processing chain makes it difficult to guarantee correct offset-commit semantics (see the sketch after the ConsumerHandler class below) |
Below we implement both methods. Note that the code shows only the most basic implementation and deliberately ignores many practical details, such as error handling.
Method 1
ConsumerRunnable class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // each thread owns its private KafkaConsumer instance
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true"); // this example uses automatic offset commit
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // this example uses automatic partition assignment
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200); // this example uses a 200ms poll timeout
            for (ConsumerRecord<String, String> record : records) {
                // put your message-processing logic here; this example just prints the message
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}
```
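One detail worth noting: run() above loops forever and never closes the consumer. If you need to stop these threads cleanly, a minimal sketch is shown below, assuming a wakeup-based shutdown (ShutdownableConsumerRunnable is a hypothetical name, and the constructor is simplified to take a ready-made consumer). KafkaConsumer.wakeup() is the one consumer method that is safe to call from another thread, and it causes a blocked poll() to throw WakeupException. Method 2's ConsumerHandler below uses the same pattern.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Sketch: a shutdown-aware poll loop. ShutdownableConsumerRunnable is a hypothetical name;
// the constructor would otherwise build its KafkaConsumer exactly as ConsumerRunnable does.
public class ShutdownableConsumerRunnable implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    public ShutdownableConsumerRunnable(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer; // already subscribed, as in ConsumerRunnable
    }

    // safe to call from any thread: wakeup() is the one thread-safe KafkaConsumer method
    public void shutdown() {
        consumer.wakeup();
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " consumed "
                            + record.partition() + "th message with offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // expected: shutdown() was called from another thread while poll() was blocked
        } finally {
            consumer.close(); // close in the polling thread, since the consumer is not thread-safe
        }
    }
}
```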
ConsumerGroup class
```java
import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        // start one thread per ConsumerRunnable
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}
```
ConsumerMain class
```java
public class ConsumerMain {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        int consumerNum = 3;

        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}
```
Method 2
Worker class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Worker implements Runnable {

    private ConsumerRecord<String, String> consumerRecord;

    public Worker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        // put your message-processing logic here; this example just prints the message
        System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
                + "th message with offset: " + consumerRecord.offset());
    }
}
```
ConsumerHandler class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerHandler {

    // this example uses a single consumer to feed the worker pool; you could just as well use
    // multiple instances, as in Method 1, each putting messages into the pool by some rule
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new Worker(record));
                }
            }
        } catch (WakeupException e) {
            // shutdown() was called from another thread; fall through and close the consumer
        } finally {
            consumer.close(); // the consumer is not thread-safe, so close it in the polling thread
        }
    }

    public void shutdown() {
        // wakeup() is the only KafkaConsumer method that is safe to call from another thread;
        // it makes the blocked poll() in execute() throw a WakeupException
        consumer.wakeup();
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
```
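Note that ConsumerHandler still uses auto-commit, so offsets can be committed for records that are still waiting in the worker pool, and a crash at that moment would silently skip them. The sketch below shows the usual remedy, assuming enable.auto.commit is set to false: workers report finished offsets into a shared map, and the polling thread commits them with commitSync(). The class name OffsetTrackingLoop and the synchronization scheme are my own assumptions, not part of the original example.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

// Sketch: workers record the offsets they have fully processed into a shared map;
// the polling thread periodically commits them. Assumes enable.auto.commit=false and
// that records of one partition are processed in order (e.g. via per-partition queues).
public class OffsetTrackingLoop {

    private final KafkaConsumer<String, String> consumer; // configured and subscribed elsewhere
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public OffsetTrackingLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // called by a worker thread when it finishes one record
    public synchronized void markProcessed(ConsumerRecord<String, String> record) {
        // commit the offset of the *next* message to consume, per the Kafka convention
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }

    public void pollLoop() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // hand the record to a worker here (e.g. executors.submit(new Worker(record)));
                // the worker calls markProcessed(record) when it is done
            }
            synchronized (this) {
                if (!offsets.isEmpty()) {
                    consumer.commitSync(offsets); // only offsets whose records were fully processed
                    offsets.clear();
                }
            }
        }
    }
}
```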
Main class
```java
public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092,localhost:9093,localhost:9094";
        String groupId = "group2";
        String topic = "test-topic";
        int workerNum = 5;

        final ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
        // execute() blocks in its poll loop, so run it on a separate thread;
        // otherwise the shutdown() call below would never be reached
        new Thread(() -> consumers.execute(workerNum)).start();

        try {
            Thread.sleep(1000000);
        } catch (InterruptedException ignored) {}
        consumers.shutdown();
    }
}
```
To sum up, both methods (or models) have their own strengths and weaknesses, and you should choose between them based on the characteristics of your actual workload. Personally, I favor the second method and the thinking behind it: do not put heavy processing logic inside the consumer code. Many of the problems that KafkaConsumer users run into, such as rebalance timeouts, coordinator re-elections, and failed heartbeats, stem from exactly that.
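On that last point, if heavy processing does end up inside the poll loop, a few consumer settings can make the group more tolerant of slow polls. The values below are illustrative assumptions, not tuning advice from any official guide:

```java
import java.util.Properties;

Properties props = new Properties();
// give the coordinator a longer grace period before it declares this consumer dead;
// in 0.9/0.10.0, heartbeats are sent from inside poll(), so slow processing eats into this budget
props.put("session.timeout.ms", "60000");
// fetch less data per partition on each poll so that every iteration has less work to do
props.put("max.partition.fetch.bytes", "262144");
// from 0.10.0 onwards, you can also cap how many records a single poll() returns
props.put("max.poll.records", "100");
```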