交易系統使用storm,在消息高可靠情況下,如何避免消息重復


概要:在使用storm分布式計算框架進行數據處理時,如何保證進入storm的消息的一定會被處理,且不會被重復處理。這個時候僅僅開啟storm的ack機制並不能解決上述問題。那么該如何設計出一個好的方案來解決上述問題?

  現有架構背景:本人所在項目組的實時系統負責為XXX的實時產生的交易記錄進行處理,根據處理的結果向用戶推送不同的信息。實時系統平時接入量每秒1000條,雙十一的時候,最大幾十萬條。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6219878.html

新浪微博:intsmaze劉洋洋哥

  架構設計:

  storm設置的超時時間為3分鍾;kafkaspout的pending的長度為2000;storm開啟ack機制,拓撲程序中如果出現異常則調用ack方法,向spout發出ack消息;每一個交易數據會有一個全局唯一性di。

  處理流程:

  交易數據會發送到kafka,然后拓撲A去kafka取數據進行處理,拓撲A中的OnceBolt會先對從kafka取出的消息進行一個唯一性過濾(根據該消息的全局id判斷該消息是否存儲在redis中,如果有,則說明拓撲A已經對該消息處理過了,則不會把該消息發送該下游的calculateBolt,直接向spout發送ack響應;如果沒有,則把該消息發送該下游的calculateBolt。),calculateBolt對接收到來自上游的數據進行規則的匹配,根據該消息所符合的規則推送到不同的kafka通知主題中。

  拓撲B則是不同的通知拓撲,去kafka讀取對應通知的主題,然后把該消息推送到不同的客戶端(微信客戶端,支付寶客戶端等)。

  架構設計的意義:

  通過借用redis,來保證消息不會被重復處理,對異常的消息,我們不讓該消息重發。

  因為系統只是對交易成功后的數據通過配置的規則進行區分來向用戶推送不同的活動信息,從業務上看,系統並不需要保證所有交易的用戶都一定要收到活動信息,只需要保證交易的用戶不會收到重復的數據即可。

  但是在線上運行半年后,還是發現了消息重復處理的問題,某些用戶還是會收到兩條甚至多條重復信息。

  通過對現有架構的查看,我們發現問題出在拓撲B中(各個不同的通知拓撲),原因是拓撲B沒有添加唯一性過濾bolt,雖然上游的拓撲對消息進行唯一性過濾了(保證了外部系統向kafka生產消息出現重復下,拓撲A不進行重復處理),但是回看拓撲B,我們可以知道消息重發絕對不是kafka主題中存在重復的兩條消息,且拓撲B消息重復不是系統異常導致的(我們隊異常進行ack應答),那么導致消息重復處理的原因就一定是消息超時導致的。ps:消息在storm中被處理,沒有發生異常,而是由於集群硬件資源的爭搶或者下游接口瓶頸無法快速處理拓撲B推送出去的消息,導致一條消息在3分鍾內沒有處理完,spout就認為該消息fail,而重新發該消息,但是超時的那一條消息並不是說不會處理,當他獲得資源了,仍然會處理結束的。

   解決方案:在拓撲B中添加唯一性過濾bolt即可解決。

  個人推測:當時實時系統架構設計時,設計唯一性過濾bolt時,可能僅僅是考慮到外部系統向kafka推送數據可能會存在相同的消息,並沒有想到storm本身tuple超時導致的消息重復處理。

  該系統改進:雖然從業務的角度來說,並不需要保證每一個交易用戶都一定要收到活動信息,但是我們完全可以做到每一個用戶都收到活動信息,且收到的消息不重復。

我們可以做到對程序的異常進行控制,但是超時導致的fail我們無法控制。

  我們對消息處理異常控制,當發生異常信息,我們在發送fail應答前,把該異常的消息存儲到redis中,這樣唯一性過濾的bolt就會對收到的每一條消息進行判斷,如果在redis中,我們就知道該消息是異常導致的失敗,就讓該消息繼續處理,如果該消息不在redis中,我們就知道該消息是超時導致的fail,那么我們就過濾掉該消息,不進行下一步處理。

這樣我們就做到了消息的可靠處理且不會重復處理。

 

博主解決的是90%的問題,主要是因為:

1,徹頭徹尾的異常是不會給你寫redis的機會的,只能說絕大多數時候是OK的。
2,超時的任務最終也可能運行成功,這也會導致你做了2次。

我的看法:
既然是交易系統,最重要的就是業務本身滿足冪等性和可重入,架構上容錯導致的重試和重入,都不應該導致業務錯亂。

所以,我認為在架構上能做的,是要保障at least once,博主判斷redis不存在就認為是超時重發,殊不知超時的bolt可能很久之后異常退出,這樣消息就沒有人處理了。

不過具體場景具體分析,看業務需求取舍既可。
    超時的任務最終也可能運行成功,這也會導致你做了2次。(ps:這個不會,我們認為超時的任務最終會處理成功,所以再次發送,我們會在唯一性過濾bolt中把該消息過濾掉)
   超時的bolt可能很久之后異常退出,這樣消息就沒有人處理了(ps:這個我要研究下,就是超時后,再異常向spout發送fial響應是否還會重發消息,如果還會重發,那么就可以保證該異常消息可以再一次被處理)

  徹頭徹尾的異常是不會給你寫redis的機會的,只能說絕大多數時候是OK的。(ps:正確,但是是不可控的吧,就像kafka把offset存儲在zookeeper中,如果zookeeper掛掉就沒有辦法,確實絕大部分是ok
的,解決辦法不知道有沒有。)
  最重要的就是業務本身滿足冪等性和可重入,架構上容錯導致的重試和重入,都不應該導致業務錯亂(ps:我不是很明白,我這里並不要求一條消息具備事務的特性和冪等性有什么關系)
以上是我對該朋友對本系統架構找出的問題的個人思考。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM