當 auto.commit.enable 設置為false時,表示kafak的offset由customer手動維護,spring-kafka提供了通過ackMode的值表示不同的手動提交方式;
ackMode有以下7種值:
public enum AckMode { // 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交 RECORD, // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交 BATCH, // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大於TIME時提交 TIME, // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大於等於COUNT時提交 COUNT, // TIME | COUNT 有一個條件滿足時提交 COUNT_TIME, // 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, 手動調用Acknowledgment.acknowledge()后提交 MANUAL, // 手動調用Acknowledgment.acknowledge()后立即提交 MANUAL_IMMEDIATE, }
如果設置 AckMode 模式為 MANUAL
或者 MANUAL_IMMEDIATE
,則需要對監聽消息的方法中,引入 Acknowledgment 對象參數,並調用 acknowledge() 方法進行手動提交;
在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer的run()方法和processCommits()方法使用:
public void run() { ...... while (isRunning()) { try { if (!this.autoCommit && !this.isRecordAck) { // 當 autoCommit 為false時且 ackMode不是record時 調用 processCommits 方法,判斷如何手動提交
processCommits();
}
processSeeks();
......
} }
}
private void processCommits() { this.count += this.acks.size(); // acks 是一個LinkedBlockingQueue類型的阻塞隊列,存放從kafka讀取到的record數據 handleAcks(); long now; AckMode ackMode = this.containerProperties.getAckMode(); if (!this.isManualImmediateAck) { // 不是使用者手動調用 if (!this.isManualAck) { updatePendingOffsets(); } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AckMode.COUNT) && countExceeded)) { ...... commitIfNecessary(); this.count = 0; } else { now = System.currentTimeMillis(); boolean elapsed = now - this.last > this.containerProperties.getAckTime(); if (ackMode.equals(AckMode.TIME) && elapsed) { ......
commitIfNecessary(); this.last = now; } else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) { ...... commitIfNecessary(); this.last = now; this.count = 0; } } } }