kafka系列八、kafka消息重復和丟失的場景及解決方案分析


消息重復和丟失是kafka中很常見的問題,主要發生在以下三個階段:

  1. 生產者階段
  2. broke階段
  3. 消費者階段

一、生產者階段重復場景

1、根本原因

生產發送的消息沒有收到正確的broke響應,導致producer重試。

producer發出一條消息,broke落盤以后因為網絡等種種原因發送端得到一個發送失敗的響應或者網絡中斷,然后producer收到一個可恢復的Exception重試消息導致消息重復。  

2、重試過程

說明:
1. new KafkaProducer()后創建一個后台線程KafkaThread掃描RecordAccumulator中是否有消息;
2. 調用KafkaProducer.send()發送消息,實際上只是把消息保存到RecordAccumulator中;
3. 后台線程KafkaThread掃描到RecordAccumulator中有消息后,將消息發送到kafka集群;
4. 如果發送成功,那么返回成功;
5. 如果發送失敗,那么判斷是否允許重試。如果不允許重試,那么返回失敗的結果;如果允許重試,把消息再保存到RecordAccumulator中,等待后台線程KafkaThread掃描再次發送;

3、可恢復異常說明

異常是RetriableException類型或者TransactionManager允許重試;RetriableException類繼承關系如下:

4、記錄順序問題

  如果設置max.in.flight.requests.per.connection大於1(默認5,單個連接上發送的未確認請求的最大數量,表示上一個發出的請求沒有確認下一個請求又發出了)大於1可能會改變記錄的順序,因為如果將兩個batch發送到單個分區,第一個batch處理失敗並重試,但是第二個batch處理成功,那么第二個batch處理中的記錄可能先出現被消費。

  設置max.in.flight.requests.per.connection為1,可能會影響吞吐量,可以解決單台producer發送順序問題。如果多個producer,producer1先發送一個請求,producer2后發送請求,這是producer1返回可恢復異常,重試一定次數成功了。雖然時producer1先發送消息,但是producer2發送的消息會被先消費。

二、生產者發送重復解決方案

1、啟動kafka的冪等性

  要啟動kafka的冪等性,無需修改代碼,默認為關閉,需要修改配置文件:enable.idempotence=true 同時要求 ack=all 且 retries>1。

  冪等原理:

  每個producer有一個producer id,服務端會通過這個id關聯記錄每個producer的狀態,每個producer的每條消息會帶上一個遞增的sequence,服務端會記錄每個producer對應的當前最大sequence,producerId + sequence ,如果新的消息帶上的sequence不大於當前的最大sequence就拒絕這條消息,如果消息落盤會同時更新最大sequence,這個時候重發的消息會被服務端拒掉從而避免消息重復。該配置同樣應用於kafka事務中。

2、ack=0,不重試。

  可能會丟消息,適用於吞吐量指標重要性高於數據丟失,例如:日志收集。

三、生產者和broke階段消息丟失場景

1、ack=0,不重試

producer發送消息完,不管結果了,如果發送失敗也就丟失了。

2、ack=1,leader crash

producer發送消息完,只等待lead寫入成功就返回了,leader crash了,這時follower沒來及同步,消息丟失。

3、unclean.leader.election.enable 配置true

允許選舉ISR以外的副本作為leader,會導致數據丟失,默認為false。producer發送異步消息完,只等待lead寫入成功就返回了,leader crash了,這時ISR中沒有follower,leader從OSR中選舉,因為OSR中本來落后於Leader造成消息丟失。

四、生產者和broke階段消息丟失解決方案

1、配置:ack=all / -1,tries > 1,unclean.leader.election.enable : false

producer發送消息完,等待ollower同步完再返回,如果異常則重試。這是副本的數量可能影響吞吐量,最大不超過5個,一般三個足夠了。

不允許選舉ISR以外的副本作為leader。

2、配置:min.insync.replicas > 1

當producer將acks設置為“all”(或“-1”)時,min.insync。副本指定必須確認寫操作成功的最小副本數量。如果不能滿足這個最小值,則生產者將引發一個異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

當一起使用時,min.insync.replicas和ack允許執行更大的持久性保證。一個典型的場景是創建一個復制因子為3的主題,設置min.insync復制到2個,用“all”配置發送。將確保如果大多數副本沒有收到寫操作,則生產者將引發異常。

3、失敗的offset單獨記錄

producer發送消息,會自動重試,遇到不可恢復異常會拋出,這時可以捕獲異常記錄到數據庫或緩存,進行單獨處理。

五、消費者數據重復場景及解決方案

1、根本原因

數據消費完沒有及時提交offset到broke。

2、場景

消息消費端在消費過程中掛掉沒有及時提交offset到broke,另一個消費端啟動拿之前記錄的offset開始消費,由於offset的滯后性可能會導致新啟動的客戶端有少量重復消費。

3、解決方案

1、取消自動自動提交

每次消費完或者程序退出時手動提交。這可能也沒法保證一條重復。

2、下游做冪等

一般的解決方案是讓下游做冪等或者盡量每消費一條消息都記錄offset,對於少數嚴格的場景可能需要把offset或唯一ID,例如訂單ID和下游狀態更新放在同一個數據庫里面做事務來保證精確的一次更新或者在下游數據表里面同時記錄消費offset,然后更新下游數據的時候用消費位點做樂觀鎖拒絕掉舊位點的數據更新。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM