MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費
MQ系列7:消息通信,追求極致性能
MQ系列8:數據存儲,消息隊列的高可用保障
MQ系列9:高可用架構分析
1 介紹
我們實際系統中有很多操作,不管你執行多少次,都應該產生一樣的效果或返回一樣的結果。 例如:
- 前端頁面重復提交選中的數據,服務端只產生對應這個數據的一個反應結果,只保存一次數據。
- 我們發起一筆付款請求,也只能扣用戶賬戶一次錢,即使遇到網絡重發或系統bug重發,也應該只扣一次金額。
- 消息通知,也應該只能收到一次,如果收到多次的扣款通知短信,會讓用戶誤解的。
- 創建商品訂單,一次業務請求只能創建一個,創建多個就會變成購買多次,就會出問題。
以上等等很多重要的場景,都需要冪等的特性來支持。
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見於抽象代數中。 在編程中.一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。冪等函數,或冪等方法,是指可以使用相同參數重復執行,並能獲得相同結果的函數。這些函數不會影響系統狀態,也不用擔心重復執行會對系統造成改變。
例如,“getUserSex()和setRight()”函數就是一個冪等函數,包括數據庫中的查詢和刪除也是一樣的道理,它是天然冪等的。總之,冪等就是一個操作,不論執行多少次,產生的效果和返回的結果都是一樣的 。
2 消息隊列中如何保證冪等性
2.1 消息隊列的基本構成
我們先來回顧下 Message Queue的構成,這邊以RocketMQ為例子:
RocketMQ主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分。
- NameServer:Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。NameServer 是整個 RocketMQ 的 "中央大腦 " ,它是 RocketMQ 的服務注冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
- Broker: 消息服務器,作為Server提供消息核心服務, 它接收並存儲Producer生產的消息,也提供消息給Consumer消費。Broker一般會分主從,Master 可讀可寫,Slave 只讀。
- Producer: 消息生產者,消息的發送方,負責生產消息傳輸給broker。RocketMQ提供了發送:同步、異步和單向(one-way)的多種模式。
- Consumer: 消息消費者,消息的處理方,負責從broker獲取消息並進行業務邏輯處理。
另外其他如 Topic、 Message,也是重要的組成部分: - Topic:主題,發布/訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ服務器分發到不同的訂閱者,實現消息的廣播
- Message:消息體,根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務數據,實現消息的傳輸。
2.2 消息隊列的冪等分析
可以看出,消息發送和消息消費兩個步驟是有可能產生消息不冪等的問題。
為保證消息的正確性發送,超時重試、異常重試、消費完成確認機制等能力都是可以使用,並對業務產生影響的。
我們舉個例子,如果你購買一件商品,用戶付款完成之后,通過MQ消息的異步通知,告知下游服務出庫和通知。如果消息通知出現了問題或者下游消息消費出現了問題,導致無法ACK,都有可能導致重復的出庫和通知。
2.2.1 消息生產的冪保證
MQ消息生產部分,就是下圖中的步驟1、步驟2、步驟3:
- 步驟1:消息生產端 MQ-Client Producer 將消息發給服務端MQ-server
- 步驟2:消息隊列服務 MQ-Server 將消息持久化存儲
- 步驟3:息隊列服務 MQ-Server 返回確認信息(ACK \ CONSUME_SUCCESS \ offset)給消息生產端 MQ-Client Producer
如果3 消息確認故障導致消息丟失,則消息生產端 MQ-Client Producer 超時后會重發消息,這時候可能就有重復消息,如何保證冪等呢?
因為消息重發也是MQ-Client Producer發起的,消息的處理是消息隊列的服務MQ-Server處理的,MQ-Server將數據進行了持久化么,這時候我們可以設計一個唯一的 msgId,作為去重的依據,無論重發多少次,msgId都是一樣的,然后在DB數據庫中將這個msgId設置為unique key,不允許重復,他有如下特性:
- 全局唯一,不允許重復
- MQ生成與業務無耦,對消息的生產和消費也是無強相關。
使用這個 msgId,可以保證只有1條消息落地到數據庫中,就保證了消息生產端的冪等。
2.2.2 消息消費的冪保證
MQ消息消費部分,就是下圖中的步驟4、步驟5、步驟6:
- 步驟4:消息隊列服務 MQ-Server 將消息發給給消費端 MQ-Client Consumer
- 步驟5:消費端 MQ-Client Consumer 返回確認信息 (ACK \ CONSUME_SUCCESS \ offset) 給 消息隊列服務
- 步驟6:消息隊列服務 MQ-Server 將持久化的消息數據刪除,根據msgId精確刪除
★ 說明:以上步驟須做一致性保障
這邊重災區就是步驟5,如果因為故障導致消息丟失,消息隊列服務 MQ-Server 在超時后會重發消息,這樣 MQ-Client Producer/Consumer 就會重復收到消息。
因為消息重發是 消息隊列服務 MQ-Server 發起的,MQ-Client Consumer 負責消息消費,消息重發必然會導致業務重復消費(比如重復發消息、重復出庫)。所以一樣的道理,必然使用msgId來做判斷,如果存在庫中就進行消費,然后精確刪除庫中的數據。如果數據庫中不存在,就忽略,避免重復消費。
同樣的,這個msgID的特性如下:
- 全局唯一,不允許重復
- MQ生成與業務無耦,對消息的生產和消費也是無強相關。
- 業務消息消費方 MQ-Client Consumer 負責判重,保證冪等性
這種方式最常見應用在:商品下單、消費支付、帖子點贊和留言等。
2.3 總結說明
無論是何種消息隊列,造成重復消費原因其實都是類似的。正常情況下,消費者在消費消息時候,消費完畢后,會發送一個確認信息給消息隊列,消息隊列就知道該消息被消費了,就會將該消息從消息隊列中刪除。
只是不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,RocketMQ是返回一個CONSUME_SUCCESS成功標志,kafka實際上有個offset的概念,每一個消息都有一個offset,kafka消費過消息后,需要提交offset,讓消息隊列知道自己已經消費過了。
那造成重復消費的原因? 就是因為網絡傳輸等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。
如何解決?這個問題針對業務場景來答分以下幾點
(1)給這個消息做一個唯一主鍵,做數據庫insert,如果出現重復消費情況,會導致主鍵沖突,避免數據庫出現臟數據。
(2)update 和 delete 支持天然冪等性,拿到這個消息做redis的set的操作,那就容易了,不用解決,set操作天然冪等操作。
(3)第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。