kafka05-offset自動提交和手動提交-漏消費和重復消費


offset的默認維護位置:

每個broker都會維護一個對應的_consumer_offsets系統主題,用來記錄消費者消費分區數據的位置。0.9版本以前是記錄在zk中的。

_consumer_offsets主題采用key和value的方式存儲數據,key是groupid + topic + partition號,value對應當前offset的值。每隔一段時間,kafka內部會對這個topic進行compact,也就是每個groupid + topic + partition號根據key去重,只保留最新數據。

自動提交offset(kafka默認使用該模式):

為了使我們能夠專注於自己的業務邏輯,kafka提供了自動提交offset的功能。

 

 

手動提交offset:

雖然自動提交offset十分簡單遍歷,但由於其是基於時間提交的,開發人員難以把握offset提交的時機。因此kafka還提供了手動提交offset的api。

自動提交分為:(通常使用異步提交方式多一些,追求效率)

  1. 同步提交 commitSync:拉取數據提交offset會進入阻塞狀態,直到提交成功后才開始下一次的拉取。

  2. 異步提交 commitAsync:拉取數據和提交offset異步進行,不會管offset是否提交成功

 

 

指定offset消費:

當kafka中沒有初始偏移量(消費者組第一次消費)或服務器上不再存在當前偏移量時(例如該數據已被刪除),該怎么辦?

消費方式有:auto.offset.reset = latest

  1. earliest:自動將偏移量重置為最早的偏移量,--from-bcginning

  2. latest(默認):自動將偏移量重置為最新偏移量

  3. none:如果未找到消費者組的先前偏移量,則向消費者拋出異常

  4. 指定具體的offset:kafkaConsumer.seek(topicPartition,offsetNum)

指定時間消費:

在生產環境中,會遇到最近消費的幾個小時數據異常,想重新按照時間消費。例如按照時間消費前一天的數據。

kafkaConsumer.offsetForTimes(Map); 通過時間獲取對應的offset

漏消費和重復消費:

重復消費:已經消費了數據,但offset沒有提交成功

漏消費:先提交offset后消費,可能會造成數據的漏消費

如果想完成consumer端的精准一次性消費,那么需要kafka消費端將消費過程和提交offset過程做原子綁定。此時我們需要將kafka的offset保存到支持事務的自定義介質(比如mysql)。

 

 

 

 

數據積壓:

 

 

 

Kafka Eagle監控,需要自己手動安裝

 

Kafka-Kraft模式,不再依賴於zk

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM