原文鏈接:https://cloud.tencent.com/developer/article/1462432
一、概述
在新消費者客戶端中,消費位移是存儲在Kafka內部的主題 __consumer_offsets 中。把消費位移存儲起來(持久化)的動作稱為 “提交” ,消費者在消費完消息之后需要執行消費位移的提交。
參考下圖的消費位移,x 表示某一次拉取操作中此分區消息的最大偏移量,假設當前消費者已經消費了 x 位置的消息,那么我們就可以說消費者的消費位移為 x ,圖中也用了 lastConsumedOffset 這個單詞來標識它。

不過需要非常明確的是,當前消費者需要提交的消費位移並不是 x ,而是 x+1 ,對應上圖中的 position ,它表示下一條需要拉取的消息的位置。
KafkaConsumer 類提供了 partition(TopicPartition) 和 committed(TopicPartition) 兩個方法來分別獲取上面所說的 postion 和 committed offset 的值。這兩個方法的定義如下所示:
- public long position(TopicPartition partition)
- public OffsetAndMetadata committed(TopicPartition partition)
可通過 TestOffsetAndPosition.java 來測試consumed offset、committed offset、position之間的關系。該 TestOffsetAndPosition.java 文件的地址為:
https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/TestOffsetAndPosition.java
二、offset 提交的兩種方式
1、自動提交
在 Kafka 中默認的消費位移的提交方式為自動提交,這個由消費者客戶端參數 enable.auto.commit 配置,默認值為 true 。這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端 auto.commit.interval.ms 配置,默認值為 5 秒,此參數生效的前提是 enable.auto.commit 參數為 true 。
在默認的配置下,消費者每隔 5 秒會將拉取到的每個分區中最大的消息位移進行提交。自動位移提交的動作是在 poll() 方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移。
2、手動提交
Kafka 自動提交消費位移的方式非常簡便,它免去了復雜的位移提交邏輯,但並沒有為開發者留有余地來處理重復消費和消息丟失的問題。自動位移提交無法做到精確的位移管理,所以Kafka還提供了手動位移提交的方式,這樣就可以使得開發人員對消費位移的管理控制更加靈活。開啟手動提交功能的前提是消費者客戶端參數 enable.auto.commit 配置為 false 。示例如下:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手動提交又分為同步提交和異步提交,對應於 KafkaConsumer 中的 commitSync() 和 commitAsync() 兩種類型的方法。
2.1、同步提交
消費者可以調用 commitSync() 方法,來實現位移的同步提交。
commitSync() 方法會根據 poll() 方法拉取的最新位移來進行提交,只要沒有發生不可回復的錯誤,它就會阻塞消費者線程直至位移提交完成。
對於采用 commitSync() 的無參方法而言,它提交消費位移的頻率和拉取批次消息、處理批次消息的頻率是一樣的。如果想尋求更細粒度的、更精准的提交,那么就需要使用 commitSync() 的另一個含參方法,具體定義如下:
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
該方法提供了一個 offsets 參數,用來提交指定分區的位移。
2.2、異步提交
與 commitSync() 方法相反,異步提交的方式在執行的時候消費者線程不會被阻塞,可以在提交消費位移的結果還未返回之前就開始新一次的拉取操作。異步提交可以使消費者的性能得到一定的增強。commitAsync() 方法有三個不同的重載方法:
public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
第一個無參方法和第三個方法中的 offsets 都很好理解,對照 commitSync() 方法即可。關鍵是第二個方法與第三個方法的 callback 參數,它提供了一個異步提交的回調方法,當位移提交完成后會回調 OffsetCommitCallback 中的 onComplete() 方法。如下圖所示:

發送提交請求后可以繼續做其它事情。如果提交失敗,錯誤信息和偏移量會被記錄下來。
三、同步和異步組合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在 關閉消費者 或 再均衡(分區的所屬權從一個消費者轉移到另一個消費者的行為) 前的最后一次提交,就要確保能夠提交成功。
因此,在消費者關閉前一般會組合使用 commitAsync() 和 commitSync() 。使用 commitAsync() 方式來做每條消費信息的提交(因為該種方式速度更快),最后再使用 commitSync() 方式來做位移提交最后的保證。
try { while (true) { // 消費者poll並且執行一些操作 // ... // 異步提交,也可使用有回調函數的異步提交。較同步提交速度更快。 consumer.commitAsync(); } } catch (Exception e) { logger.error("Unexpected error" , e); } finally { try { // 同步提交,來做位移提交最后的保證。 consumer.commitSync(); } finally { consumer.close(); } }
四、總結
本文主要講解了消費者提交消息位移的兩種方式,分為:
- 自動提交
- 手動提交
而 手動提交 又分為:
- 同步提交
- 異步提交
而在一般情況下,建議使用手動的方式:異步和同步組合提交消息位移。因為異步提交不需要等待提交的反饋結果,即可進行新一次的拉取消息操作,速度較同步提交更快。但在最后一次提交消息位移之前,為了保證位移提交成功,還是需要再做一次同步提交操作。
本文參考《Kafka權威指南》與《深入理解Kafka:核心設計與實踐原理》,也推薦大家閱讀這兩本書。