RocketMq順序消費


部分內容出處   https://www.jianshu.com/p/453c6e7ff81c

rocketmq內部有4個默認的隊里,在發送消息時,同一組的消息需要按照順序,發送到相應的mq中,同一組消息按照順序進行消費,不同組的消息可以並行的進行消費。

下面看一下producer的代碼:

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 9:20.
 * @Description :
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
        String groupName = "order_producer";
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        producer.start();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateStr = sdf.format(new Date());
        try {
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 0); //0是隊列的下標
                System.out.println(sendResult);
            }
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 1); //1是隊列的下標
                System.out.println(sendResult);
            }
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 2); //2是隊列的下標
                System.out.println(sendResult);
            }
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 3); //3是隊列的下標
                System.out.println(sendResult);
            }
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 4); //4是隊列的下標
                System.out.println(sendResult);
            }
            for (int i = 1; i <= 3; i++) {
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 5); //5是隊列的下標
                System.out.println(sendResult);
            }
        } catch (RemotingException e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

這邊發送多組消息,每組消息的順序分別為1,2,3,

下面查看consumer1,和consumer2,因為要順序消費,需要注意的是,這兩個消費者的監聽器是MessageListenerOrderly,兩個的代碼一樣,我這邊就只展示一個consumer的代碼

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-23.
 * @Time : 9:35.
 * @Description : 順序消息消費
 */
public class Consumer1 {

    public Consumer1() throws Exception {
        String groupName = "order_producer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        /**
         * 設置Consumer第一次啟動是從隊列頭開始消費還是隊列尾開始消費
         * 非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱的主題,以及過濾的標簽內容
        consumer.subscribe("Topic1", "*");
        //注冊監聽
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer1 Started.");
    }

    class Listener implements MessageListenerOrderly {

        private Random random = new Random();

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
            // 設置自動提交
            context.setAutoCommit(true);
            for (MessageExt msg : list) {
                System.out.println(msg + ",context" + new String(msg.getBody()));
            }
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    public static void main(String[] args) throws Exception {
        new Consumer1();
    }


}

還是按照先啟動consumer的順序,在啟動producer的順序。

這邊看一下控制台的信息

總共6組消息,broker-a上接收到4組消息,broker-b上接收到2組消息,同一組的3條消息會發送到同一個broker的同一個隊列中,這樣才能保證順序消費,

下面看一下consumer1和consumer2的打印結果

由於順序消費只能單線程,一個線程只能去一個隊列獲取數據。

可以看到,這邊的queueid都是3個 3個打印,不會出現交替,下面看一下一組消息的消費順序,

 

可以看到,消息是按照發送的順序,進行消費,consumer2的打印結果也是類似的,不過consumer2消費了6條消息。

 這樣就實現了rocket的順序消費,雖然實現了順序消費,由於網絡通信,會存在着重復數據的問題,

重復數據的問題,rocket不提供解決方案,讓業務方自行解決,主要有兩個方法:

  1. 消費端處理消息的業務邏輯保持冪等性
  2. 保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現

第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。

第1條解決方案,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條可以消息系統實現,也可以業務端實現。正常情況下出現重復消息的概率其實很小,如果由消息系統來實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。

RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。

下面把consumer修改成多線程的模式,再次查看一下運行的結果:

package com.alibaba.rocketmq.example.message.thread;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-23.
 * @Time : 9:35.
 * @Description : 順序消息消費
 */
public class Consumer {

    public Consumer() throws Exception {
        String groupName = "order_producer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        /**
         * 設置Consumer第一次啟動是從隊列頭開始消費還是隊列尾開始消費
         * 非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //最小線程數
        consumer.setConsumeThreadMin(10);

        //最大線程數
        consumer.setConsumeThreadMax(20);

        //訂閱的主題,以及過濾的標簽內容
        consumer.subscribe("Topic1", "*");
        //注冊監聽
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer Started.");
    }

    class Listener implements MessageListenerConcurrently {

        private Random random = new Random();

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {

            for (MessageExt msg : list) {
                System.out.println(msg + ",context" + new String(msg.getBody()));
            }
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public static void main(String[] args) throws Exception {
        new Consumer();
    }


}

 同樣先啟動consumer,在啟動producer,查看一下打印結果:

從打印結果,可以看出consumer消費並不能保證嚴格的順序,多線程和順序,只能保證其中的一個。producer能保證消息發送的順序,不能保證消息消費的順序,要保證消息消費的順序,consumer端必須實現 MessageListenerOrderly 接口。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM