RocketMQ順序消息


消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以並行消費的。RocketMQ可以嚴格的保證消息有序。

順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。

  • 全局順序 對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。 適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
  • 分區順序 對於指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。 適用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。

RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。要全局順序只能一個分區。

非順序的原因,是因為發送消息的時候,消息發送默認是會采用輪詢的方式發送到不通的queue(分區)。而消費端消費的時候,是會分配到多個queue的,多個queue是同時拉取提交消費。如果是輪詢的生產者,加上同時的多線程讀取,自然不能保證順序。

如果把一個分區順序投放在同一個分區里面,RocketMQ的確是能保證FIFO的。那么要做到順序消息,應該怎么實現呢——把消息確保投遞到同一條queue。

比如一個訂單,根據同一個id,把創建-扣款(買家)-付款(給商家)的三條消息,按順序推入一個隊列,那消費的時候也是順序執行的。

關鍵代碼,按訂單ID放入同一分區:

producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根據訂單id選擇發送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//訂單id

 

完整的生產者

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Producer,發送順序消息
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("192.168.10.11:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // 訂單列表
        List<OrderStep> orderList = new Producer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // 加個時間前綴
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根據訂單id選擇發送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//訂單id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /**
     * 訂單的步驟
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模擬訂單數據
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

結果把同一個id放在一個分區了

SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='創建'}
SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='創建'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='創建'}
SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'}
SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'}
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}

消費者

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)
 */
public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.10.11:9876");
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    //模擬業務邏輯處理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

消費也是分區的順序消費

consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='創建'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='創建'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'}
consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='創建'}
consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'}
consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}

 消費者將鎖定每個消息隊列,以確保他們被逐個消費,雖然這將會導致性能下降,但是當你關心消息順序的時候會很有用。我們不建議拋出異常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作為替代。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM