https://blog.csdn.net/duyusean
RabbitMQ消息丟失的情況
第一種:生產者弄丟了數據。生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因為網絡問題啥的,都有可能。
第二種:RabbitMQ 弄丟了數據。MQ還沒有持久化自己掛了
第三種:消費端弄丟了數據。剛消費到,還沒處理,結果進程掛了,比如重啟了。
解決方案
一:針對生產者
方案1.開啟RabbitMQ事務
可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發送消息;如果收到了消息,那么可以提交事務channel.txCommit。
// 開啟事務
channel.txSelect
try {
// 這里發送消息
} catch (Exception e) {
channel.txRollback
// 這里再次重發這條消息
}
// 提交事務
channel.txCommit
缺點:
但是問題是,RabbitMQ 事務機制是同步的,你提交一個事務之后會阻塞在那兒,采用這種方式基本上吞吐量會下來,因為太耗性能。
方案2 使用confirm機制
事務機制和 confirm 機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是 confirm 機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調你的一個接口通知你這個消息接收到了。
二、針對RabbitMQ
1.消息持久化
2.設置集群鏡像模式
3.消息補償機制
第一種:消息持久化
RabbitMQ 的消息默認存放在內存上面,如果不特別聲明設置,消息不會持久化保存到硬盤上面的,如果節點重啟或者意外crash掉,消息就會丟失。
所以就要對消息進行持久化處理。如何持久化,下面具體說明下:
要想做到消息持久化,必須滿足以下三個條件,缺一不可。
1) Exchange 設置持久化
2)Queue 設置持久化
3)Message持久化發送:發送消息設置發送模式deliveryMode=2,代表持久化消息
第二種:設置集群鏡像模式
我們先來介紹下RabbitMQ三種部署模式:
1)單節點模式:最簡單的情況,非集群模式,節點掛了,消息就不能用了。業務可能癱瘓,只能等待。
2)普通模式:默認的集群模式,某個節點掛了,該節點上的消息不能用,有影響的業務癱瘓,只能等待節點恢復重啟可用(必須持久化消息情況下)。
3)鏡像模式:把需要的隊列做成鏡像隊列,存在於多個節點,屬於RabbitMQ的HA方案
為什么設置鏡像模式集群,因為隊列的內容僅僅存在某一個節點上面,不會存在所有節點上面,所有節點僅僅存放消息結構和元數據。下面自己畫了一張圖介紹普通集群丟失消息情況:
如果想解決上面途中問題,保證消息不丟失,需要采用HA 鏡像模式隊列。
下面介紹下三種HA策略模式:
1)同步至所有的
2)同步最多N個機器
3)只同步至符合指定名稱的nodes
命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
1)為每個以“rock.wechat”開頭的隊列設置所有節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
2)為每個以“rock.wechat.”開頭的隊列設置兩個節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
3)為每個以“node.”開頭的隊列分配指定的節點做鏡像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
但是:HA 鏡像隊列有一個很大的缺點就是: 系統的吞吐量會有所下降
第三種:消息補償機制
為什么還要消息補償機制呢?難道消息還會丟失,沒錯,系統是在一個復雜的環境,不要想的太簡單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問題,
但是作為有追求的程序員來講,要絕對保證我的系統的穩定性,有一種危機意識。
比如:持久化的消息,保存到硬盤過程中,當前隊列節點掛了,存儲節點硬盤又壞了,消息丟了,怎么辦?
產線網絡環境太復雜,所以不知數太多,消息補償機制需要建立在消息要寫入DB日志,發送日志,接受日志,兩者的狀態必須記錄。
然后根據DB日志記錄check 消息發送消費是否成功,不成功,進行消息補償措施,重新發送消息處理。
三、針對消費者
ACK確認機制
多個消費者同時收取消息,比如消息接收到一半的時候,一個消費者死掉了(邏輯復雜時間太長,超時了或者消費被停機或者網絡斷開鏈接),如何保證消息不丟?
這個使用就要使用Message acknowledgment 機制,就是消費端消費完成要通知服務端,服務端才把消息從內存刪除。
這樣就解決了,及時一個消費者出了問題,沒有同步消息給服務端,還有其他的消費端去消費,保證了消息不丟的case。