RocketMQ順序消息發送和消費實例


1.消息順序發送,具體說明在注釋中,參考官網

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @description 消息順序發送示例
 */
public class SortSendMsg {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("order_group");
        producer.setNamesrvAddr("localhost:8080");
        producer.start();
        String[] tags = {"TagA", "TagB", "TagC"};
        //創建訂單列表
        List<OrderStep> orders = new SortSendMsg().buildOrders();
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            //為消息體加個時間戳
            String body = dateStr+"hello RocketMQ"+orders.get(i);
            Message message = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    //根據訂單id選擇對應的queue
                    Long id = (Long)o;//參數o 就是send方法第三個參數傳入的值,我們傳入了orderid,所以代表訂單編號
                    long index = id % list.size();//根據訂單編號進行取模,相同編號的模值是一樣的
                    return list.get((int)index);//獲取對應的隊列並返回進行發送消息
                }
            },orders.get(i).getOrderId());
        }
        //消息發送完了,關閉producer
        producer.shutdown();
    }
    /**
     * 訂單的步驟
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

2.消息順序消費

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @description 消息順序消費
 */
public class SortConsumerMsg {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_Group");
        consumer.setNamesrvAddr("localhost:8080");
        //如果第一次啟動從隊列頭開始拉取消息,如果非第一次啟動那么就按照上次消息的offset繼續拉取消息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest","TagA||TagB||TagC");//從哪拉取消息
        consumer.registerMessageListener(new MessageListenerOrderly() {//這個代表單線程處理有序的消息
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                consumeOrderlyContext.setAutoCommit(true);//自動提交事務
                try {
                    for (MessageExt messageExt : list) {
                        //對有序的消息進行業務處理
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }catch (Exception e){
                    //如果消息處理有問題.返回一個狀態,告訴mq暫停一下,稍后在繼續拉消息
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        consumer.start();//啟動消費者
    }
}

3.使用順序消息的情況

比如訂單系統更新數據庫的操作,如果有另外一個系統比如大數據系統需要同步更新訂單數據到自己的數據庫中,

訂單系統有一筆交易,涉及到用戶插入和更新,insert into order(money,id) values('0','1');update order set money='100' where id='1';

訂單系統推送了這個操作記錄到了兩個messagequeue中,這個時候,兩個消費者可能有一個先處理了更新的操作,結果數據不存在,更新失敗,然后另外一個執行了插入操作,這個時候,兩個數據庫的內容就會出現不一致的情況,對於涉及到金錢的操作,這樣肯定是不行的,所以在發送端,producer,我們根據訂單id%messagequeue的總數取模,這樣相同的id操作就會發送到相同的隊列中,實現了順序發送的目的,依賴的就是send方法中的

MessageQueueSelector()方法;

那么你僅僅是順序發送還不行,對於消息的消費,你還必須保障順序消費,還不能開啟多線程,多線程消費,也會導致消息亂序了;
這個就依賴了消費者中的
MessageListenerOrderly()方法;
這個保障了消息消費只有一個線程進行處理.

4.對於消息重復消費
可能是重復發送,也可能是消費者重復消費,
可以在發送端進行消息冪等,引入redis(不能絕對保證冪等)或者發消息之前先去mq查詢是否存在
可以在消費端進行業務判斷,查詢是否給客戶發送過消息

這里推薦消費端進行業務判斷實現消息冪等;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM