前言
本文主要講述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情況下,AckMode的選項,及手動提交分析總結。
AckMode
RECORD
每處理一條commit一次
BATCH(默認)
每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率
TIME
每次間隔ackTime的時間去commit(跟auto commit interval有什么區別呢?)
COUNT
累積達到ackCount次的ack去commit
COUNT_TIME
ackTime或ackCount哪個條件先滿足,就commit
MANUAL
listener負責ack,但是背后也是批量上去
MANUAL_IMMEDIATE
listner負責ack,每調用一次,就立即commit
Manual Commit
- 消費端手動提交offset代碼如下:
/** * 這是手動提交的消費方式 * @param record * @param ack * @throws Exception */ @KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "寫自己的消費組 id") public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception { String msg = JSONObject.parseObject(record.value(), String.class); System.out.println(msg); if (new Random().nextInt(100)<50){ logger.info(String.format("kafka 綜合收費消費消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value())); ack.acknowledge(); } }
前提要配置AckMode:
instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
- 接下來問題來了, 如果代碼中沒有進行ack.acknowledge(),會怎么辦呢??
消費者在消費消息的過程中,配置參數設置為不自動提交offset,在消費完數據之后如果不手動提交offset,那么在程序中和kafak中的數據會如何被處理呢?
1. 如果在消費kafka的數據過程中,一直沒有提交offset,那么在此程序運行的過程中它不會重復消費。但是如果重啟之后,就會重復消費之前沒有提交offset的數據。
2. 如果在消費的過程中有幾條或者一批數據數據沒有提交offset,后面其他的消息消費后正常提交offset,那么服務端會更新為消費后最新的offset,不會重新消費,就算重啟程序也不會重新消費。
3. 消費者如果沒有提交offset,程序不會阻塞或者重復消費,除非在消費到這個你不想提交offset的消息時你嘗試重新初始化一個客戶端消費者,即可再次消費這個未提交offset的數據。因為客戶端也記錄了當前消費者的offset信息,所以程序會在每次消費了數據之后,自己記錄offset,而手動提交到服務端的offset與這個並沒有關系,所以程序會繼續往下消費。在你重新初始化客戶端消費者之后,會從服務端得到最新的offset信息記錄到本地。所以說如果當前的消費的消息沒有提交offset,此時在你重新初始化消費者之后,可得到這條未提交消息的offset,從此位置開始消費。