生產者:
/** * 生產者 */ public class Provider { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { //創建一個生產者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //設置NameServer地址 producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); //設置生產者實例名稱 producer.setInstanceName("provider"); //啟動生產者 producer.start(); //發送消息 for (int i = 1; i <=10 ; i++) { Thread.sleep(1000); //模擬網絡延遲 //創建消息 topic代表主題名稱 tags代表小分類 body代表消息體 Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes()); //發送消息 SendResult send = producer.send(message); System.out.println(send.toString()); } } }
消費者
/** * 消費者:監聽消費 */ public class Consumer { public static void main(String[] args) throws MQClientException { //創建消費者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //設置NameServer地址 consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); //設置實例名稱 consumer.setInstanceName("consumer"); //訂閱Topic consumer.subscribe("weksoft_topic","TagA"); //監聽消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //獲取消息 for(MessageExt ext:msgs){ //RocketMQ由於是集群環境,所以產生的消息ID可能會重復 System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody())); } //接受消息狀態 1.消費成功 2.消費失敗 隊列還有 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //啟動消費者 consumer.start(); } }
生產者生產消息
消費者消費消息
控制台多了入隊和出隊的記錄