RocketMQ-順序消費


  看了https://www.jianshu.com/p/453c6e7ff81c這篇博客,得出順序消費的結論。“要實現嚴格的順序消息,簡單且可行的辦法就是:保證生產者 - MQServer - 消費者是一對一的關系”。

我們下面通過幾個實例來學習RocketMQ的順序消費。

一、單節點,也就是一個Producer一個Consumer。

操作步驟:

  1、先啟動ConsumerQueue1

  2、再啟動ProducerQueue

Producer端:

package org.hope.lee.consumer.queue;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class ProducerQueue {
    public static void main(String[] args) {
        String group_name = "order_producer";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            producer.start();
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            /**
             * 第一個隊列
             * 通過  public SendResult send(Message msg, MessageQueueSelector selector, Object arg)來指定發送消息到哪個隊列
             */
            for(int i = 1; i <= 5; i++) {
                String body = dateStr + "body_1_" + i;
                Message message = new Message("TopicTest", "order1", "KEY" + i, body.getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 0); //0是隊列的下標
                System.out.println(sendResult + ", body:" + body);
            }

            producer.shutdown();

        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        }
    }
}

Consumer端:

package org.hope.lee.producer.queue;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ConsumerQueue1 {

    public ConsumerQueue1() throws Exception {
        String group_name = "order_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱的主題, 以及過濾的標簽內容
        consumer.subscribe("TopicTest", "*");
        //注冊監聽
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer Started.....");
    }

    /**
     * 這里實現MessageListenerOrderLy接口就是為了達到順序消費的目的,
     * 如果是使用MessageListenerConcurrently,則需要把線程池改為單線程模式。
     * 但是也不能保證說一定會順序消費,因為如果master宕機了,導致寫入隊列的數量上
     * 出現變化。
     *
     * 從消費端,如果想保證這批消息是M1消費完成再消費M2的話,可以使用MessageListenerOrderly接口,但是這樣的話會有以下問題:
     * 1. 遇到消息失敗的消息,無法跳過,當前隊列消費暫停
     * 2. 目前版本的RocketMQ的MessageListenerOrderly是不能從slave消費消息的。
     */
    class Listener implements MessageListenerOrderly {
        private Random random = new Random();

        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for(MessageExt msg : list) {
                System.out.println(msg + ", content:" + new String(msg.getBody()));
            }
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5)); //隨機休眠時間,模擬業務處理時間

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
    public static void main(String[] args) throws Exception {
        ConsumerQueue1 c = new ConsumerQueue1();

    }
}

Consuerm端輸出結果:(橫向拖到最后看藍色字體)

Consumer Started.....
MessageExt [queueId=0, storeSize=159, queueOffset=0, sysFlag=0, bornTimestamp=1515420522468, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551480, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F00000000000001EF, commitLogOffset=495, bodyCRC=829956747, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY1, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_1
MessageExt [queueId=0, storeSize=159, queueOffset=1, sysFlag=0, bornTimestamp=1515420522668, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551683, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000028E, commitLogOffset=654, bodyCRC=678523697, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY2, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_2
MessageExt [queueId=0, storeSize=159, queueOffset=2, sysFlag=0, bornTimestamp=1515420522781, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551716, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000032D, commitLogOffset=813, bodyCRC=1601586087, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY3, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_3
MessageExt [queueId=0, storeSize=159, queueOffset=3, sysFlag=0, bornTimestamp=1515420522792, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551753, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F00000000000003CC, commitLogOffset=972, bodyCRC=1091753476, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY4, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_4
MessageExt [queueId=0, storeSize=159, queueOffset=4, sysFlag=0, bornTimestamp=1515420522833, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551768, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000046B, commitLogOffset=1131, bodyCRC=907404946, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY5, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_5

一、Consumer端集群消費的順序消費,也就是一個Producer多個Consumer。

 步驟一、我們先改造一下ProducerQueue,在第一個隊列下面再加兩個隊列

/**
 * 第二個隊列
 */
for(int i = 1; i <= 5; i++) {
    //時間戳
    String body = dateStr + "order_2" + i;
    Message message = new Message("TopicTest", "body_2_", "KEY" + i, body.getBytes());
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            Integer id = (Integer) o;
            return list.get(id);
        }
    }, 1); //1是隊列的下標
    System.out.println(sendResult + ", body:" + body);
}
/**
 * 第三個隊列
 */
for(int i = 1; i <= 5; i++) {
    //時間戳
    String body = dateStr + "order_3" + i;
    Message message = new Message("TopicTest", "body_3_", "KEY" + i, body.getBytes());
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            Integer id = (Integer) o;
            return list.get(id);
        }
    }, 2); //2是隊列的下標
    System.out.println(sendResult + ", body:" + body);
}

步驟二、再創建一個消費端ConsumerQueue2,代碼跟ConsumerQueue1一樣,這里就不重復了。

步驟三、啟動ConsumerQueue1和ConsumerQueue3

步驟四、啟動ProducerQueue

結果:

 

從結果中看到,兩個消費端都是按照隊列順序消費的,並且負載均衡,ConsumerQueue1消費了第三個隊列,ConsumerQueue2消費了第一個隊列和第二個隊列。

需要注意的一點是,對於順序消費,我們是不能再Consumer端再使用多線程去消費的。這樣就破壞了順序消費的生態環境。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM