Kafka 實現延遲隊列、死信隊列、重試隊列


Kafka中實現延遲隊列

在發送延時消息的時候並不是先投遞到要發送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對用戶不可見,然后通過一個自定義的服務拉取這些內部主題中的消息,並將滿足條件的消息再投遞到要發送的真實的主題中,消費者所訂閱的還是真實的主題。

如果采用這種方案,那么一般是按照不同的延時等級來划分的,比如設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的消息按照延時時間投遞到不同等級的主題中,投遞到同一主題中的消息的延時時間會被強轉為與此主題延時等級一致的延時時間,這樣延時誤差控制在兩個延時等級的時間差范圍之內(比如延時時間為17s的消息投遞到30s的延時主題中,之后按照延時時間為30s進行計算,延時誤差為13s)。雖然有一定的延時誤差,但是誤差可控,並且這樣只需增加少許的主題就能實現延時隊列的功能。

發送到內部主題(delaytopic*)中的消息會被一個獨立的 DelayService 進程消費,這個 DelayService 進程和 Kafka broker 進程以一對一的配比進行同機部署(參考下圖),以保證服務的可用性。

針對不同延時級別的主題,在 DelayService 的內部都會有單獨的線程來進行消息的拉取,以及單獨的 DelayQueue(這里用的是 JUC 中 DelayQueue)進行消息的暫存。與此同時,在 DelayService 內部還會有專門的消息發送線程來獲取 DelayQueue 的消息並轉發到真實的主題中。從消費、暫存再到轉發,線程之間都是一一對應的關系。如下圖所示,DelayService 的設計應當盡量保持簡單,避免鎖機制產生的隱患。

為了保障內部 DelayQueue 不會因為未處理的消息過多而導致內存的占用過大,DelayService 會對主題中的每個分區進行計數,當達到一定的閾值之后,就會暫停拉取該分區中的消息。

因為一個主題中一般不止一個分區,分區之間的消息並不會按照投遞時間進行排序,DelayQueue的作用是將消息按照再次投遞時間進行有序排序,這樣下游的消息發送線程就能夠按照先后順序獲取最先滿足投遞條件的消息。

Kafka中實現死信隊列和重試隊列

死信可以看作消費者不能處理收到的消息,也可以看作消費者不想處理收到的消息,還可以看作不符合處理要求的消息。比如消息內包含的消息內容無法被消費者解析,為了確保消息的可靠性而不被隨意丟棄,故將其投遞到死信隊列中,這里的死信就可以看作消費者不能處理的消息。再比如超過既定的重試次數之后將消息投入死信隊列,這里就可以將死信看作不符合處理要求的消息。

重試隊列其實可以看作一種回退隊列,具體指消費端消費消息失敗時,為了防止消息無故丟失而重新將消息回滾到 broker 中。與回退隊列不同的是,重試隊列一般分成多個重試等級,每個重試等級一般也會設置重新投遞延時,重試次數越多投遞延時就越大。

理解了他們的概念之后我們就可以為每個主題設置重試隊列,消息第一次消費失敗入重試隊列 Q1,Q1 的重新投遞延時為5s,5s過后重新投遞該消息;如果消息再次消費失敗則入重試隊列 Q2,Q2 的重新投遞延時為10s,10s過后再次投遞該消息。

然后再設置一個主題作為死信隊列,重試越多次重新投遞的時間就越久,並且需要設置一個上限,超過投遞次數就進入死信隊列。重試隊列與延時隊列有相同的地方,都需要設置延時級別。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM