RocketMQ消息至少一次(At least Once)投遞和消費


至少一次(At least Once)指每個消息必須投遞一次。Consumer先Pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,所以RocketMQ可以很好的支持此特性。

生產者

在同步非順序投遞的時候,每次都是輪詢到不同的隊列:

        Message message = new Message("topic_family", ("  同步發送  ").getBytes());
        SendResult sendResult = producer.getProducer().send(message);

結果:

Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C3BBE0000, offsetMsgId=C0A80A0B00002A9F00000000007B4A5C, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11014]
Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C6A3D0002, offsetMsgId=C0A80A0B00002A9F00000000007B4BB0, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=3], queueOffset=11012]
Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CAEE20004, offsetMsgId=C0A80A0B00002A9F00000000007B4D04, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11004]
Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CEA6D0006, offsetMsgId=C0A80A0B00002A9F00000000007B4E58, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11005]

異步

寫入補償log,進行重發
@GetMapping("/async")
    private void async() throws Exception {
        //創建消息
        Message message = null;
        for (int i=0;i<100;i++){
            if (i==90) {
                new RuntimeException("");
            }
            message = new Message("topic_family", ("異步發送:" + i).getBytes());
            System.out.println("異步發送:"+ i);

            //異步發送消息
            producer.getProducer().send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    //log.info("Product-異步發送-輸出信息={}", sendResult);
                    System.out.println("Product-異步發送-輸出信息={}" + sendResult);
                }
                @Override
                public void onException(Throwable e) {
                    //e.printStackTrace();
                    System.out.println("Product-異步發送-異常" + e.getMessage());
                    //寫入補償log,進行重發
                }
            });
        }
    }

重發帶來的重復消息問題-上半場冪等

1,發送端MQ-client將消息發給服務端MQ-server
2,服務端MQ-server將消息落地
3,服務端MQ-server回ACK給發送端MQ-client
如果3丟失,發送端MQ-client超時后會重發消息,可能導致服務端MQ-server收到重復消息。
此時重發是MQ-client發起的,消息的處理是MQ-server,為了避免步驟2落地重復的消息,對每條消息,MQ系統內部必須生成一個inner-msg-id,作為去重和冪等的依據,這個內部消息ID的特性是:
(1)全局唯一
(2)MQ生成,具備業務無關性,對消息發送方和消息接收方屏蔽
有了這個inner-msg-id,就能保證上半場重發,也只有1條消息落到MQ-server的DB中,實現上半場冪等。
以上的措施實施比較麻煩,實際上可以通過key來記錄重復的業務記錄,比如訂單id。
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProduceOnce {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("192.168.10.11:9876");

        producer.start();
        Message sendMessage = new Message(
                JmsConfig.TOPIC,
                "訂單001".getBytes());

        sendMessage.setKeys("OD0000000001");//模擬同一個ID
        SendResult sendResult1 = producer.send(sendMessage);
        SendResult sendResult2 = producer.send(sendMessage);
        System.out.println("Product1-同步發送-Product信息={}" + sendResult1);
        System.out.println("Product2-同步發送-Product信息={}" + sendResult2);
        producer.shutdown();
    }
    }

結果:

Product1-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE870, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=0], queueOffset=11258]
Product2-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE926, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11262]

 

消費端

在非去重對的消費情況下,會產生重復消費
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerOnce {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 設置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

結果,消費了2次

ConsumeMessageThread_1 Receive New Messages: [properties={MIN_OFFSET=0, MAX_OFFSET=11260, KEYS=OD0000000001, CONSUME_START_TIME=1591515973997, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages:  properties={MIN_OFFSET=0, MAX_OFFSET=11264, KEYS=OD0000000001, CONSUME_START_TIME=1591515974001, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 

 

業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批消息(默認是1條)是消費完成的。(具體如何ACK見后面章節)

如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。

為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。

注:

  1. 如果業務的回調沒有處理好而拋出異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。
  2. 當使用順序消費的回調MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有RECONSUME_LATER的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停隊列的其余消費,直到原消息不斷重試成功為止才能繼續消費。

我們可以使用db的唯一鍵,或者緩存的唯一Id來記錄需要消費一次的id。

RocketMQ無法避免消息重復(Exactly-Once),所以如果業務對消費重復非常敏感,務必要在業務層面進行去重處理。可以借助關系數據庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內容中的唯一標識字段,例如訂單Id等。在消費之前判斷唯一鍵是否在關系數據庫中存在。如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)

msgId一定是全局唯一標識符,但是實際使用中,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發、因客戶端重投機制導致的重復等),這種情況就需要使業務字段進行重復消費。

每個消息在業務層面的唯一標識碼要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic、key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。

   // 訂單Id   
   String orderId = "20034568923546";   
   message.setKeys(orderId);   

模擬代碼:

package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.ArrayList;
import java.util.List;

public class ConsumerOnce {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        List<String> redisKeyList = new ArrayList<String>();
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 設置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // 默認msgs只有一條消息
                    String repeatID = "";
                    for (MessageExt msg : msgs) {
                        String key = msg.getKeys();
                        return noRepeat(key);
                        //return noRepeatConsume(repeatID,key);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            private ConsumeConcurrentlyStatus noRepeat(String key) {
                for (String item : redisKeyList){
                    System.out.println("Redis 緩存中的keys:" + item);
                }
               if (!redisKeyList.contains(key)) {
                   redisKeyList.add(key);
                   System.out.println("Redis redisKeyList.size():" + redisKeyList.size());
                   System.out.println("Redis 緩存插入:" + key);
                   System.out.printf("%s Receive Messages: %s %n", Thread.currentThread().getName(), key);
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
               else{
                   System.out.printf("%s 重復Messages: %s %n", Thread.currentThread().getName(), key);
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
            }

        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

對於並發的消費監聽器,你可以返回 RECONSUME_LATER 來通知消費者現在不能消費這條消息,並且希望可以稍后重新消費它。然后,你可以繼續消費其他消息。對於有序的消息監聽器,因為你關心它的順序,所以不能跳過消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。

結果沒有重復消費了

Redis redisKeyList.size():1
Redis 緩存插入:OD0000000001
ConsumeMessageThread_2 Receive Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_1 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_3 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_4 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_5 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_6 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_7 重復Messages: OD0000000001 
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_8 重復Messages: OD0000000001 

結果是不斷地重復嘗試消費,該怎么處理?可以刪除重復的記錄。

消費端常見的冪等操作總結
  1. 業務操作之前進行狀態查詢 消費端開始執行業務操作時,通過冪等id首先進行業務狀態的查詢,如:修改訂單狀態環節,當訂單狀態為成功/失敗則不需要再進行處理。那么我們只需要在消費邏輯執行之前通過訂單號進行訂單狀態查詢,一旦獲取到確定的訂單狀態則對消息進行提交,通知broker消息狀態為:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。
  2. 業務操作前進行數據的檢索 邏輯和第一點相似,即消費之前進行數據的檢索,如果能夠通過業務唯一id查詢到對應的數據則不需要進行再后續的業務邏輯。如:下單環節中,在消費者執行異步下單之前首先通過訂單號查詢訂單是否已經存在,這里可以查庫也可以查緩存。如果存在則直接返回消費成功,否則進行下單操作。
  3. 唯一性約束保證最后一道防線 上述第二點操作並不能保證一定不出現重復的數據,如:並發插入的場景下,如果沒有樂觀鎖、分布式鎖作為保證的前提下,很有可能出現數據的重復插入操作,因此我們務必要對冪等id添加唯一性索引,這樣就能夠保證在並發場景下也能保證數據的唯一性。
  4. 引入鎖機制 上述的第一點中,如果是並發更新的情況,沒有使用悲觀鎖、樂觀鎖、分布式鎖等機制的前提下,進行更新,很可能會出現多次更新導致狀態的不准確。如:對訂單狀態的更新,業務要求訂單只能從初始化->處理中,處理中->成功,處理中->失敗,不允許跨狀態更新。如果沒有鎖機制,很可能會將初始化的訂單更新為成功,成功訂單更新為失敗等異常的情況。 高並發下,建議通過狀態機的方式定義好業務狀態的變遷,通過樂觀鎖、分布式鎖機制保證多次更新的結果是確定的,悲觀鎖在並發環境不利於業務吞吐量的提高因此不建議使用。
  5. 消息記錄表 這種方案和業務層做的冪等操作類似,由於我們的消息id是唯一的,可以借助該id進行消息的去重操作,間接實現消費的冪等。

首先准備一個消息記錄表,在消費成功的同時插入一條已經處理成功的消息id記錄到該表中,注意一定要 與業務操作處於同一個事物 中,當新的消息到達的時候,根據新消息的id在該表中查詢是否已經存在該id,如果存在則表明消息已經被消費過,那么丟棄該消息不再進行業務操作即可。

肯定還有更多的場景我沒有涉及到,這里說到的操作均是互相之間有關聯的,將他們配合使用更能夠保證消費業務的冪等性。

不論怎樣,請牢記:緩存是不可靠的,在享受異步化、削峰、消息堆積等的好處之外,增加了業務復雜性,需要謹慎處理冪等操作 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM