一次項目想,多線程消費主題的中的數據,百度了一波之后,可以按分區進行單獨消費,記錄一下
首先,傳統按照主題消費:
1 @KafkaListener(topics = {Constants.KAFKA_TOPIC_PISHH}, containerFactory = "kafkaListenerMonitorFactory") 2 public void listenDataInfo(List<ConsumerRecord> records, Acknowledgment ack) { 3 try { 4 long beginTime = System.currentTimeMillis(); 5 batchList.clear(); 6 for (ConsumerRecord record : records) { 7 count++; 8 Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 9 //獲取kafka消息 10 logger.info(kafkaMessage.get().toString()); 11 } 12 } catch (Exception e){ 13 e.printStackTrace(); 14 } finally { 15 ack.acknowledge();//手動提交偏移量 16 } 17 }
按照主題分區消費:
1 @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = Constants.KAFKA_TOPIC_PISHH, partitions = { "0" }) }, containerFactory = "kafkaListenerMonitorFactory")
2 public void listenDataInfo0(List<ConsumerRecord> records, Acknowledgment ack) { 3 try { 4 long beginTime = System.currentTimeMillis(); 5 batchList.clear(); 6 for (ConsumerRecord record : records) { 7 count++; 8 Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 9 //獲取kafka消息 10 logger.info(kafkaMessage.get().toString()); 11 } 12 13 } catch (Exception e){ 14 e.printStackTrace(); 15 } finally { 16 ack.acknowledge();//手動提交偏移量 17 } 18 }
參考:https://blog.csdn.net/russle/article/details/81258590