RocketMQ-廣播模式消費


Rocketmq 消費者默認是集群的方式消費的,消費者還可以用廣播的模式進行消費。廣播模式消費就是所有訂閱同一個主題的消費者都會收到消息。代碼實現上其實很簡單,就是在消費端添加

consumer.setMessageModel(MessageModel.BROADCASTING);

就可以了。我們看實驗步驟:

一、啟動ConsumerBroadCastMember1

二、啟動ConsumerBroadCastMember2

三、運行ProducerBraodCast

四、我們可以看到兩個Consumer都收到了同樣的消息。

Producer端:

package org.hope.lee.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class ProducerBroadCast {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            // 設置實例名稱
            producer.setInstanceName("producer_broadcast");
            // 設置重試次數
            producer.setRetryTimesWhenSendFailed(3);
            // 開啟生產者
            producer.start();
            // 創建一條消息
            Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes());
            SendResult send = producer.send(msg);
            System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
        producer.shutdown();
    }
}

Consumer端:

package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerBroadCastMember1 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        // 批量消費,每次拉取10條
        consumer.setConsumeMessageBatchMaxSize(10);
        //設置廣播消費
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //設置集群消費
//        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 如果非第一次啟動,那么按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱PushTopic下Tag為push的消息
        consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
        consumer.registerMessageListener(new MqBroadCastListener());
        consumer.start();
        System.out.println("Consumer1 Started.");

    }
}
class MqBroadCastListener implements MessageListenerConcurrently{
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            MessageExt msg = msgs.get(0);
            String msgBody = new String(msg.getBody(), "utf-8");
            System.out.println("msgBody:" + msgBody);
        } catch(Exception e) {
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    
}
package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerBroadCastMember2 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        // 批量消費,每次拉取10條
        consumer.setConsumeMessageBatchMaxSize(10);
        //設置廣播消費
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //設置集群消費
//        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 如果非第一次啟動,那么按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱PushTopic下Tag為push的消息
        consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
        consumer.registerMessageListener(new MqBroadCastListener());
        consumer.start();
        System.out.println("Consumer2 Started.");

    }
}

 

結果:

 

 

https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM