Kafka重復消費和丟失數據研究



Kafka重復消費原因

底層根本原因:已經消費了數據,但是offset沒提交。

原因1:強行kill線程,導致消費后的數據,offset沒有提交。

原因2:設置offset為自動提交,關閉kafka時,如果在close之前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費。例如:

try {

consumer.unsubscribe();

} catch (Exception e) {

}

try {

consumer.close();

} catch (Exception e) {

}

上面代碼會導致部分offset沒提交,下次啟動時會重復消費。


Kafka Consumer丟失數據原因

猜測:設置offset為自動定時提交,當offset被自動定時提交時,數據還在內存中未處理,此時剛好把線程kill掉,那么offset已經提交,但是數據未處理,導致這部分內存中的數據丟失。



 

記錄offset和恢復offset的方案

 

理論上記錄offset,下一個group consumer可以接着記錄的offset位置繼續消費。

offset記錄方案:

每次消費時更新每個topic+partition位置的offset在內存中,

Map<key, value>,key=topic+'-'+partition,value=offset

當調用關閉consumer線程時,把上面Map的offset數據記錄到 文件中*(分布式集群可能要記錄到redis中)。

 

下一次啟動consumer,需要讀取上一次的offset信息,方法是 以當前的topic+partition為key,從上次的Map中去尋找offset。

然后使用consumer.seek()方法指定到上次的offset位置。

 

說明:

1、該方案針對單台服務器比較簡單,直接把offset記錄到本地文件中即可,但是對於多台服務器集群,offset也要記錄到同一個地方,並且需要做去重處理。

      如果線上程序是由多台服務器組成的集群,是否可以用一台服務器來支撐?應該可以,只是消費慢一點,沒多大影響。


2、如何保證接着offset消費的數據正確性

為了確保consumer消費的數據一定是接着上一次consumer消費的數據,

consumer消費時,記錄第一次取出的數據,將其offset和上次consumer最后消費的offset進行對比,如果相同則繼續消費。如果不同,則停止消費,檢查原因。







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM