RabbitMQ保障消息 100% 投遞成功方案


什么是生產端的可靠性投遞?

  • 保障消息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker) 確認應答
  • 完善的消息補償機制

如果想保障消息百分百投遞成功,只做到前三步不一定能夠保障。有些時候或者說有些極端情況,比如生產端在投遞消息時可能就失敗了,或者說生產端投遞了消息,MQ也收到了,MQ在返回確認應答時,由於網絡閃斷導致生產端沒有收到應答,此時這條消息就不知道投遞成功了還是失敗了,所以針對這些情況我們需要做一些補償機制。

互聯網大廠的解決方案

  1. 消息落庫,對消息狀態進行打標
  2. 消息的延遲投遞,做二次確認,回調檢查

具體使用哪種要根據業務場景和並發量、數據量大小來決定

消息信息落庫,對消息狀態進行打標

 
 
  1. 進行數據的入庫
    比如我們要發送一條訂單消息,首先把業務數據也就是訂單信息進行入庫,然后生成一條消息,把消息也進行入庫,這條消息應該包含消息狀態屬性,並設置初始值比如為0,表示消息創建成功正在發送中,這種方式缺陷在於我們要對數據庫進行持久化兩次。
  2. 首先要保證第一步消息都存儲成功了,沒有出現任何異常情況,然后生產端再進行消息發送。如果失敗了就進行快速失敗機制。
  3. MQ把消息收到的結果應答(confirm)給生產端
  4. 生產端有一個Confirm Listener,去異步的監聽Broker回送的響應,從而判斷消息是否投遞成功,如果成功,去數據庫查詢該消息,並將消息狀態更新為1,表示消息投遞成功。

假設第二步OK了,在第三步回送響應時,網絡突然出現了閃斷,導致生產端的Listener就永遠收不到這條消息的confirm應答了,也就是說這條消息的狀態就一直為0了。

  1. 此時我們需要設置一個規則,比如說消息在入庫時候設置一個臨界值timeout,5分鍾之后如果還是0的狀態那就需要把消息抽取出來。這里我們使用的是分布式定時任務,去定時抓取DB中距離消息創建時間超過5分鍾的且狀態為0的消息。
  2. 把抓取出來的消息進行重新投遞(Retry Send),也就是從第二步開始繼續往下走
  3. 當然有些消息可能就是由於一些實際的問題無法路由到Broker,比如routingKey設置不對,對應的隊列被誤刪除了,那么這種消息即使重試多次也仍然無法投遞成功,所以需要對重試次數做限制,比如限制3次,如果投遞次數大於三次,那么就將消息狀態更新為2,表示這個消息最終投遞失敗。

針對這種情況如何去做補償呢,可以有一個補償系統去查詢這些最終失敗的消息,然后給出失敗的原因,當然這些可能都需要人工去操作。

第一種可靠性投遞,在高並發的場景下是否適合?

對於第一種方案,我們需要做兩次數據庫的持久化操作,在高並發場景下顯然數據庫存在着性能瓶頸。其實在我們的核心鏈路中只需要對業務進行入庫就可以了,消息就沒必要先入庫了,我們可以做消息的延遲投遞,做二次確認,回調檢查。

當然這種方案不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的,有些特別極端的情況只能是人工去做補償了,或者使用定時任務去做都可以。
使用第二種方式主要目的是為了減少數據庫操作,提高並發量。

消息的延遲投遞,做二次確認,回調檢查

 
 

Upstream Service上游服務也就是生產端,Downstream service下游服務也就是消費端,Callback service就是回調服務。

  1. 先將業務消息進行入庫,然后生產端將消息發送出去,注意一定是等數據庫操作完成以后再去發送消息。
  2. 在發送消息之后,緊接着生產端再次發送一條消息(Second Send Delay Check),即延遲消息投遞檢查,這里需要設置一個延遲時間,比如5分鍾之后進行投遞。
  3. 消費端去監聽指定隊列,將收到的消息進行處理。
  4. 處理完成之后,發送一個confirm消息,也就是回送響應,但是這里響應不是正常的ACK,而是重新生成一條消息,投遞到MQ中。
  5. 上面的Callback service是一個單獨的服務,其實它扮演了第一種方案的存儲消息的DB角色,它通過MQ去監聽下游服務發送的confirm消息,如果Callback service收到confirm消息,那么就對消息做持久化存儲,即將消息持久化到DB中。
  6. 5分鍾之后延遲消息發送到MQ了,然后Callback service還是去監聽延遲消息所對應的隊列,收到Check消息后去檢查DB中是否存在消息,如果存在,則不需要做任何處理,如果不存在或者消費失敗了,那么Callback service就需要主動發起RPC通信給上游服務,告訴它延遲檢查的這條消息我沒有找到,你需要重新發送,生產端收到信息后就會重新查詢業務消息然后將消息發送出去。

這么做的目的是少做了一次DB的存儲,在高並發場景下,最關心的不是消息100%投遞成功,而是一定要保證性能,保證能抗得住這么大的並發量。所以能節省數據庫的操作就盡量節省,可以異步的進行補償。

其實在主流程里面是沒有這個Callback service的,它屬於一個補償的服務,整個核心鏈路就是生產端入庫業務消息,發送消息到MQ,消費端監聽隊列,消費消息。其他的步驟都是一個補償機制。

第二種方案也是互聯網大廠更為經典和主流的解決方案。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM