上文中主要介紹了Kafka 的消費位移從Zookeeper 轉移到了自己管理。本文主要介紹一下位移的提交方式。
Consumer 需要向 Kafka 匯報自己的位移數據,這個匯報過程被稱為提交位移。因為 Consumer 能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即Consumer 需要為分配給它的每個分區提交各自的位移數據。
提交位移主要是為了表征 Consumer 的消費進度,這樣當 Consumer 發生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應的位移處繼續消費,從而免整個消費過程重來一遍。換句話說,位移提交是 Kafka 提供給你的一個工具或語義保障,你負責維持這個語義保障,即如果你提交了位移 X,那么 Kafka 會認為所有位移值小於 X 的消息你都已經成功消費了。
但是位移提交的語義保障需要由用戶來負責,Kafka 只會“無腦”地接受你提交的位移。用戶對位移提交的管理直接影響了Consumer 所能提供的消息語義保障。
Kafka 位移提交從用戶的角度來說分為手動提交和自動提交默認的方式是自動提交,在用戶零感知的情況下將消費唯一提交了上去。從consumer 角度來說分布同步提交和異步提交。
我們先來說說自動提交和手動提交。所謂自動提交,就是指 Kafka Consumer 在后台默默地為你提交位移,作為用戶的你完全不必操心這些事;而手動提交,則是指你要自己提交位移,Kafka Consumer 壓根不管。開啟自動提交位移的方法很簡單。Consumer 端有個參數 enable.auto.commit,把它設置為 true 或者壓根不設置它就可以了。因為它的默認值就是 true,即 Java Consumer 默認就是自動提交位移的。如果啟用了自動提交,Consumer 端還有個參數就派上用場了:auto.commit.interval.ms
。它的默認值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。
下面給出一個例子:
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
和自動提交相反的就是手動提交了。看着上面的例子很容易就想到了就是設置enable.auto.commit
為 false。但是,僅僅設置它為 false 還不夠,因為你只是告訴Kafka Consumer 不要自動提交位移而已,你還需要調用相應的 API 手動提交位移。最簡單的 API 就是KafkaConsumer.commitSync()
。該方法會提交KafkaConsumer.poll()
返回的最。位移。從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出.
下面給出一個sample
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records);
try {
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
我們最好在處理完邏輯之后再去提交位移不然過早的提交可能會導致數據丟失的情況。
既然手動提交有這種考慮,那么自動提交就好么 我們來分析下
一旦設置了 enable.auto.commit 為 true也就是自動提交,Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息。從順序上來說,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況。但自動提交位移的一個問題在於,它可能會出現重復消費。
在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之后的 3 秒發生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms
的值來提高提交頻率,但這么做只能縮小重復消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。
它的好處就在於更加靈活,你完全能夠把控位移提交的時機和頻率。但是,它也有一個缺陷,就是在調用 commitSync() 時,Consumer 程序會處於阻塞狀態,直到遠端Broker 返回提交結果,這個狀態才會結束。在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS。當然,你可以選擇拉長提交隔,但這樣做的后果是 Consumer 的提交頻率下降,當Consumer 發生 Rebalance 的時候,會有更多的消息被重新消費。
鑒於這個問題,Kafka 社區為手動提交位移提供了另一個 API 方法:KafkaConsumer.commitAsync()
。從名字上來看它就不是同步的,而是一個異步操作。調用commitAsync() 之后,它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是異步的,Kafka 提供了回調函數(callback),供你實現提交之后的邏輯。下面給出一個sample:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records);
consumer.commitAsync((offsets, exception) -> {
if (exception != null){
exception.printStackTrace();
}
});
}
commitAsync 是否能夠替代 commitSync 呢?答案是不能。commitAsync 的問題在於,出現問題時它不會自動重試。因為它是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。
那么既然手動提交的兩種方式似乎都有一定的缺陷是否可以相互彌補一下呢。下面分析一下
1. 我們可以利用 commitSync 的自動重試來規避那些瞬時錯誤,比如網絡的瞬時抖動,Broker 端 GC 等。因為這些問題都是短暫的,自動重試通常都會成功,因此,我們不想自己重試,而是希望 Kafka Consumer 幫我們做這件事。
2. 我們不希望程序總處於阻塞狀態,影響 TPS。
於是可以給出一個sample:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records);
commitAysnc();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
這段代碼同時使用了 commitSync() 和 commitAsync()。對於常規性、階段性的手動提交,我們調用 commitAsync() 避免程序阻塞,而在 Consumer 要關閉前,我們調用commitSync() 方法執行同步阻塞式的位移提交,以確保 Consumer 關閉前能夠保存正確的位移數據。將兩者結合后,我們既實現了異步無阻塞式的位移管理,也確保了Consumer 位移的正確性. 僅供參考。