每個broker都會維護一個對應的_consumer_offsets系統主題,用來記錄消費者消費分區數據的位置。0.9版本以前是記錄在zk中的。
_consumer_offsets主題采用key和value的方式存儲數據,key是groupid + topic + partition號,value對應當前offset的值。每隔一段時間,kafka內部會對這個topic進行compact,也就是每個groupid + topic + partition號根據key去重,只保留最新數據。
自動提交offset(kafka默認使用該模式):
為了使我們能夠專注於自己的業務邏輯,kafka提供了自動提交offset的功能。
手動提交offset:
雖然自動提交offset十分簡單遍歷,但由於其是基於時間提交的,開發人員難以把握offset提交的時機。因此kafka還提供了手動提交offset的api。
自動提交分為:(通常使用異步提交方式多一些,追求效率)
同步提交 commitSync:拉取數據提交offset會進入阻塞狀態,直到提交成功后才開始下一次的拉取。
異步提交 commitAsync:拉取數據和提交offset異步進行,不會管offset是否提交成功
指定offset消費:
當kafka中沒有初始偏移量(消費者組第一次消費)或服務器上不再存在當前偏移量時(例如該數據已被刪除),該怎么辦?
消費方式有:auto.offset.reset = latest
earliest:自動將偏移量重置為最早的偏移量,--from-bcginning
latest(默認):自動將偏移量重置為最新偏移量
none:如果未找到消費者組的先前偏移量,則向消費者拋出異常
指定具體的offset:kafkaConsumer.seek(topicPartition,offsetNum)
指定時間消費:
在生產環境中,會遇到最近消費的幾個小時數據異常,想重新按照時間消費。例如按照時間消費前一天的數據。
kafkaConsumer.offsetForTimes(Map); 通過時間獲取對應的offset
漏消費和重復消費:
重復消費:已經消費了數據,但offset沒有提交成功
漏消費:先提交offset后消費,可能會造成數據的漏消費
如果想完成consumer端的精准一次性消費,那么需要kafka消費端將消費過程和提交offset過程做原子綁定。此時我們需要將kafka的offset保存到支持事務的自定義介質(比如mysql)。
數據積壓:
Kafka Eagle監控,需要自己手動安裝
Kafka-Kraft模式,不再依賴於zk