Spring-Kafka —— 實現批量消費和手動提交offset


 

spring-kafka的官方文檔介紹,可以知道自1.1版本之后,

@KafkaListener開始支持批量消費,只需要設置batchListener參數為true

把application.yml中的enable-auto-commit設置為false,設置為不自動提交

 

@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
    ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(1500);
    factory.setBatchListener(true);//設置為批量消費,每個批次數量在Kafka配置參數中設置
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//設置手動提交ackMode
  return factory; 
}
   //批量消息
    @KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
    public void consumerBatch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
        log.info("接收到消息數量:{}",record.size());
       //手動提交    ack.acknowledge();
}

這里containerFactory = “batchFactory”要指定為批量消費

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM