Kafka重復消費總結


一、重復消費的原因

  消息重復消費的根本原因都在於:已經消費了數據,但是offset沒有成功提交。

  其中很大一部分原因在於發生了再均衡。

    1)消費者宕機、重啟等。導致消息已經消費但是沒有提交offset。

    2)消費者使用自動提交offset,但當還沒有提交的時候,有新的消費者加入或者移除,發生了rebalance。再次消費的時候,消費者會根據提交的偏移量來,於是重復消費了數據。

    3)消息處理耗時,或者消費者拉取的消息量太多,處理耗時,超過了max.poll.interval.ms的配置時間,導致認為當前消費者已經死掉,觸發再均衡。

 

 

 

 

二、Kafka知識回顧

  1、消費者常見參數

    ①:fetch.min.bytes,配置Consumer一次拉取請求中能從Kafka中拉取的最小數據量,默認為1B,如果小於這個參數配置的值,就需要進行等待,直到數據量滿足這個參數的配置大小。調大可以提交吞吐量,但也會造成延遲

    ②:fetch.max.bytes,一次拉取數據的最大數據量,默認為52428800B,也就是50M,但是如果設置的值過小,甚至小於每條消息的值,實際上也是能消費成功的

    ③:fetch.wait.max.ms,若是不滿足fetch.min.bytes時,等待消費端請求的最長等待時間,默認是500ms

    ④:max.poll.records,單次poll調用返回的最大消息記錄數,如果處理邏輯很輕量,可以適當提高該值。一次從kafka中poll出來的數據條數,max.poll.records條數據需要在在session.timeout.ms這個時間內處理完,默認值為500

 

    ⑤:consumer.poll(100) ,100 毫秒是一個超時時間,一旦拿到足夠多的數據(fetch.min.bytes 參數設置),consumer.poll(100)會立即返回 ConsumerRecords<String, String> records。如果沒有拿到足夠多的數據,會阻塞100ms,但不會超過100ms就會返回

    ⑥:session. timeout. ms ,默認值是10s,該參數是 Consumer Group 主動檢測 (組內成員comsummer)崩潰的時間間隔。若超過這個時間內沒有收到心跳報文,則認為此消費者已經下線。將觸發再均衡操作

    ⑦:max.poll.interval.ms,兩次拉取消息的間隔,默認5分鍾;通過消費組管理消費者時,該配置指定拉取消息線程最長空閑時間,若超過這個時間間隔沒有發起poll操作,則消費組認為該消費者已離開了消費組,將進行再均衡操作(將分區分配給組內其他消費者成員)

      若超過這個時間則報如下異常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 

      即:無法完成提交,因為組已經重新平衡並將分區分配給另一個成員。這意味着對poll()的后續調用之間的時間比配置的max.poll.interval.ms長,這通常意味着poll循環花費了太多的時間來處理消息。

        可以通過增加max.poll.interval.ms來解決這個問題,也可以通過減少在poll()中使用max.poll.records返回的批的最大大小來解決這個問題

 

  2、poll機制

    ①:每次poll的消息處理完成之后再進行下一次poll,是同步操作

    ②:每次poll之前檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移

    ③:每次poll時,consumer都將嘗試使用上次消費的offset作為起始offset,然后依次拉取消息

    ④:poll(long timeout),timeout指等待輪詢緩沖區的數據所花費的時間,單位是毫秒

 

  3、再均衡 rebalance

    將分區的所有權從一個消費者轉移到其他消費者的行為稱為再均衡(重平衡,rebalance)。

    消費者通過向組織協調者(kafka broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認為 消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

    如果過了一段時間Kafka停止發送心跳了,會話(session)就會過期,組織協調者就會認為這個consumer已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鍾,確認它死亡了才會觸發重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,盡量降低處理停頓。

    重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社區還無法修改。也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費(Stop The World),等待重平衡的完成。而且重平衡這個過程很慢。

    

   觸發再均衡的三種情況:

    ①:有新的消費者加入消費組、或已有消費者主動離開組

    ②:消費者超過session時間未發送心跳(已有 consumer 崩潰了)

    ③:一次poll()之后的消息處理時間超過了max.poll.interval.ms的配置時間,因為一次poll()處理完才會觸發下次poll() (已有 consumer 崩潰了)

    ④:訂閱主題數發生變更

    ⑤:訂閱主題的分區數發生變更

 

 

三、重復消費的解決方案

  由於網絡問題,重復消費不可避免,因此,消費者需要實現消費冪等。

  方案:

    ①:消息表

    ②:數據庫唯一索引

    ③:緩存消費過的消息id

 

 

四、項目kafka重復消費的排查

 重復消費問題1:

  每次拉取的消息記錄數max.poll.records為100,poll最大拉取間隔max.poll.interval.ms為 300s,消息處理過於耗時導致時長大於了這個值,導致再均衡發生重復消費

  解決辦法:

    ①:減少每次拉取的消息記錄數和增大poll之間的時間間隔

    ②:拉取到消息之后異步處理(保證成功消費)

 

 

END.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM