在前面學習ActiveMQ時,看到ActiveMQ可以是隊列消息模式,也可以是訂閱發布模式。
同樣,在RocketMQ中,也存在兩種消息模式,即是集群消費模式和廣播消費模式。
1. 集群消費模式
跟AciiveMQ一樣,當存在多個消費者時,消息通過一定負載均衡策略,將消息分發到多個consumer中。
如圖:
在RockeMQ中,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/……的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!
消息分發也是有多個策略可以配置的,配置方式如下:
可以使用setAllocateMessageQueueStrategy()方法傳入如下參數實現不同的負載均衡策略,默認AllocateMessageQueueAveragely,輪詢算法策略。
RocketMQ默認的消息模式就是集群模式。
開啟兩個不同的consumer,控制台打印結果如下:
consumer1
收到來自topic: MyTopic,的消息:2 收到來自topic: MyTopic,的消息:3 收到來自topic: MyTopic,的消息:4 收到來自topic: MyTopic,的消息:5
consumer2:
收到來自topic: MyTopic,的消息:0 收到來自topic: MyTopic,的消息:1 收到來自topic: MyTopic,的消息:6 收到來自topic: MyTopic,的消息:7 收到來自topic: MyTopic,的消息:8 收到來自topic: MyTopic,的消息:9
可以看出消息是被分發給兩個消費者的,可以通過consumer.setMessageModel(MessageModel.CLUSTERING);設置集群消費策略。
2. 廣播模式
廣播模式跟ActiveMQ的發布訂閱一樣,即是將所有消息分發給Consume Group中每個消費者消費。
代碼實現如下:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup"); consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("MyTopic", "*"); //設置消費模式為廣播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt ext = msgs.get(0); String topic = ext.getTopic(); String body = new String(ext.getBody(),"utf-8"); System.out.println("收到來自topic: " + topic + ",的消息:" + body); } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
啟動兩個消費者,每個消費者都能接收到所有消息,控制台打印如下:
收到來自topic: MyTopic,的消息:0 收到來自topic: MyTopic,的消息:1 收到來自topic: MyTopic,的消息:2 收到來自topic: MyTopic,的消息:3 收到來自topic: MyTopic,的消息:4 收到來自topic: MyTopic,的消息:5 收到來自topic: MyTopic,的消息:6 收到來自topic: MyTopic,的消息:7 收到來自topic: MyTopic,的消息:8 收到來自topic: MyTopic,的消息:9