spring-kafka的官方文檔介紹,可以知道自1.1版本之后,
@KafkaListener開始支持批量消費,只需要設置batchListener參數為true
把application.yml中的enable-auto-commit設置為false,設置為不自動提交
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1500);
factory.setBatchListener(true);//設置為批量消費,每個批次數量在Kafka配置參數中設置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//設置手動提交ackMode
return factory;
}
//批量消息
@KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
public void consumerBatch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
log.info("接收到消息數量:{}",record.size());
//手動提交 ack.acknowledge();
}
這里containerFactory = “batchFactory”要指定為批量消費

