一、消息消費確認
簡單說下kafka消費的邏輯。
當前生產這發送消息到相應的主題topic,消費端可以去監聽自己所關注的topic消息,從而實現本地邏輯的流轉。
消費的確認的方式:
1、消費端(kafka)自動提交
spring.kafka.consumer.enable-auto-commit=true //這里表示用戶無需關注消費的提交,kafka系統會負責幫我們按照一定時間頻率提交。
2、消費端手動提交
spring.kafka.consumer.enable-auto-commit=false //這里需要注意,並不是說你這里設置了false就能一定確保消費的offset不會被提交。單純的設置這個為false是無法保證offset不會被提交。
二、那么如何真正做到消費者offset的沒收手動提交就不觸發提交呢?
第一:spring.kafka.consumer.enable-auto-commit=false //true的話啟動會報錯
第二:構建自定義工廠
1 /** 2 * 手動提交 3 * topics訂閱的消息主題可以多個{"a","b"} 4 * group-DszConsumer3-1消費組ID 5 * ackContainerFactory自定消費策略 6 */ 7 @KafkaListener(id = "group-DszConsumer3-1", topics = TOPIC, containerFactory = "ackContainerFactory") 8 public void ackListener(ConsumerRecord record, Acknowledgment ack) { 9 String topic = record.topic(); 10 long offset = record.offset(); 11 int partition = record.partition(); 12 log.info("當前消息topic={},offset={},partition={}",topic,offset,partition); 13 //手動提交 14 //ack.acknowledge(); 15 }
自定義消費策略 ackContainerFactory
1 @Configuration 2 public class KafkaConfig { 3 @Bean("ackContainerFactory") 4 public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) { 5 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); 6 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//設置手動模式 7 factory.setConsumerFactory(consumerFactory); 8 return factory; 9 } 10 11 }
按照以上方式,如果沒有手動觸發 ack.acknowledge();那么消費組是不費提交已消費的offset,每次重啟消費組都會再次消費。
這就是真正實現把是否需要提交offset權限交給開發者,自由控制。
總結:spring.kafka.consumer.enable-auto-commit=false並不是真正意義關閉自動提交,而是吧提交的權限交給Spring。Spring會幫我們去提交,省去用戶提交的過程。這就是為什么false的時候明明沒有手動提交去還是提交的疑惑。
而spring.kafka.consumer.enable-auto-commit=true則是自動提交權限由kafka自身在控制。