一、消息如何保證可靠性傳輸
1.1、可能出現消息丟失的情況
1、Producer在把Message發送Broker的過程中,因為網絡問題等發生丟失,或者Message到了Broker,但是出了問題,沒有保存下來
針對這個問題,Producer可以開啟MQ的事務,如果這個過程出現異常,進行回滾,但是有個很大的問題,你提交一個事務就會阻塞在那,
非常影響性能,生產環境肯定不會開啟事務,一般都是使用confirm機制
2、Broker接收到Message暫存到內存,Consumer還沒來得及消費,Broker掛掉了
可以通過持久化設置去解決:
1).創建Queue的時候設置持久化,保證Broker持久化Queue的元數據,但是不會持久化Queue里面的消息
2).將Message的deliveryMode設置為2,可以將消息持久化到磁盤,這樣只有Message支持化到磁盤之后才會發送通知Producer ack
這兩步過后,即使Broker掛了,Producer肯定收不到ack的,就可以進行重發
3、Consumer有消費到Message,但是內部出現問題,Message還沒處理,Broker以為Consumer處理完了,只會把后續的消息發送
這時候,就要關閉autoack,消息處理過后,進行手動ack
1.2、一般通過生產端保證可靠性投遞
1、保證消息的成功發出
2、保證MQ節點的成功接收
3、發送端收到MQ節點(Broker)的確認應答
4、完善的消息補償機制
1.3、解決方案
1、消息落庫,對消息狀態進行變更,對於高並發環境下數據庫壓力很大,因為需要寫多次數據庫
整體流程:
1、業務數據和消息都進行落庫
2、生產端發送message給Broker
3、Broker給Confirm響應返回生產端
4、接收到confirm,對message狀態更改
5、分布式定時任務獲取消息的狀態
6、如果消息不能成功投遞,重新進行發送,記錄重發次數
7、當重發3次之后,將狀態修改,只能人工進行干預
2、消息的延遲投遞,做二次確認,回調檢查。適合高並發環境,減少寫庫的次數
整體流程:
1、上游服務首先將業務代碼入庫,發送message給Broker
2、發送第二個延遲確認消息
3、下游服務監聽消息進行消費
4、發送確認消息,這里不是confirm機制,而是一條新的消息
5、通過回調服務監聽這個confirm消息,然后把消息進行入庫
6、回調服務檢查到延遲確認消息,會在數據庫查詢是否有這條消息
7、如果沒有查到這條消息,回調服務通過RPC給一個重新發送命令到上游系統
相比第一種方案,這里減少了一次message入庫,confirm機制是消息可靠性投遞的一個核心,在下篇文章會講到
二、如何保證消息的冪等性
首先,無論是RabbitMQ、RocketMQ還是kafka,都有可能出現消息的重復發送,這個是MQ無法保障的,而冪等性是開發或者運維人員需要保證的
所謂消息的冪等性是指即使收到多次消息,也不會重復消費,這點很重要,例如用戶付錢,點的太快了,付了多次,但是你也只能扣一次錢,
不然要罵人了
2.1、RabbitMQ可能導致出現非冪等性的情況
1、可靠性消息投遞機制:consumer回復confirm出現網絡閃斷,producer沒有收到ack,定時任務輪詢可能就會重新發送消息,這樣consumer就
會收到兩條消息
2、MQ Broker與消費端傳輸消息的過程出現網絡抖動
3、消費端故障或異常
2.2、kafka可能出現非冪等性的情況
在Consumer端offset沒有提交的時候,Consumer重啟了,這時候就會出現重復消費的情況
2.3、解決方案
1、唯一ID+指紋碼
整體實現相對簡單,需要進行數據庫寫入,利用數據庫主鍵去重,使用ID進行分庫分表算法路由,從單庫的冪等性到多庫的冪等性
1).這里唯一ID一般就是業務表的主鍵,比如商品ID
2).指紋碼:每次操作都要生成指紋碼,可以用時間戳+業務編號+...組成,目的是保證每次操作都是正常的
整體流程:
1、需要一個統一ID生成服務,為了保證可靠性,上游服務也要有個本地ID生成服務,然后發送消息給Broker
2、需要ID規則路由組件去監聽消息,先入庫,如果入庫成功,證明沒有重復,然后發給下游,如果發現庫里面有了這條消息,就不發給下游
2、利用Redis的原子性實現
Redis的實現性能比較好,而且Redis一般使用集群,不用擔心某台機器掛掉了,影響服務。
存在的問題:
是否要進行數據落庫,如果落庫的話,怎么保證緩存和storage的一致性、事務,如果不落庫,如何設置定時同步策略