一、目錄展示

二、導入依賴
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.0.10</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.0.10</version> <type>pom</type> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency>
三、提供者
package com.zn.tests; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * 生產者 */ public class Provider { public static void main(String[] args) throws MQClientException { //創建一個生產者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //設置NameServer地址 producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //設置生產者實例名稱 producer.setInstanceName("producer"); //啟動生產者 producer.start(); try { //發送消息 for (int i=1;i<=10;i++){ //模擬網絡延遲,每秒發送一次MQ Thread.sleep(1000); //創建消息,topic主題名稱 tags臨時值代表小分類, body代表消息體 Message message=new Message("itmayiedu-topic","TagA",("itmayiedu-"+i).getBytes()); //發送消息 SendResult sendResult=producer.send(message); System.out.println("來了來了:"+sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
四、消費者
package com.zn.tests; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { //創建消費者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //設置NameServer地址 consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //設置實例名稱 consumer.setInstanceName("consumer"); //訂閱topic consumer.subscribe("itmayiedu-topic","TagA"); //監聽消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //獲取消息 for (MessageExt messageExt:list){ //RocketMQ由於是集群環境,所有產生的消息ID可能會重復 System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody())); } //接受消息狀態 1.消費成功 2.消費失敗 隊列還有 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //啟動消費者 consumer.start(); System.out.println("consumer Started!"); } }
五、控制台效果
提供者:

消費者:

六、RocketMQ控制台效果
測試前:

測試后:

