spring kafka消費者配置介紹----ackMode


當 auto.commit.enable 設置為false時,表示kafak的offset由customer手動維護,spring-kafka提供了通過ackMode的值表示不同的手動提交方式;

ackMode有以下7種值:

public enum AckMode {
        // 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
        RECORD,
        // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交
        BATCH,
        // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大於TIME時提交
        TIME,
        // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大於等於COUNT時提交
        COUNT,
        // TIME | COUNT 有一個條件滿足時提交
        COUNT_TIME,
        // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, 手動調用Acknowledgment.acknowledge()后提交
        MANUAL,
        // 手動調用Acknowledgment.acknowledge()后立即提交
        MANUAL_IMMEDIATE,
    }

如果設置 AckMode 模式為 MANUAL 或者 MANUAL_IMMEDIATE,則需要對監聽消息的方法中,引入 Acknowledgment 對象參數,並調用 acknowledge() 方法進行手動提交;

在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer的run()方法和processCommits()方法使用:

public void run() {
  ......
     while (isRunning()) {
         try {
            if (!this.autoCommit && !this.isRecordAck) { // 當 autoCommit 為false時且 ackMode不是record時 調用 processCommits 方法,判斷如何手動提交
          processCommits();
        }
        processSeeks();
        ......
    }  }
}
private void processCommits() {
            this.count += this.acks.size();  // acks 是一個LinkedBlockingQueue類型的阻塞隊列,存放從kafka讀取到的record數據
            handleAcks();
            long now;
            AckMode ackMode = this.containerProperties.getAckMode();
            if (!this.isManualImmediateAck) {  // 不是使用者手動調用
                if (!this.isManualAck) {
                    updatePendingOffsets();
                }
                boolean countExceeded = this.count >= this.containerProperties.getAckCount(); 
                if (this.isManualAck || this.isBatchAck || this.isRecordAck
                        || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                    ......
                    commitIfNecessary();
                    this.count = 0;
                }
                else {
                    now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AckMode.TIME) && elapsed) {
                        ......
               c
ommitIfNecessary(); this.last = now; } else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {               ...... commitIfNecessary(); this.last = now; this.count = 0; } } } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM