第一步:導入依賴
<!-- RocketMQ client library and the RocketMQ aggregate POM (both 3.0.10) -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.0.10</version>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.0.10</version>
    <type>pom</type>
</dependency>

<!-- Logback logging (classic + core, both 1.1.1) -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.1</version>
</dependency>

<!-- JUnit, available on the test classpath only -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
    <scope>test</scope>
</dependency>
第二步:創建生產者
package com.wish;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * Demo producer: sends ten messages to the topic {@code itmayiedu-topic} with tag
 * {@code TagA}, one per second, and prints each send result to standard output.
 */
public class Producer {

    /**
     * Starts the producer, sends the messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        // Create a message producer.
        // producerGroup: producers that send the same kind of message are generally
        // placed in one group; the application must set it and keep the name unique.
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        // Name server addresses, separated by ';'.
        producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
        // Instance name.
        producer.setInstanceName("producer");
        producer.start();

        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000); // send one message per second
                Message msg = new Message(
                        "itmayiedu-topic", // topic name
                        "TagA", // tag (placeholder value)
                        // Encode explicitly so the payload does not depend on the
                        // platform default charset.
                        ("itmayiedu-" + i).getBytes(StandardCharsets.UTF_8) // body
                );
                // send() delivers the message; SendResult describes the outcome.
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Best-effort demo: report the failure and stop sending.
            e.printStackTrace();
        } finally {
            // Always release the client, whatever ended the send loop.
            producer.shutdown();
        }
    }
}
第三步:創建消費者
package com.wish;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Demo push consumer: subscribes to the topic {@code itmayiedu-topic} with tag
 * {@code TagA} and prints the id and body of every message it receives.
 */
public class Consumer {

    /**
     * Configures the consumer, registers the message listener and starts it.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // Create a consumer.
        // consumerGroup: consumers that do the same job are placed in one group;
        // the application must set it and keep the name unique.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        // Name server addresses, separated by ';'.
        consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
        // Instance name.
        consumer.setInstanceName("consumer");
        // Subscribe to the topic, filtered by tag.
        consumer.subscribe("itmayiedu-topic", "TagA");
        // Register the message listener.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Decode explicitly as UTF-8 so the output does not depend on
                    // the platform default charset of the consuming host.
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println(msg.getMsgId() + "---" + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer.
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
第四步:先啟動消費者,再啟動生產者,然後查看兩者的控制台輸出(如已部署 RocketMQ 管理控制台,也可在瀏覽器中查看消息)