RocketMQ的broker節點掛掉后重啟導致的消息重復消費問題解決方案


我的RocketMQ架構圖如下:

 

 

 

故障描述: Broker-b所在服務器宕機8小時(1:00-9:00), 重啟Broker-b后, 8小時期間產生的消息被消費者訂閱消費, 由於消費者等冪條件是:2小時內相同消息(msgId相同)不重復發送,  但是此時已經超過兩小時, 故 1:00-7:00 期間產生的消息被重復消費。

期望: Broker-b重啟后, 消費者只訂閱當前時間開始的消息, 之前的消息不再訂閱。

 

處理方案1:

設置消費者訂閱消息的位置為最新的位置

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

這里要注意代碼注釋。這個參數只對一個新的consumeGroup第一次啟動時有效。
就是說,如果是一個consumerGroup重啟,他只會從自己上次消費到的offset,繼續消費。這個參數是沒用的。 而判斷是不是一個新的ConsumerGroup是在broker端判斷。
要知道,消費到哪個offset最先是存在Consumer本地的,定時和broker同步自己的消費offset。broker在判斷是不是一個新的consumergroup,就是查broker端有沒有這個consumergroup的offset記錄。

另外,對於一個新的queue,這個參數也是沒用的,都是從0開始消費。

優點: 處理簡單, 修改一行即可。

缺點:需要新的消費者才生效, 故此方法對於broker中已包含了ConsumerGroup的, 不起作用。

 

處理方案2:

修改broker-b的offset, 和 broker-a的配置保持一致。

方法:

把broker-a的rocketmq路徑:${userPath}/store/config/consumerOffset.json 文件拷貝到broker-b的相同位置, 重啟broker-b

 

優點: 簡單快捷, 不需要修改程序代碼。

缺點:由於broker-a節點的消息隊列一直在更新,offset偏移量一直在增加, 故復制過來到重啟這段時間差內, 依然有少部分信息會被重復消費, 這只能依賴等冪代碼的處理。

 

事后補救:

 1. 消費者的等冪操作的條件時長設置更長(1天或者3天, 我是用redis存儲msgId, 消息量不大的情況下可以設置更長的時長)

//緩存消息ID 防止消費重復
boolean setResult = RedisUtil.setnx(msgExt.getMsgId(), 60 * 60 * 24 * 1, msgExt.getMsgId().getBytes("utf-8"));
if (!setResult) { // 存儲不成功說明已經存儲過, 直接返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }

2. 增加rocketmq宕機告警

總結:
1. 程序等冪處理依然是關鍵, 把防重復寄望於MQ是不可靠的, 且MQ本來就是設計高並發, 這是它的優點, 所以還是在程序等冪處理上下功夫, 防止此類不可預知的問題導致數據重復。
2. 監控很重要, 及時發現問題, 及時處理。
3. 搞清楚原理, 減少誤操作。
 
 
 
 
參考鏈接:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM