第一個文章
在分布式系統中,重試是不可避免的,我們經常使用后台跑定時進行數據同步,同步不成功就實現重試,重試次數多少取決於你追求一致性還是可用性,如果希望兩個系統之前無論如何都必須一致,那么你設置重試次數為無限,當然這是理想情況,實際情況是有重試次數限制和重試時間限制,如果超過不成功怎么辦?丟棄會造成數據丟失進而永久不一致,人工介入又非常復雜,通過引入死信隊列可以優雅處理這種問題。本文是優步Uber工程師夏寧(Ning Xia)發布的一篇如何使用Kafka的死信隊列實現重試處理的。
從網絡錯誤到復制問題甚至下游依賴關系等場景中隨時可能發生的中斷,大規模運行的服務必須盡可能地優雅發現、識別並處理故障。
考慮到優步Uber的運維范圍和效率,我們的系統發生故障時必須智能化地具有容錯性和不妥協性。為了實現這一目標,我們決定使用開源分布式消息傳遞平台Apache Kafka,該平台已經過業界測試,並能提供大規模的高性能。
利用這些屬性,優步行車保險工程團隊通過擴展卡夫卡,在我們現有的事件驅動架構中使用無阻塞請求重新處理和死信隊列(DLQ),實現錯誤處理的解耦,在不中斷實時流量情況下實現可觀察的錯誤處理。這一策略有助於我們遍布200多個城市的駕駛員能夠可靠地實現每次行程的保費扣除。
在本文中,我們重點介紹了使用實時SLA重新處理大型系統中的請求並分享經驗教訓的方法。
在事件驅動的體系結構中工作
優步的駕駛員損傷保護系統的后端位於Kafka消息傳遞架構中,該架構貫穿優步大型微服務生態系統內的多個依賴關系的Java服務。本文我們更專注於我們的重試和死信的策略,並通過一個總的應用程序來管理不同產品的預訂,以實現蓬勃發展的在線業務。
在這個模型中,我們希望提供以下功能:
a)能進行支付
b)為每個用戶的每個產品的預購訂單創建單獨的報表記錄,以生成實時產品分析。
每個功能都可以通過其各自服務的API提供。根據功能要求,設計了兩個服務(消費組),一個是支付消費組完成a功能,一個是報表消費組完成b功能,這兩個消費組都預訂了相同的預訂事件頻道(也就是訂閱了Kafka主題PreOrder):
當系統收到預訂請求時,商店服務發布包含相關請求數據的PreOrder消息。兩個消費組都會監聽這個PreOrder消息,從而執行自己的業務邏輯並調用其相應的服務。
實施重試的簡單快速的解決方案是在客戶端調用呼叫時使用定時循環重試。例如,如果支付服務正在發生延遲等待並開始拋出超時異常,則商店服務將繼續在指定重試次數下進行重試以完成支付),直到它成功或達到另一個停止條件為止。
簡單的重試問題
雖然在客戶端層次進行定時循環的重試可能很有用,但大量大規模的系統重試仍可能會受到以下因素影響:
1.阻止批處理。當我們需要實時處理大量消息時,反復重試產生的失敗消息可能會阻塞正常的批處理。最嚴重的情況是超過重試時間限制,這意味會花最長時間,使用的資源會最多。如果沒有成功的回應,卡夫卡消費者將會不斷提交.
2.難以檢索元數據。在重試上獲取元數據會很麻煩,比如時間戳和第n次重試。
如果下游支付服務出現重大變化,例如,對於之前是有效的預購訂單卻遭遇收費策略調整導致拒絕接受,那么這些消息的所有重試都會無效。接收到該特定消息的消費者不會提交該消息的Kafka指針(偏移量),這意味着該消息將被一次又一次消費,代價是導致到達該通道的大量新消息被迫處於等待而無法被正常讀取。
如果請求在重試后繼續重試失敗,我們希望在DLQ中收集這些故障以進行可視性查看和診斷。DLQ應允許以列表方式查看隊列的內容,清除管理這些內容,並合並重新處理死信消息,允許全面解決所有受共享問題影響的故障。在優步,我們需要一個可靠並且可擴展地為我們提供這些功能的重試策略。
在單獨的隊列中處理
為了解決批處理被重試處理阻塞的問題,我們使用單獨定義的Kafka主題,專門為重試設計單獨隊列。在這種情況下,當消費者處理程序在指定的消息重試次數之后會返回特定消息的失敗響應,消費者將該消息發布到相應的重試主題中。該處理程序然后將true 返回給原始使用者,該使用者會確認提交了它的Kafka偏移量,從而保證Kafka消息能夠持續向下讀取。
在這種類型的系統中重試請求非常簡單。與主處理流程一樣,單獨一組消費者將讀取重試隊列。這些消費者的行為與原始架構中的消費者行為類似,只是消費者使用不同的卡夫卡話題。同時,執行多次重試是通過創建多個主題來完成的,其中每一組不同的監聽器(也就是消費組)訂閱每個重試主題。當特定主題的處理程序返回給定消息的錯誤響應時,它會將該消息發布到它下面的下一個重試主題。
最后,在此設計中,DLQ被定義為最終的卡夫卡主題。如果最后一次重試主題的消費者仍然沒有成功,那么它會將該消息發布到死信主題。在那里,可以使用許多技術來以主題方式進行數據列表,清除和合並,
重要的是不要一個接一個地立即重新嘗試失敗的請求; 這樣做會放大調用的數量,實質上是等同於垃圾郵件的惡意請求。相反,每個后續級別的重試使用者都可以執行處理延遲,換句話說,隨着消息在每個重試主題中逐步下降,超時會增加。此機制遵循漏桶模式。因此,這種重試隊列其實是延遲處理隊列。
我們通過基於隊列的重新處理獲得了什么
現在,我們討論這種方法的好處,因為它涉及確保可靠和可擴展的重新處理:
1.不會都是正常批處理
失敗的消息輸入他們自己的指定通道,使正在進行批處理能夠成功繼續進行,而不是要求在出現故障時重新處理它們。因此,傳入請求的消耗向前暢通無阻,實現更高的實時吞吐量。
2.解耦
獨立工作流在同一個事件上運行,每個工作流都有自己的消費者流程,重試有單獨的再處理和死信隊列。一個隊列中處理失敗並不需要重試那些已經成功的其他消息。
3.可配置
創建新主題實際上不會產生開銷,並且這些主題產生的消息可以遵循相同的架構。原始處理以及每個重試通道都可以分別在易於編寫的較高級別的消費者級別下進行管理,該級別由配置進行管理。
我們還可以區分不同類型錯誤的處理方式,允許重新嘗試網絡脆弱等情況,而空指針異常和其他代碼錯誤應該直接進入DLQ,因為重試不會修復它們。
4.觀測
將消息處理分割成不同的主題有助於容易地跟蹤錯誤消息的路徑,重試消息的時間和次數以及其有效負載的確切屬性。將生產率與再處理主題和DLQ的生產率相比較,可以為自動警報提供閾值並跟蹤實時服務正常運行時間。
5.靈活性
雖然Kafka本身是用Scala和Java編寫的,但Kafka支持多種語言的客戶端庫。例如,優步的許多服務都使用Go作為他們的Kafka客戶端。
使用像Avro這樣的序列化框架的Kafka消息格式支持可演化的模式。如果我們的數據模型如果需要更新,則只需要最小的調整來反映這一變化。
6.性能和可靠性
Kafka默認提供至少一次的語義。這種耐久性保證在容錯和消息失敗的情況下非常有價值; 當談到提供關鍵業務數據時(如Uber的情況),消息無損(消息不丟失)是最重要的。而且,Kafka的並行模型和基於拉的系統可實現高吞吐量和低延遲。
其他考慮
由於Kafka只能保證分區內的順序處理,而跨分區接受無法保證順序,因此應用程序必須能夠處理事件發生的確切順序以外的事件。此外,至少一次消息傳遞需要消費者依賴性冪等性,這是任何分布式系統的共同特征。
前面闡述了死信隊列提供的顯著優勢,但真正實施可能因用例而異。例如,根據指定的應用程序處理的數據類型的數量,每個主題代表不同的事件類型,這可能導致需要管理大量主題。在這種情況下,基於計數隊列的替代方案可能是比較好的選擇,將事件類型與其他字段一起打包,從而以更易於管理的方式跟蹤重試次數和時間戳。這種權衡還需要重新考慮如何執行調度,因為這是通過一系列隊列階梯進行管理的。
使用基於計數的卡夫卡主題可實現死信隊列,進行重試的單獨的重新處理執行,使我們能夠在基於事件的系統中重試請求,而不會阻止實時流量。在此框架內,工程師可以根據需要配置,擴展,更新和監控消息傳遞,但不會對開發人員時間或應用程序正常運行時間造成任何損失。
第二個文章
https://www.cnblogs.com/xsirfly/p/11533501.html
背景
在kafka的消費者中,如果消費某條消息出錯,會導致該條消息不會被ack,該消息會被不斷的重試,阻塞該分區的其他消息的消費,因此,為了保證消息隊列不被阻塞,在出現異常的情況下,我們一般還是會ack該條消息,再另外對失敗的情況進行重試
目標
實現一個完善的重試邏輯,一般需要考慮一下幾個因素:
- 重試的時間間隔
- 最大重試次數
- 是否會漏掉消息
實現
扔回隊尾
在消息出錯時,將消息扔回隊尾
優點:
- 實現簡單,沒有別的依賴項
缺點:
- 無法控制重試時間間隔
基於數據庫任務表的掃描方案
在數據庫中增加一個任務的狀態表,然后用一個定時任務去掃描任務表中,失敗的任務,然后進行重試,其中記錄下重試的次數即可
優點:
- 實現簡單,一般這種離線任務,根據統計的需求,都會有一個任務狀態表的,所以僅僅是增加一個定時任務去掃表
缺點:
- 性能較差,定時任務,一般都在無意義的掃描,浪費性能
新增重試隊列的方案
新增一個重試隊列,消費消息出錯時,將時間戳和消息發送到重試隊列,然后在重試隊列中,根據時間,來判斷阻塞時間,代碼如下:
func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) { defer common.Recover(ctx, &err) log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent") retryEvent := &MergeRetryEvent{} err = json.Unmarshal(data, retryEvent) if err != nil { log.WithError(err).Error("failed to unmarshal data") return nil } log.WithField("contact_id", retryEvent.ContactId).Info("receive message") delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix() if delaySecond <= 0 { log.Info("send message to account merge event") err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId) return err } else { log.Infof("sleep %d seconds", delaySecond) time.Sleep(time.Duration(delaySecond) * time.Second) err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId) return err } }
優點:
相對於掃表的方案,改方案沒有無意義的掃表操作,性能更好
注意:之前在網上看到一個重試隊列的實現,因為害怕開過多的線程(協程),作者用了一個channel來緩存重試消息,然后在一個協程池中去消費消息,消費的邏輯和上面的實例代碼差不多,這樣做是有風險的,因為channel是在本機的內存中,沒有本地存儲的,是存在丟消息的風險的(服務重啟等情況)
參考鏈接:
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
第三個文章
https://www.cnblogs.com/caoweixiong/p/11181587.html
消息處理問題
在從Kafka主題接收消息之后立即處理消息的消費者的實現非常簡單。不幸的是,現實要復雜得多,並且由於各種原因,消息處理可能會失敗。其中一些原因是永久性問題,例如數據庫約束失敗或消息格式無效。其他,如消息處理中涉及的依賴系統的臨時不可用,可以在將來解決。在這些情況下,重試消息處理可能是一種有效的解決方案。
非阻塞重試邏輯
在像Kafka這樣的流媒體系統中,我們不能跳過消息並在以后回復它們。一旦我們移動當前消息中指向Kafka的指針,我們就無法返回。為簡單起見,我們假設消息偏移在成功的消息處理之后就被記住了。在這種情況下,除非我們成功處理當前消息,否則我們無法接收下一條消息。如果處理單個消息不斷失敗,則會阻止系統處理下一條消息。很明顯,我們希望避免這種情況,因為通常一次消息處理失敗並不意味着下一次消息處理失敗。此外,在較長時間(例如一小時)之后,由於各種原因,失敗消息的處理可能成功。對他們來說,我們所依賴的系統可以再次出現。
在消息處理失敗時,我們可以將消息的副本發布到另一個主題並等待下一條消息。讓我們將新主題稱為'retry_topic'。'retry_topic'的消費者將從Kafka接收消息,然后在開始消息處理之前等待一些預定義的時間,例如一小時。通過這種方式,我們可以推遲下一次消息處理嘗試,而不會對'main_topic'消費者產生任何影響。如果'retry_topic'消費者中的處理失敗,我們只需放棄並將消息存儲在'failed_topic'中,以便進一步手動處理此問題。
業務重試場景
現在讓我們考慮以下場景。一條新消息被寫入主題'main_topic'。如果此消息的處理失敗,那么我們應該在5分鍾內再次嘗試。我們怎么做?我們應該向'retry_topic'寫一條新消息,它包裝失敗的消息並添加2個字段:
- 'retry_number',值為1
- 'retry_timestamp',其值計算為現在+ 5分鍾
這意味着'main_topic'使用者將失敗的消息處理的責任委托給另一個組件。'main_topic'消費者未被阻止,可以接收下一條消息。'retry_topic'消費者將立即收到'main_topic'消費者發布的消息。它必須從消息中讀取'retry_timestamp'值並等到那一刻,阻塞線程。線程喚醒后,它將再次嘗試處理該消息。如果成功,那么我們可以獲取下一個可用消息。否則我們必須再次嘗試。我們要做的是克隆消息,遞增'attempt_number'值(它將為2)並將'retry_timestamp'值設置為now + 5分鍾。消息克隆將再次發布到'retry__topic。
如果我們到達重試最高次數。現在是時候說“停止”了。我們將消息寫入'failed_topic'並將此消息視為未處理。有人必須手動處理它。
下面的圖片可以幫助您理解消息流:
總結
正如您所注意到的,在發生某些故障時實施推遲消息處理並不是一件容易的事情。請記住:
- 可以僅按順序從主題分區中使用消息
- 您不能跳過消費並稍后再處理此消息
- 如果要推遲處理某些消息,可以將它們重新發布到單獨的主題,每個延遲值一個
- 處理失敗的消息可以通過克隆消息並將其重新發布到重試主題之一來實現,其中包含有關嘗試次數和下次重試時間戳的更新信息
- 除非是時候處理消息,否則重試主題的消費者應該阻止該線程
- 重試主題中的消息按時間順序自然組織,必須按順序處理
問題