Rabbitmq如何保證消息不丟失


 

1.mq原則

數據不能多,也不能少,不能多是說消息不能重復消費;不能少,就是說不能丟失數據。如果mq傳遞的是非常核心的消息,支撐核心的業務,那么這種場景是一定不能丟失數據的。

2.丟失數據場景

丟數據一般分為三種,一種是mq把消息丟了,一種就是消費時將消息丟了。下面從rabbitmq和kafka分別說一下,丟失數據的場景,
A:生產者弄丟了數據
生產者將數據發送到rabbitmq的時候,可能在傳輸過程中因為網絡等問題而將數據弄丟了。
B:rabbitmq自己丟了數據
如果沒有開啟rabbitmq的持久化,那么rabbitmq一旦重啟,那么數據就丟了。所依必須開啟持久化將消息持久化到磁盤,這樣就算rabbitmq掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟失。除非極其罕見的情況,rabbitmq還沒來得及持久化自己就掛了,這樣可能導致一部分數據丟失。
C:消費端弄丟了數據
如果一個消費者應用在消費的時候,剛消費到,還沒處理,如進程掛了,比如重啟了,rabbitmq認為你都消費了,這數據就丟了。

3.如何解決

A:生產者丟失消息

①:可以選擇使用rabbitmq提供是事物功能,就是生產者在發送數據之前開啟事物,然后發送消息,如果消息沒有成功被rabbitmq接收到,那么生產者會受到異常報錯,這時就可以回滾事物,然后嘗試重新發送;如果收到了消息,那么就可以提交事物。

特別說明:AMQP 協議中的事務僅僅是指生產者發送消息給 broker 這一系列流程處理的事務機制,並不包含消費端的處理流程。

  channel.txSelect();//開啟事物
  try{
      //發送消息
  }catch(Exection e){
      channel.txRollback();//回滾事物
      //重新提交
  }

缺點:rabbitmq事物已開啟,就會變為同步阻塞操作,生產者會阻塞等待是否發送成功,太耗性能會造成吞吐量的下降。

②:可以開啟confirm模式。在生產者哪里設置開啟了confirm模式之后,每次寫的消息都會分配一個唯一的id,然后如何寫入了rabbitmq之中,rabbitmq會給你回傳一個ack消息,告訴你這個消息發送OK了;如果rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息失敗了,你可以進行重試。而且你可以結合這個機制知道自己在內存里維護每個消息的id,如果超過一定時間還沒接收到這個消息的回調,那么你可以進行重發。

    //開啟confirm
    channel.confirm();
    //發送成功回調
    public void ack(String messageId){
      
    }

    // 發送失敗回調
    public void nack(String messageId){
        //重發該消息
    }

二者不同
事務機制是同步的,你提交了一個事物之后會阻塞住,但是confirm機制是異步的,發送消息之后可以接着發送下一個消息,然后rabbitmq會回調告知成功與否。
一般在生產者這塊避免丟失,都是用confirm機制。
B:rabbitmq自己弄丟了數據
設置消息持久化到磁盤。設置持久化有兩個步驟:
①創建queue的時候將其設置為持久化的,這樣就可以保證rabbitmq持久化queue的元數據,但是不會持久化queue里面的數據。
②發送消息的時候講消息的deliveryMode設置為2,這樣消息就會被設為持久化方式,此時rabbitmq就會將消息持久化到磁盤上。
必須要同時開啟這兩個才可以。

而且持久化可以跟生產的confirm機制配合起來,只有消息持久化到了磁盤之后,才會通知生產者ack,這樣就算是在持久化之前rabbitmq掛了,數據丟了,生產者收不到ack回調也會進行消息重發。
C:消費者弄丟了數據

rabbitmq有手動ack機制與自動ack機制來解決消費者弄丟數據:
如果使用rabbitmq提供的ack機制,首先關閉rabbitmq的自動ack,使用手動ack,每次在確保處理完這個消息之后,在代碼里手動調用ack。這樣就可以避免消息還沒有處理完就ack。

但是ack機制在異常情況下可能造成重復消費:當消費者異常斷掉連接,但並未掛掉,broker 會得知, 此時broker 尚未獲得 ack,那么消息會被重新放入其他隊列,這樣就導致數據被重復消費了。

應用層解決重復的方式:

  • 1. 專門的 Map 存儲:用來存儲每個消息的執行狀態(用 msgid 區分),執行成功之后更新 Map,有另外消息重復消費的時候,讀取 Map 數據判斷 msgid 對應的執行狀態,已消費則不執行。
  • 2. 業務邏輯判斷:消息執行完會更改某個實體狀態,判斷實體狀態是否更新,如果更新,則不進行重復消費。

總結:AMQP 提供的是“至少一次交付”(at-least-once delivery),異常情況下,消息會被重復消費,此時業務要實現冪等性(重復消息處理)

refer:https://www.cnblogs.com/xishuai/p/9174719.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM