RocketMQ解決冪等性問題


一.造成重復消費的原因

在於回饋機制。正常情況下,消費者在消費消息時候,消費完畢后,會發送一個ACK確認信息給消息隊列(broker),消息隊列(broker)就知道該消息被消費了,就會將該消息從消息隊列中刪除。

不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,RocketMQ是返回一個CONSUME_SUCCESS成功標志,kafka實際上有個offset的概念。

  造成重復消費的原因?,就是因為網絡原因閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。(因為消息重試等機制的原因,如果一個consumer斷了,rocketmq有consumer集群,會將該消息重新發給其他consumer)

 

這個問題針對業務場景來答,分以下三種情況:

(1)比如,你拿到這個消息做數據庫的insert操作,那就容易了,給這個消息做一個唯一的主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。

(2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。

(3)如果上面兩種情況還不行,上大招。准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis.那消費者開始消費前,先去redis中查詢有沒有消費記錄即可。

 

二.單機環境解決方案

生產者:發送消息同時set一個key做唯一標識

public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 1; i++) {
                Thread.sleep(1000); // 每秒發送一次MQ
                Message msg = new Message("itmayiedu-topic", // topic 主題名稱
                        "TagA", // tag 臨時值
                        ("itmayiedu-6" + i).getBytes()// body 內容
                );
                //setKey,做唯一標識
                msg.setKeys(System.currentTimeMillis() + "");

                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

消費者:

//保存標識的集合
    static private Map<String, String> logMap = new HashMap<>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("itmayiedu-topic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String key = null;
                String msgId = null;
                try {
                    for (MessageExt msg : msgs) {
                        key = msg.getKeys();
                        //判斷集合當中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費消息
                        if (logMap.containsKey(key)) {
                            // 無需繼續重試。
                            System.out.println("key:"+key+",無需重試...");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        msgId = msg.getMsgId();
                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                        //模擬異常
                        int i = 1 / 0;
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                    //重試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } finally {
                    //保存key
                    logMap.put(key, msgId);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

執行效果:

 

 

 

三.集群環境解決方案

    在生產者端要保證冪等性的話,大概可以使用以下兩種方式:
    ① RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經發送過該條消息就可以了,不存在則發送,存在則不發送
    ② 引入redis,在發送消息到RocketMQ成功之后,向redis中插入一條數據,如果發生重試,則先去redis中查詢一下該條消息是否已經發送過了,存在的話就不重復發送消息了
    生產者的這兩種冪等性方案都可以實現,但是都存在一定的缺陷
    方案①,RocketMQ消息查詢的性能不是特別好,如果是在高並發的場景下,每條消息在發送到RocketMQ時都去查詢一下,可能會影響接口的性能
    方案②,在一些極端的場景下,redis也無法保證消息發送成功之后,就一定能寫入redis成功,比如寫入消息成功而redis此時宕機,那么再次查詢redis判斷消息是否已經發送過,是無法得到正確結果的

既然在消費者做冪等性的方案都不是特別靠譜,那就再在消費者端來做吧
消息的消費,最后都對應的是數據庫的操作,只要在消息消費的時候,判斷一下數據庫中是否已經消費過了這條消息,就可以保證冪等性了,例如使用唯一索引,保證一條消息只被消費一次。

 

 

 參考:https://blog.csdn.net/LO_YUN/article/details/104135197

 去重原則:1.冪等性 2.業務去重

  冪等性:(處理必須唯一) 無論這個業務請求被(consumer)執行多少次,我們的數據庫的結果都是唯一的,不可變的。

  去重策略:去重表機制,業務拼接去重策略(比如唯一流水號)

  1.建立一個消息表,拿到這個消息做數據庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現重復消費的情況,就會導致主鍵沖突。

    高並發下去重:采用Redis去重(key天然支持原子性並要求不可重復),但是由於不在一個事務,要求有適當的補償策略,但是對於很重要的業務,不應該支持補償

  2.利用redis事務,主鍵(我們必須把全量的操作數據都存放在redis里,然后定時去和數據庫做數據同步)—-即消費處理后,該處理本來應該保存在數據庫的,先保存在redis,再通過一定業務方式從redis中取數據進行db持久化

  3.利用redis和關系型數據庫一起做去重機制

  4.拿到這個消息做redis的set的操作.redis就是天然冪等性 

  5.准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將 < id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。

 

消息重復消費是一個非常常見的問題,在很多系統調用頻繁的場景下,都可能會出現超時重試的情況,還有就是系統頻繁迭代,經常重啟系統更新的場景,也會出現消息重復消費
生產者端發送重復的消息到RocketMQ中其實問題不大,消息只是在RocketMQ中重復了,並沒有影響到系統的數據,我們只需要在最后修改數據庫的時候,保證好冪等性即可


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM