Kafka中實現延遲隊列
在發送延時消息的時候並不是先投遞到要發送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對用戶不可見,然后通過一個自定義的服務拉取這些內部主題中的消息,並將滿足條件的消息再投遞到要發送的真實的主題中,消費者所訂閱的還是真實的主題。
如果采用這種方案,那么一般是按照不同的延時等級來划分的,比如設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的消息按照延時時間投遞到不同等級的主題中,投遞到同一主題中的消息的延時時間會被強轉為與此主題延時等級一致的延時時間,這樣延時誤差控制在兩個延時等級的時間差范圍之內(比如延時時間為17s的消息投遞到30s的延時主題中,之后按照延時時間為30s進行計算,延時誤差為13s)。雖然有一定的延時誤差,但是誤差可控,並且這樣只需增加少許的主題就能實現延時隊列的功能。
因為一個主題中一般不止一個分區,分區之間的消息並不會按照投遞時間進行排序,DelayQueue的作用是將消息按照再次投遞時間進行有序排序,這樣下游的消息發送線程就能夠按照先后順序獲取最先滿足投遞條件的消息。
Kafka中實現死信隊列和重試隊列
死信可以看作消費者不能處理收到的消息,也可以看作消費者不想處理收到的消息,還可以看作不符合處理要求的消息。比如消息內包含的消息內容無法被消費者解析,為了確保消息的可靠性而不被隨意丟棄,故將其投遞到死信隊列中,這里的死信就可以看作消費者不能處理的消息。再比如超過既定的重試次數之后將消息投入死信隊列,這里就可以將死信看作不符合處理要求的消息。
重試隊列其實可以看作一種回退隊列,具體指消費端消費消息失敗時,為了防止消息無故丟失而重新將消息回滾到 broker 中。與回退隊列不同的是,重試隊列一般分成多個重試等級,每個重試等級一般也會設置重新投遞延時,重試次數越多投遞延時就越大。
理解了他們的概念之后我們就可以為每個主題設置重試隊列,消息第一次消費失敗入重試隊列 Q1,Q1 的重新投遞延時為5s,5s過后重新投遞該消息;如果消息再次消費失敗則入重試隊列 Q2,Q2 的重新投遞延時為10s,10s過后再次投遞該消息。
然后再設置一個主題作為死信隊列,重試越多次重新投遞的時間就越久,並且需要設置一個上限,超過投遞次數就進入死信隊列。重試隊列與延時隊列有相同的地方,都需要設置延時級別。