我的RocketMQ架構圖如下:
故障描述: Broker-b所在服務器宕機8小時(1:00-9:00), 重啟Broker-b后, 8小時期間產生的消息被消費者訂閱消費, 由於消費者等冪條件是:2小時內相同消息(msgId相同)不重復發送, 但是此時已經超過兩小時, 故 1:00-7:00 期間產生的消息被重復消費。
期望: Broker-b重啟后, 消費者只訂閱當前時間開始的消息, 之前的消息不再訂閱。
處理方案1:
設置消費者訂閱消息的位置為最新的位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
這里要注意代碼注釋。這個參數只對一個新的consumeGroup第一次啟動時有效。
就是說,如果是一個consumerGroup重啟,他只會從自己上次消費到的offset,繼續消費。這個參數是沒用的。 而判斷是不是一個新的ConsumerGroup是在broker端判斷。
要知道,消費到哪個offset最先是存在Consumer本地的,定時和broker同步自己的消費offset。broker在判斷是不是一個新的consumergroup,就是查broker端有沒有這個consumergroup的offset記錄。
另外,對於一個新的queue,這個參數也是沒用的,都是從0開始消費。
優點: 處理簡單, 修改一行即可。
缺點:需要新的消費者才生效, 故此方法對於broker中已包含了ConsumerGroup的, 不起作用。
處理方案2:
修改broker-b的offset, 和 broker-a的配置保持一致。
方法:
把broker-a的rocketmq路徑:${userPath}/store/config/consumerOffset.json 文件拷貝到broker-b的相同位置, 重啟broker-b
優點: 簡單快捷, 不需要修改程序代碼。
缺點:由於broker-a節點的消息隊列一直在更新,offset偏移量一直在增加, 故復制過來到重啟這段時間差內, 依然有少部分信息會被重復消費, 這只能依賴等冪代碼的處理。
事后補救:
1. 消費者的等冪操作的條件時長設置更長(1天或者3天, 我是用redis存儲msgId, 消息量不大的情況下可以設置更長的時長)
//緩存消息ID 防止消費重復
boolean setResult = RedisUtil.setnx(msgExt.getMsgId(), 60 * 60 * 24 * 1, msgExt.getMsgId().getBytes("utf-8"));
if (!setResult) { // 存儲不成功說明已經存儲過, 直接返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
2. 增加rocketmq宕機告警