Kafka多個消費者的小例子
public class FirstMultiConsumerThreadDemo2 {
public static final String brokerList = "10.211.55.3:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// props.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String,String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;
public KafkaConsumerThread(Properties props,String topic,int threadNumber) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;
executorService = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
//將一批消息,即records封裝成任務類,提交給線程池
//通常這一步最為耗時,通過異步的方式,降低處理業務邏輯所耗費的時間
executorService.submit(new RecordHandler(records));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
public static class RecordHandler extends Thread{
public final ConsumerRecords<String,String> records;
public RecordHandler(ConsumerRecords<String,String> records){
this.records = records;
}
@Override
public void run() {
//處理records
}
}
}
RecordHandler
類是用來處理消息的,注意線程池的最后一個參數設置的是:CallerrunsPolicy
,這樣可以防止線程池的總體消費能力跟不上poll()的能力,從而導致異常現象的發生。
一般而言,poll()
拉取消息的速度是相當快的,而整體消費的瓶頸也正是在處理消息這一塊,如果我們通過異步的方式,就能 帶動整體消費性能的提升。
如下圖:
將處理消息改成多線程的實現方式。