RocketMQ學習筆記(15)----RocketMQ的消息模式


在前面學習ActiveMQ時,看到ActiveMQ可以是隊列消息模式,也可以是訂閱發布模式。

  同樣,在RocketMQ中,也存在兩種消息模式,即是集群消費模式和廣播消費模式。

1. 集群消費模式

  跟AciiveMQ一樣,當存在多個消費者時,消息通過一定負載均衡策略,將消息分發到多個consumer中。

  如圖:

  

  在RockeMQ中,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/……的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!

  消息分發也是有多個策略可以配置的,配置方式如下:

  可以使用setAllocateMessageQueueStrategy()方法傳入如下參數實現不同的負載均衡策略,默認AllocateMessageQueueAveragely,輪詢算法策略。

  

  RocketMQ默認的消息模式就是集群模式。

  開啟兩個不同的consumer,控制台打印結果如下:

  consumer1

收到來自topic: MyTopic,的消息:2
收到來自topic: MyTopic,的消息:3
收到來自topic: MyTopic,的消息:4
收到來自topic: MyTopic,的消息:5

  consumer2:

收到來自topic: MyTopic,的消息:0
收到來自topic: MyTopic,的消息:1
收到來自topic: MyTopic,的消息:6
收到來自topic: MyTopic,的消息:7
收到來自topic: MyTopic,的消息:8
收到來自topic: MyTopic,的消息:9

  可以看出消息是被分發給兩個消費者的,可以通過consumer.setMessageModel(MessageModel.CLUSTERING);設置集群消費策略。

2. 廣播模式

  廣播模式跟ActiveMQ的發布訂閱一樣,即是將所有消息分發給Consume Group中每個消費者消費。

  代碼實現如下:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup");

        consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("MyTopic", "*");
        //設置消費模式為廣播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    MessageExt ext = msgs.get(0);
                    String topic = ext.getTopic();
                    String body = new String(ext.getBody(),"utf-8");
                    System.out.println("收到來自topic: " + topic + ",的消息:" + body);
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        consumer.start();
    }
}

  啟動兩個消費者,每個消費者都能接收到所有消息,控制台打印如下:

收到來自topic: MyTopic,的消息:0
收到來自topic: MyTopic,的消息:1
收到來自topic: MyTopic,的消息:2
收到來自topic: MyTopic,的消息:3
收到來自topic: MyTopic,的消息:4
收到來自topic: MyTopic,的消息:5
收到來自topic: MyTopic,的消息:6
收到來自topic: MyTopic,的消息:7
收到來自topic: MyTopic,的消息:8
收到來自topic: MyTopic,的消息:9

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM