RocketMQ生產消費模型選擇


一. 生產者,根據某個標識將消息放到同一個隊列中

在發送消息時,使用SelectMessageQueueByHash,該類根據傳入進去的arg,進行hash計算,將消息分配到相應的隊列中。

public class Producer {

    public static void main(String[] args) throws MQClientException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("10.130.41.36:9876");
        producer.setInstanceName("Producer");
        producer.setVipChannelEnabled(false);
        producer.start();

        String[] tags = {"tagA","tagB"};

        for (int i = 1; i <= 10; i++) {
            try {
                Message msg = new Message("TopicTest",tags[i%tags.length],"key1"+i,("訂單一號" + i).getBytes());
                SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),1);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        for (int i = 1; i <= 10; i++) {
            try {
                Message msg = new Message("TopicTest",tags[i%tags.length],"key2"+i,("訂單二號" + i).getBytes());
                SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),2);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        for (int i = 1; i <= 10; i++) {
            try {
                Message msg = new Message("TopicTest",tags[i%tags.length],"key3"+i,("訂單三號" + i).getBytes());
                SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),3);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

  上述代碼執行后Topic隊列中的內容:

 

二. 消費者

(1). 順序消費

使用MessageListenerOrderly,順序消費同一個隊列中的數據,只有第一個數據消費成功了才會消費第二個數據。

模擬在消費某個隊列中的數據時出現了阻塞狀態。

public class ConsumerOrderly {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer1");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                context.setAutoCommit(true);
                try {
                    for (MessageExt msg:msgs) {
                        String msgKey = msg.getKeys();
                        if(msgKey.equals("key13") || msgKey.equals("key22")){
                            Thread.sleep(1000);
                        }
                        System.out.println(" 消費者1 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                //消費成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        /**
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可
         */
        consumer.start();

        System.out.println("C1 Started.");
    }
}

  測試結果如下:

當"訂單一號3"沒有消費時 "訂單一號4","訂單一號5"是不能被消費的,"訂單二號2"也是同樣的情況。

(2). 並發消費

使用MessageListenerConcurrently,並發消費同一個隊列中的數據,不能保證消費的順序。

模擬在消費某個數據時出現了阻塞狀態。

public class ConsumerConcurrently {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer1");
        consumer.setMessageModel(MessageModel.CLUSTERING);

        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg:msgs) {
                        String msgKey = msg.getKeys();
                        if(msgKey.equals("key13") || msgKey.equals("key22")){
                            Thread.sleep(1000);
                        }
                        System.out.println(" 消費者1 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                //消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("C1 Started.");
    }
}

  測試結果如下

當消費"訂單一號3"阻塞時,會將后面的數據交給其他線程消費,所以"訂單一號4" ,"訂單一號5"在 "訂單一號3"之前消費了。

(3). 集群消費

不同消費者設置成相同的組名,在MessageModel.CLUSTERING模式下,不同消費者會消費不同的隊列,同一個消費者中保證順序

消費者1

public class ConsumerOrderly_1 {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer1");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                context.setAutoCommit(true);
                try {
                    for (MessageExt msg:msgs) {
                        String msgKey = msg.getKeys();
                        if(msgKey.equals("key13")){
                            Thread.sleep(1000);
                        }
                        System.out.println(" 消費者1 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                //消費成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        /**
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可
         */
        consumer.start();

        System.out.println("C1 Started.");
    }
}

 

消費者2

public class ConsumerOrderly_2 {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer2");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                context.setAutoCommit(true);
                try {
                    for (MessageExt msg:msgs) {
                        String msgKey = msg.getKeys();
                        if(msgKey.equals("key22")){
                            Thread.sleep(1000);
                        }
                        System.out.println(" 消費者2 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                //消費成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        /**
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可
         */
        consumer.start();

        System.out.println("C2 Started.");
    }
}

測試結果如下:

消費者1負責隊列1,並保證隊列1中的所有消息是按照順序消費的

消費者2負責隊列2和隊列3,根據"訂單二號2"可以看出,他保證了隊列2和隊列3的順序消費。

(4). 消費者A和消費者B同組,消費者A消費tagA,消費者B消費tagB如圖

 在這種情況下,因為集群中訂閱消息不一致,導致消費出現問題,最后啟動的消費者才可以正常消費消息。

要解決這個問題,需要保證集群中的消費者擁有統一的訂閱消息,Topic和Tag要一致才可以。

參考:
https://www.jianshu.com/p/524ef06ce25a
https://mp.weixin.qq.com/s/HbIS0yEJsCPMYwwYDBIvMQ

(5). 消費者A和消費者B不同組,消費者A消費tagA,消費者B消費tagB

在消費者1中,能保證tagA1,tagA2順序的消費,消費者2中能保證tagB1,tagB2順序的消費。
但是不能保證tagA1和tagB1的消費順序。

測試代碼:

消費者1

public class ConsumerOrderly_1 {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer1");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "tagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                context.setAutoCommit(true);
                try {
                    for (MessageExt msg:msgs) {
                        System.out.println(" 消費者1 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                //消費成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        /**
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可
         */
        consumer.start();

        System.out.println("C1 Started.");
    }
}

 

消費者2

public class ConsumerOrderly_2 {
    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName1");
        consumer.setNamesrvAddr("10.130.41.36:9876");
        consumer.setInstanceName("Consumer2");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "tagB");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                context.setAutoCommit(true);
                try {
                    for (MessageExt msg:msgs) {
                        String msgKey = msg.getKeys();
                        if(msgKey.equals("key11")){
                            Thread.sleep(1000);
                        }
                        System.out.println(" 消費者2 ==> 當前線程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                //消費成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        /**
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可
         */
        consumer.start();

        System.out.println("C2 Started.");
    }
}

 

測試結果:

消費者1

消費者2

"訂單一號2" 在 "訂單一號1" 前被消費了。




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM