導入依賴
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.0.10</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.0.10</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
提供者
/**
* 生產者
*/
public class Provider {
public static void main(String[] args) throws MQClientException {
//創建一個生產者
DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
//設置NameServer地址
producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
//設置生產者實例名稱
producer.setInstanceName("producer");
//啟動生產者
producer.start();
try {
//發送消息
for (int i=1;i<=10;i++){
//模擬網絡延遲,每秒發送一次MQ
Thread.sleep(1000);
//創建消息,topic主題名稱 tags臨時值代表小分類, body代表消息體
Message message=new Message("itmayiedu-topic","TagA",("itmayiedu-"+i).getBytes());
//發送消息
SendResult sendResult=producer.send(message);
System.out.println("來了來了:"+sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消費者
public class Consumer {
public static void main(String[] args) throws MQClientException {
//創建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
//設置實例名稱
consumer.setInstanceName("consumer");
//訂閱topic
consumer.subscribe("itmayiedu-topic","TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//獲取消息
for (MessageExt messageExt:list){
//RocketMQ由於是集群環境,所有產生的消息ID可能會重復
System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
}
//接受消息狀態 1.消費成功 2.消費失敗 隊列還有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
System.out.println("consumer Started!");
}
}
控制台效果
提供者

消費者
