消息處理問題
在從Kafka主題接收消息之后立即處理消息的消費者的實現非常簡單。不幸的是,現實要復雜得多,並且由於各種原因,消息處理可能會失敗。其中一些原因是永久性問題,例如數據庫約束失敗或消息格式無效。其他,如消息處理中涉及的依賴系統的臨時不可用,可以在將來解決。在這些情況下,重試消息處理可能是一種有效的解決方案。
非阻塞重試邏輯
在像Kafka這樣的流媒體系統中,我們不能跳過消息並在以后回復它們。一旦我們移動當前消息中指向Kafka的指針,我們就無法返回。為簡單起見,我們假設消息偏移在成功的消息處理之后就被記住了。在這種情況下,除非我們成功處理當前消息,否則我們無法接收下一條消息。如果處理單個消息不斷失敗,則會阻止系統處理下一條消息。很明顯,我們希望避免這種情況,因為通常一次消息處理失敗並不意味着下一次消息處理失敗。此外,在較長時間(例如一小時)之后,由於各種原因,失敗消息的處理可能成功。對他們來說,我們所依賴的系統可以再次出現。
在消息處理失敗時,我們可以將消息的副本發布到另一個主題並等待下一條消息。讓我們將新主題稱為'retry_topic'。'retry_topic'的消費者將從Kafka接收消息,然后在開始消息處理之前等待一些預定義的時間,例如一小時。通過這種方式,我們可以推遲下一次消息處理嘗試,而不會對'main_topic'消費者產生任何影響。如果'retry_topic'消費者中的處理失敗,我們只需放棄並將消息存儲在'failed_topic'中,以便進一步手動處理此問題。
業務重試場景
現在讓我們考慮以下場景。一條新消息被寫入主題'main_topic'。如果此消息的處理失敗,那么我們應該在5分鍾內再次嘗試。我們怎么做?我們應該向'retry_topic'寫一條新消息,它包裝失敗的消息並添加2個字段:
- 'retry_number',值為1
- 'retry_timestamp',其值計算為現在+ 5分鍾
這意味着'main_topic'使用者將失敗的消息處理的責任委托給另一個組件。'main_topic'消費者未被阻止,可以接收下一條消息。'retry_topic'消費者將立即收到'main_topic'消費者發布的消息。它必須從消息中讀取'retry_timestamp'值並等到那一刻,阻塞線程。線程喚醒后,它將再次嘗試處理該消息。如果成功,那么我們可以獲取下一個可用消息。否則我們必須再次嘗試。我們要做的是克隆消息,遞增'attempt_number'值(它將為2)並將'retry_timestamp'值設置為now + 5分鍾。消息克隆將再次發布到'retry__topic。
如果我們到達重試最高次數。現在是時候說“停止”了。我們將消息寫入'failed_topic'並將此消息視為未處理。有人必須手動處理它。
下面的圖片可以幫助您理解消息流:
總結
正如您所注意到的,在發生某些故障時實施推遲消息處理並不是一件容易的事情。請記住:
- 可以僅按順序從主題分區中使用消息
- 您不能跳過消費並稍后再處理此消息
- 如果要推遲處理某些消息,可以將它們重新發布到單獨的主題,每個延遲值一個
- 處理失敗的消息可以通過克隆消息並將其重新發布到重試主題之一來實現,其中包含有關嘗試次數和下次重試時間戳的更新信息
- 除非是時候處理消息,否則重試主題的消費者應該阻止該線程
- 重試主題中的消息按時間順序自然組織,必須按順序處理
問題