Rocketmq 消費者默認是集群的方式消費的,消費者還可以用廣播的模式進行消費。廣播模式消費就是所有訂閱同一個主題的消費者都會收到消息。代碼實現上其實很簡單,就是在消費端添加
consumer.setMessageModel(MessageModel.BROADCASTING);
就可以了。我們看實驗步驟:
一、啟動ConsumerBroadCastMember1
二、啟動ConsumerBroadCastMember2
三、運行ProducerBraodCast
四、我們可以看到兩個Consumer都收到了同樣的消息。
Producer端:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class ProducerBroadCast { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 設置實例名稱 producer.setInstanceName("producer_broadcast"); // 設置重試次數 producer.setRetryTimesWhenSendFailed(3); // 開啟生產者 producer.start(); // 創建一條消息 Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes()); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
Consumer端:
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10); //設置廣播消費 consumer.setMessageModel(MessageModel.BROADCASTING); //設置集群消費 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱PushTopic下Tag為push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer1 Started."); } } class MqBroadCastListener implements MessageListenerConcurrently{ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt msg = msgs.get(0); String msgBody = new String(msg.getBody(), "utf-8"); System.out.println("msgBody:" + msgBody); } catch(Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10); //設置廣播消費 consumer.setMessageModel(MessageModel.BROADCASTING); //設置集群消費 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱PushTopic下Tag為push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer2 Started."); } }
結果:


https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api
