kafka自動提交方式下的重復消費解決辦法


最近項目上遇到一個kafka重復消費的問題,大體描述一下:

程序日志顯示一直在重復消費從kafka中獲取到的其中500條記錄,處理這500條數據耗時8分鍾。kafka的server.log日志一直在提示rebalance。

網上找了很多帖子,發現其中對於max.poll.records和session.timeout.ms兩者關系的描述基本都是錯誤的,錯誤描述如下:

# max.poll.records條數據需要在session.timeout.ms這個時間內處理完,默認:500

# 即 max.poll.records * (處理能力)  <= session.timeout.ms 可正常消費,不滿足會出現rebalance

spring.kafka.consumer.max-poll-records=500

ok,口說無憑,實操一下:

配置如下:

spring.kafka.consumer.max-poll-records=20

spring.kafka.consumer.properties.session.timeout.ms=10000

處理程序每次用Thread.sleep(1000)來模擬,即程序的處理能力為1秒消費一條記錄,安裝上述配置,20條記錄至少需要20秒,配置10秒顯然不夠,肯定會出現rebalance。

實測結果,正常處理,kafka也沒有出現rebalance。

why? 顯然,max.poll.records和session.timeout.ms兩者的關系不是如此,繼續深入一下。

我們再來看看上面的session.timeout.ms,指的是什么,為了准確,直接去官網:

 

翻譯一下,基本就是說:

session.timeout.ms為會話的超時限制。如果consumer在這段時間內沒有發送心跳信息,則它會被認為掛掉了,並且reblance將會產生,必須在[group.min.session.timeout.ms, group.max.session.timeout.ms]范圍內。默認:10000。

顯然並沒有提到與max.poll.records的關系。

 

查閱官網,發現一個屬性max.poll.interval.ms,官網描述如下:

 

基本意思就是消費者兩次調用poll()取數據的最大延遲時間,超過這個時間消費組會發生rebalance。

消費者第一次poll到數據后,會開始消費,直到本次數據處理完畢,才會進行下一次poll,也就是說:

max.poll.records * (處理能力)  <= max.poll.interval.ms,程序即可正常消費。

實操一下,現象確實如此,這里就不附帶結果了。

寫這篇帖子,主要是因為鄙人踩了這個坑,希望其它人不要繼續了,哈哈。

從上面公式可以看到,我們只要保證程序處理能力穩定,不會隨着時間或者數據量增大,那這rebalacnce就不會出現了。

可以采用異步消費的方式。
————————————————

原文鏈接:https://blog.csdn.net/u011801264/article/details/103921462


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM