Spring-Kafka —— 消費重試機制實現


 

消息處理問題

在從Kafka主題接收消息之后立即處理消息的消費者的實現非常簡單。不幸的是,現實要復雜得多,並且由於各種原因,消息處理可能會失敗。其中一些原因是永久性問題,例如數據庫約束失敗或消息格式無效。其他,如消息處理中涉及的依賴系統的臨時不可用,可以在將來解決。在這些情況下,重試消息處理可能是一種有效的解決方案。

 

 

非阻塞重試邏輯

在像Kafka這樣的流媒體系統中,我們不能跳過消息並在以后回復它們。一旦我們移動當前消息中指向Kafka的指針,我們就無法返回。為簡單起見,我們假設消息偏移在成功的消息處理之后就被記住了。在這種情況下,除非我們成功處理當前消息,否則我們無法接收下一條消息。如果處理單個消息不斷失敗,則會阻止系統處理下一條消息。很明顯,我們希望避免這種情況,因為通常一次消息處理失敗並不意味着下一次消息處理失敗。此外,在較長時間(例如一小時)之后,由於各種原因,失敗消息的處理可能成功。對他們來說,我們所依賴的系統可以再次出現。

 

在消息處理失敗時,我們可以將消息的副本發布到另一個主題並等待下一條消息。讓我們將新主題稱為'retry_topic'。'retry_topic'的消費者將從Kafka接收消息,然后在開始消息處理之前等待一些預定義的時間,例如一小時。通過這種方式,我們可以推遲下一次消息處理嘗試,而不會對'main_topic'消費者產生任何影響。如果'retry_topic'消費者中的處理失敗,我們只需放棄並將消息存儲在'failed_topic'中,以便進一步手動處理此問題。

 

業務重試場景

  • 'retry_timestamp',其值計算為現在+ 5分鍾

總結

正如您所注意到的,在發生某些故障時實施推遲消息處理並不是一件容易的事情。請記住:

  • 您不能跳過消費並稍后再處理此消息
  • 如果要推遲處理某些消息,可以將它們重新發布到單獨的主題,每個延遲值一個
  • 處理失敗的消息可以通過克隆消息並將其重新發布到重試主題之一來實現,其中包含有關嘗試次數和下次重試時間戳的更新信息
  • 除非是時候處理消息,否則重試主題的消費者應該阻止該線程
  • 重試主題中的消息按時間順序自然組織,必須按順序處理

 

問題

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM