Spring-Kafka —— AckMode介紹和手動提交分析總結


 

前言

本文主要講述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情況下,AckMode的選項,及手動提交分析總結。

 

AckMode

RECORD
每處理一條commit一次
BATCH(默認) 每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率
TIME 每次間隔ackTime的時間去commit(跟auto commit interval有什么區別呢?)
COUNT 累積達到ackCount次的ack去commit
COUNT_TIME ackTime或ackCount哪個條件先滿足,就commit
MANUAL listener負責ack,但是背后也是批量上去
MANUAL_IMMEDIATE listner負責ack,每調用一次,就立即commit

 

Manual Commit

  • 消費端手動提交offset代碼如下:
  /**
     * 這是手動提交的消費方式
     * @param record
     * @param ack
     * @throws Exception
     */
    @KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "寫自己的消費組 id")
    public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception {
        String msg = JSONObject.parseObject(record.value(), String.class);
        System.out.println(msg);
        if (new Random().nextInt(100)<50){
            logger.info(String.format("kafka 綜合收費消費消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
            ack.acknowledge();
        }        
    }

前提要配置AckMode:

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

 

  • 接下來問題來了, 如果代碼中沒有進行ack.acknowledge(),會怎么辦呢??

消費者在消費消息的過程中,配置參數設置為不自動提交offset,在消費完數據之后如果不手動提交offset,那么在程序中和kafak中的數據會如何被處理呢?

1. 如果在消費kafka的數據過程中,一直沒有提交offset,那么在此程序運行的過程中它不會重復消費。但是如果重啟之后,就會重復消費之前沒有提交offset的數據。

2. 如果在消費的過程中有幾條或者一批數據數據沒有提交offset,后面其他的消息消費后正常提交offset,那么服務端會更新為消費后最新的offset,不會重新消費,就算重啟程序也不會重新消費。

3. 消費者如果沒有提交offset,程序不會阻塞或者重復消費,除非在消費到這個你不想提交offset的消息時你嘗試重新初始化一個客戶端消費者,即可再次消費這個未提交offset的數據。因為客戶端也記錄了當前消費者的offset信息,所以程序會在每次消費了數據之后,自己記錄offset,而手動提交到服務端的offset與這個並沒有關系,所以程序會繼續往下消費。在你重新初始化客戶端消費者之后,會從服務端得到最新的offset信息記錄到本地。所以說如果當前的消費的消息沒有提交offset,此時在你重新初始化消費者之后,可得到這條未提交消息的offset,從此位置開始消費。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM