kafka手動維護偏移量


1、kafka手動維護偏移量

  在項目中,kafka和sparkStream采用的是直連方式,使用的是kafka基礎的api,因此需要手動維護偏移量。將偏移量保存在mysql中。

  程序運行時,先去mysql中查詢偏移量,判斷是否是程序第一次啟動,若是第一次啟動,就是不指定偏移量,重頭讀取kafka數據。若是非第一次啟動,即從mysql中有偏移量。此時還要對比數據庫中的偏移量和kafka現在每個分區的最早偏移量getEarliestLeaderOffsets,因為kafka數據默認是保存七天,也就是偏移量有效期就是七天。若數據庫中的偏移量沒過期,那就從數據庫保存的偏移量開始讀。若過期了,那就從現在最新的開始讀。

       這里出現一個問題,kafka的patition分區數不一定不變,有時候就是為了提升spark Streaming的並行處理的能力,這時要必須增加kafka的patition分區數以對應spark Streaming的executor數,--num- executor這個主要設置即可,因為patition分區數要等於executor的數量,大了小了都不好。而新增分區的偏移量若沒有及時保存在數據庫上的話,就會出現數據丟失,消費不到新增分區的數據。

     這里的解決方式,就是每次啟動流程序前,對比一下當前我們自己保存的kafka的分區的個數和從zookeeper里面的存的topic的patition分區個數是否一致,如果不一致,就把新增的分區給添加到我們自己保存的信息中,並發偏移量初始化成 0,這樣以來在程序啟動后,就會自動識別新增分區的數據。 

 

參考博客:Kafka偏移量維護中的坑  https://www.jianshu.com/p/316e50a570dd


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM