應用服務啟動時,KafkaListener默認會自動啟動進行消費,如果想不自動消費,可以設置AutoStartup屬性值為false
@Override @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning") public void ackListener(ConsumerRecord record, Acknowledgment ack) { if (LOG.isInfoEnabled()) { LOG.info("###################預警ackListener接收到消息###################"); } // todo 消費業務處理 }
@Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); //容器線程數:小於或等於Topic的分區數 factory.setConcurrency(3); //設置提交偏移量的方式 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); // 禁止自動啟動 factory.setAutoStartup(false); return factory; }
完整消費代碼vk