這邊使用一個producer和兩個consumer是實現負載均衡。
看一下代碼示例
package com.alibaba.rocketmq.example.message.model; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { String groupName = "message_producer"; DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); try { for (int i = 1; i <= 100; i++) { Message msg = new Message("Topic1", "Tag1", ("Hello RoctetMq : " + i).getBytes()); SendResult sendResult = producer.send(msg); //增加一個超時參數,單位為毫秒 // SendResult sendResult = producer.send(msg, 1000); System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
package com.alibaba.rocketmq.example.message.model; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 19:19. * @Description : */ public class Consumer1 { public Consumer1() { try { String groupName = "message_consumer"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3"); //廣播模式下需要先啟動consumer //consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags ); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) { Consumer1 c1 = new Consumer1(); System.out.println("consumer1 is start"); } }
package com.alibaba.rocketmq.example.message.model; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 19:19. * @Description : */ public class Consumer2 { public Consumer2() { try { String groupName = "message_consumer"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3"); //廣播模式下需要先啟動consumer //consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags ); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) { Consumer2 c1 = new Consumer2(); System.out.println("consumer2 is start"); } }
運行一下項目,先啟動兩個consumer,在啟動producer,
查看一下兩個consumer運行結果:
100條消息,推送到不同的consumer進行消費,無需搭建別的東西。
如果需要使用廣播模式,就把 consumer1和consumer2的廣播模式的注釋放開,下面發送10條消息,然后看一下打印的結果
兩個consumer各自收到10條消息,這種就是廣播模式。