RabbitMQ如何防止消息丟失及重復消費


一、RabbitMQ出現消息丟失的情況及其解決辦法

 

 

 

如圖所示,RabbitMQ丟失消息的情況可以發送在任何一個節點。

1.1 生產者沒有成功把消息發送到MQ

 a、丟失的原因:因為網絡傳輸的不穩定性,當生產者在向MQ發送消息的過程中,MQ沒有成功接收到消息,但是生產者卻以為MQ成功接收到了消息,不會再次重復發送該消息,從而導致消息的丟失。

 b、解決辦法: 有兩個解決辦法:事務機制和confirm機制,最常用的是confirm機制。

事務機制:

       RabbitMQ 提供了事務功能,生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發送消息;如果收到了消息,那么可以提交事務channel.txCommit。偽代碼如下:

  1.  
    // 開啟事務
  2.  
    channel.txSelect
  3.  
    try {
  4.  
    // 這里發送消息
  5.  
    } catch (Exception e) {
  6.  
    channel.txRollback
  7.  
     
  8.  
    // 這里再次重發這條消息
  9.  
    }
  10.  
    // 提交事務
  11.  
    channel.txCommit

confirm機制:

       RabbitMQ可以開啟 confirm 模式,在生產者那里設置開啟 confirm 模式之后,生產者每次寫的消息都會分配一個唯一的 id,如果消息成功寫入 RabbitMQ 中,RabbitMQ 會給生產者回傳一個 ack 消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調你的一個 nack 接口,告訴你這個消息接收失敗,生產者可以發送。而且你可以結合這個機制自己在內存里維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那么可以重發。

注意:RabbitMQ的事務機制是同步的,很耗型能,會降低RabbitMQ的吞吐量。confirm機制是異步的,生成者發送完一個消息之后,不需要等待RabbitMQ的回調,就可以發送下一個消息,當RabbitMQ成功接收到消息之后會自動異步的回調生產者的一個接口返回成功與否的消息。

 

2 RabbitMQ接收到消息之后丟失了消息

a、丟失的原因:RabbitMQ接收到生產者發送過來的消息,是存在內存中的,如果沒有被消費完,此時RabbitMQ宕機了,那么再次啟動的時候,原來內存中的那些消息都丟失了。

b、解決辦法:開啟RabbitMQ的持久化。當生產者把消息成功寫入RabbitMQ之后,RabbitMQ就把消息持久化到磁盤。結合上面的說到的confirm機制,只有當消息成功持久化磁盤之后,才會回調生產者的接口返回ack消息,否則都算失敗,生產者會重新發送。存入磁盤的消息不會丟失,就算RabbitMQ掛掉了,重啟之后,他會讀取磁盤中的消息,不會導致消息的丟失。

c、持久化的配置:

  • 第一點是創建 queue 的時候將其設置為持久化,這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是它是不會持久化 queue 里的數據的。
  • 第二個是發送消息的時候將消息的 deliveryMode 設置為 2,就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。

注意:持久化要起作用必須同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數據。

 

3 消費者弄丟了消息

a、丟失的原因:如果RabbitMQ成功的把消息發送給了消費者,那么RabbitMQ的ack機制會自動的返回成功,表明發送消息成功,下次就不會發送這個消息。但如果就在此時,消費者還沒處理完該消息,然后宕機了,那么這個消息就丟失了。

b、解決的辦法:簡單來說,就是必須關閉 RabbitMQ 的自動 ack,可以通過一個 api 來調用就行,然后每次在自己代碼里確保處理完的時候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。

二、如何防止重復消費

先說為什么會重復消費:正常情況下,消費者在消費消息的時候,消費完畢后,會發送一個確認消息給消息隊列,消息隊列就知道該消息被消費了,就會將該消息從消息隊列中刪除;但是因為網絡傳輸等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將消息分發給其他的消費者。

       解決思路是:保證消息的唯一性,就算是多次傳輸,不要讓消息的多次消費帶來影響;保證消息等冪性;

  • 在消息生產時,MQ內部針對每條生產者發送的消息生成一個inner-msg-id,作為去重和冪等的依據(消息投遞失敗並重傳),避免重復的消息進入隊列;
  • 在消息消費時,要求消息體中必須要有一個bizId(對於同一業務全局唯一,如支付ID、訂單ID、帖子ID等)作為去重和冪等的依據,避免同一條消息被重復消費。

       這個問題針對業務場景來答分以下幾點:

    1. 如果消息是做數據庫的insert操作,給這個消息做一個唯一主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。

    2. 如果消息是做redis的set的操作,不用解決,因為無論set幾次結果都是一樣的,set操作本來就算冪等操作。

    3. 如果以上兩種情況還不行,可以准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM