Rocketmq 重復消息,冪等


一、為什么出現消息重復

從 Product 看

Rocketmq 提供三種發送消息模式

同步發送:Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。DefaultMQProducerImpl 中如果沒有設置 超時、發送失敗,就會重發。

異步發送:先構建一個broker發送消息的任務,把任務提交給線程池,等執行完任務時,回調用戶自定義的回調函數,執行處理結果。

Oneway發送:只負責發送請求,不等待應答,

 

注:同步發送、異步發送 如果發送成功,返回結果出現網絡問題,會導致重新發送,多條重復消息。

從 Consumer 看

Broker 消息進度丟失,導致消息重復投遞給 Consumer。

Consumer 消費成功,但是因為網絡問題,JVM 異常崩潰,導致rocketmq沒收到 消費成功確認,會重復推送。

注:從性能考慮,消費進度 用異步定時同步給 Broker。

 

 

 

二、Rocketmq ack 機制保證消息消費成功。

ACK

發送者為保證消息肯定消費成功,需要使用方明確標識消費成功,rocketmq 才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功(會重新投遞)。

public enum Action {
    /**
     * 消費成功,繼續消費下一條消息
     */
    CommitMessage,
    /**
     * 消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息
     */
    ReconsumeLater,
}

例:

消費者回執

 
         
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RuleForwardListener implements MessageListener {
    private Logger log = LoggerFactory.getLogger(RuleForwardListener.class);

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            String msg = new String(message.getBody());
            log.info("rrpc response message:{}", msg);
            return Action.CommitMessage;  // 消費成功
        } catch (Throwable t) {
            log.error("rrpc-response error", t);
            return Action.ReconsumeLater;  // 消費失敗
        }
    }
}

 

僅當回執函數返回 CommitMessage時,rocketmq 就會認為此消息消費成功。

返回 ReconsumeLater ,rocketmq 認為消息消費失敗。

 

為保證消息肯定被至少消費成功一次,rocketmq 會把消息重發回 broker(topic不是原topic 而是消費組的 retry topic),在延遲的某個時間點(默認 10s, 可設置),再次投遞到這個 ConsumerGroup。如果一直這樣重復消費都失敗,默認 16次,就會投遞到 DLQ 死信隊列。應用可以監控死信隊列來做人工干預。

 

修改重試時間

broker 日志中發現默認重試時間:

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

 

可以在配置文件中設置:

messageDelayLevel = 10s 1m 5m

(或者在邏輯中,人工干預重復次數。達到次數后,返回 CommitMessage )

 

三、解決重復消費 - 消費者實現 消費冪等。

根據業務上 唯一 key 對消息做冪等處理。

當出現消費者對某條消息重復消費的情況時,重復消費的結果與消費一次的結果相同,並且多次消費並未對業務系統產生任何負面影響,那么整個過程就可實現消息冪等

Message 中的 key

Message message = new Message(msgTopic, msgTag, "uuid", messageBody);
SendResult sendResult = producer.send(message);

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM