Kafka重復消費原因
底層根本原因:已經消費了數據,但是offset沒提交。
原因1:強行kill線程,導致消費后的數據,offset沒有提交。
原因2:設置offset為自動提交,關閉kafka時,如果在close之前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費。例如:
try {
consumer.unsubscribe();
} catch (Exception e) {
}
try {
consumer.close();
} catch (Exception e) {
}
上面代碼會導致部分offset沒提交,下次啟動時會重復消費。
Kafka Consumer丟失數據原因
猜測:設置offset為自動定時提交,當offset被自動定時提交時,數據還在內存中未處理,此時剛好把線程kill掉,那么offset已經提交,但是數據未處理,導致這部分內存中的數據丟失。
記錄offset和恢復offset的方案
理論上記錄offset,下一個group consumer可以接着記錄的offset位置繼續消費。
offset記錄方案:
每次消費時更新每個topic+partition位置的offset在內存中,
Map<key, value>,key=topic+'-'+partition,value=offset
當調用關閉consumer線程時,把上面Map的offset數據記錄到 文件中*(分布式集群可能要記錄到redis中)。
下一次啟動consumer,需要讀取上一次的offset信息,方法是 以當前的topic+partition為key,從上次的Map中去尋找offset。
然后使用consumer.seek()方法指定到上次的offset位置。
說明:
1、該方案針對單台服務器比較簡單,直接把offset記錄到本地文件中即可,但是對於多台服務器集群,offset也要記錄到同一個地方,並且需要做去重處理。
如果線上程序是由多台服務器組成的集群,是否可以用一台服務器來支撐?應該可以,只是消費慢一點,沒多大影響。
2、如何保證接着offset消費的數據正確性
為了確保consumer消費的數據一定是接着上一次consumer消費的數據,
consumer消費時,記錄第一次取出的數據,將其offset和上次consumer最后消費的offset進行對比,如果相同則繼續消費。如果不同,則停止消費,檢查原因。