之所以想聊一聊這個話題,是因為在剛開始使用rocketmq時,Consumer服務寫的有問題的情況下,消息隊列會重發,這是因為消費失敗會導致消息被放入RETRY重試隊列,根據用戶配置的重試次數(默認16次)進行重試,這部分我們已經在之前的 RocketMQ存儲機制與確認重傳機制一文中討論過,這個情況引起了我探究“什么情況下消息隊里會進行重試,會不會導致重復消費?”這一問題的好奇心。
為什么會出現消息重復的問題?
對於 Producer
我們知道,RocketMQ提供了三種發送消息模式:
1、同步發送
Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。
DefaultMQProducerImpl中設置如果超時沒有響應或者發送失敗,會重發。
2、異步發送
Producer 首先構建一個向 broker 發送消息的任務,把該任務提交給線程池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果。
3、Oneway 發送
Oneway 方式只負責發送請求,不等待應答,Producer 只負責把請求發出去,而不處理響應結果。
上述前兩種發送方式,如果成功發送到消息隊列,但消息隊列返回結果時出現網絡問題,則會導致消息已經發送成功而生產者認為發送失敗,重新發送,導致出現多條重復消息。
對於 Consumer
1、可能因為 Broker 的消息進度丟失,導致消息重復投遞給 Consumer 。
2、Consumer 消費成功,但是因為網絡問題/ JVM 異常崩潰,導致消息消息隊列沒能收到消費成功確認,以為消費失敗,導致重復推送 。
注:對於大多數消息隊列,考慮到性能,消費進度是異步定時同步給 Broker 。
如何解決
消費者實現冪等性,一般是在框架層統一封裝或者業務層自己實現。
框架層統一封裝
首先,需要有一個消息排重的唯一標識,該編號只能由 Producer 生成,例如說使用 uuid、或者其它唯一編號的算法 。
對於RocketMQ來說,Producer 在發送消息時,默認會生成消息編號( msgId ),可見org.apache.rocketmq.common.message.MessageClientExt 類。Broker 在存儲消息時,會生成結合 offset 的消息編號( offsetMsgId ) 。Consumer 在消費消息失敗后,將該消息發回 Broker 后,會產生新的 offsetMsgId 編號,但是 msgId 不變。
然后,就需要有一個排重的存儲器,例如說:
使用關系數據庫,增加一個排重表,使用消息編號作為唯一主鍵。
使用 KV 數據庫,KEY 存儲消息編號,VALUE 任一。此處,暫時不考慮 KV 數據庫持久化的問題。
那么,我們要什么時候插入這條排重記錄呢?
在消息消費執行業務邏輯之前,插入這條排重記錄。但是,此時會有可能 JVM 異常崩潰。那么 JVM 重啟后,這條消息就無法被消費了。因為,已經存在這條排重記錄。
在消息消費執行業務邏輯之后,插入這條排重記錄。
如果業務邏輯執行失敗,顯然,我們不能插入這條排重記錄,因為我們后續要消費重試。
如果業務邏輯執行成功,此時,我們可以插入這條排重記錄。但是,萬一插入這條排重記錄失敗呢?那么,需要讓插入記錄和業務邏輯在同一個事務當中,此時,我們只能使用數據庫。
業務層自己實現
方式很多,這個和 HTTP 請求實現冪等是一樣的邏輯:
先查詢數據庫,判斷數據是否已經被更新過。如果是,則直接返回消費完成,否則執行消費。
在並發量比較高的系統下,我們可以使用redis來進行冪等性判斷。
正常情況下,出現重復消息的概率其實很小,如果由框架層統一封裝來實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務層自己實現處理消息重復的問題。
我的做法
我的項目中主要是針對“任務”提交時進行冪等性判斷,我的做法是在下派任務時把任務id放到redis中,然后再任務提交時從redis中刪除,但如果消費事務執行失敗,redis會重新把任務id添加回來作出補償。從而完成冪等性判斷。