1、Offset Topic
Consumer通過提交Offset來記錄當前消費的最后位置,以便於消費者發生崩潰或者有新的消費者加入消費者組,而引發的分區再均衡操作,每個消費者可能會分到不同的分區。我測試的kafka版本是:0.11.0.2,消費者往一個特殊的主題“_consumer_offset”發送消息,如圖:
消息的內容包括:
fields | content |
---|---|
Key | Consumer Group, topic, partition |
Payload | Offset, metadata, timestamp |
提交到“_consumer_offset”主題的消息會根據消費組的key進行分區,一個消費組內的所有消息,都會發送到唯一的Partition。
2、Offset Commit
Offset的提交邏輯其實和普通的生產者往kafka發送數據是一樣的。
2.1、Consumer
消費者啟動時會為“_consumer_offset”主題創建一個內置的生產者,用於Offset數據的提交。
2.2、Broker
就是將Offset提交當成是正常的生產請求,邏輯不變。
“_consumer_offset”主題會在集群中的第一個Offset提交請求時被自動創建。
3、Offset的提交方式
Offset提交時會有兩個問題:重復消費和漏消費。
- 當提交的Offset小於客戶端處理的最后一條消息的Offset,會造成重復消費。
情景:先消費,后提交Offset,如果消費成功、提交失敗,消費者下次獲取的Offset還是以前的,所以會造成重復消費。 - 當提交的Offset大於客戶端處理的最后一條消息的Offset,會造成漏消費。
情景:先提交Offset,后消費,如果提交成功、消費失敗,消費者下次獲取的Offset已經是新的,所以會造成漏消費。
根據具體的業務情況,選擇合適的提交方式,可以有效的解決掉重復消費和漏消費的問題。
3.1、自動提交
自動提交是最簡單的提交方式,通過設置參數,可以開啟自動提交也可以設置提交的時間間隔。缺點就是,當消費了一些數據后,還未達到自動的提交時間,這個時候,有新的消費者加入,或者當前消費者掛掉,會出現分區再均衡操作,之后消費者重新在上一次提交的Offset開始消費,造成重復消費。雖然可以縮短自動提交間隔,但是還是無法解決這個問題。
3.2、同步提交當前Offset
關閉手動提交,可以通過同步提交接口來提交當前的Offset,雖然可以獲取主動性,但是也犧牲了吞吐量,因為同步提交必然是阻塞的,而且會有重試機制。
3.3、異步提交當前Offset
使用異步提交方式,既有主動性,也可以增加kafka消費的吞吐量,沒有重試機制,也解決不掉重復消費的問題。
3.4、同步和異步組合提交
正常使用的時候使用異步提交,速度快。當要關閉消費者的時候,使用同步提交,即使失敗了也會一直重試,直到提交成功或者發生無法恢復的錯誤。不管是同步提交還是異步提交都避免不了重復消費和漏消費的問題。
3.5、提交指定的Offset
因為自動提交、同步提交與異步提交都是將最后一個Offset提交上去。通過提交指定的Offset,可以減輕重復消費和漏消費的問題,但是相應的消費端就需要復雜的業務處理,而且需要自己維護Offset。