一、為什么出現消息重復
從 Product 看
Rocketmq 提供三種發送消息模式
同步發送:Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。DefaultMQProducerImpl 中如果沒有設置 超時、發送失敗,就會重發。
異步發送:先構建一個broker發送消息的任務,把任務提交給線程池,等執行完任務時,回調用戶自定義的回調函數,執行處理結果。
Oneway發送:只負責發送請求,不等待應答,
注:同步發送、異步發送 如果發送成功,返回結果出現網絡問題,會導致重新發送,多條重復消息。
從 Consumer 看
Broker 消息進度丟失,導致消息重復投遞給 Consumer。
Consumer 消費成功,但是因為網絡問題,JVM 異常崩潰,導致rocketmq沒收到 消費成功確認,會重復推送。
注:從性能考慮,消費進度 用異步定時同步給 Broker。
二、Rocketmq ack 機制保證消息消費成功。
ACK
發送者為保證消息肯定消費成功,需要使用方明確標識消費成功,rocketmq 才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功(會重新投遞)。
public enum Action { /** * 消費成功,繼續消費下一條消息 */ CommitMessage, /** * 消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息 */ ReconsumeLater, }
例:
消費者回執
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RuleForwardListener implements MessageListener { private Logger log = LoggerFactory.getLogger(RuleForwardListener.class); @Override public Action consume(Message message, ConsumeContext context) { try { String msg = new String(message.getBody()); log.info("rrpc response message:{}", msg); return Action.CommitMessage; // 消費成功 } catch (Throwable t) { log.error("rrpc-response error", t); return Action.ReconsumeLater; // 消費失敗 } } }
僅當回執函數返回 CommitMessage時,rocketmq 就會認為此消息消費成功。
返回 ReconsumeLater ,rocketmq 認為消息消費失敗。
為保證消息肯定被至少消費成功一次,rocketmq 會把消息重發回 broker(topic不是原topic 而是消費組的 retry topic),在延遲的某個時間點(默認 10s, 可設置),再次投遞到這個 ConsumerGroup。如果一直這樣重復消費都失敗,默認 16次,就會投遞到 DLQ 死信隊列。應用可以監控死信隊列來做人工干預。
修改重試時間
broker 日志中發現默認重試時間:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
可以在配置文件中設置:
messageDelayLevel = 10s 1m 5m
(或者在邏輯中,人工干預重復次數。達到次數后,返回 CommitMessage )
三、解決重復消費 - 消費者實現 消費冪等。
根據業務上 唯一 key 對消息做冪等處理。
當出現消費者對某條消息重復消費的情況時,重復消費的結果與消費一次的結果相同,並且多次消費並未對業務系統產生任何負面影響,那么整個過程就可實現消息冪等。
Message 中的 key
Message message = new Message(msgTopic, msgTag, "uuid", messageBody);
SendResult sendResult = producer.send(message);