1、先導入jar包,我使用的是maven
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.4.9</version> <type>pom</type> </dependency> <!-- rocker-mq核心:start --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency>
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; /** * 消息訂閱 * * @author admin * */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); /** * 1、負責查找NameServer,多個NameServer地址用分號隔開 */ consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("QuickStartConsumer"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicModel","TagA"); /** * 訂閱指定topic下所有消息<br> * 注意:一個consumer對象可以訂閱多個topic */ consumer.subscribe("TopicMode2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicModel")) { // 執行TopicModel的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println("執行消費邏輯:"+new String(msg.getBody())); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("\nConsumer Started.========>運行...\n"); } }
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * 消息產生者Producer * * @author admin * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例<br> * 注意:ProducerGroupName需要由應用來保證唯一<br> * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵, * 因為服務器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer"); /** * 1、負責查找NameServer,多個NameServer地址用分號隔開 */ producer.setNamesrvAddr("127.0.0.1:9876"); /** * 2、客戶端實例名稱(這個實例包含網絡連接、線程資源等) */ producer.setInstanceName("QuickStartProducer"); /** * 3、Producer對象在使用之前必須要調用start初始化,初始化一次即可<br> * 注意:切記不可以在每次發送消息時,都調用start方法 */ producer.start(); /** * 這里模擬了 10 次消息發送,一個Producer對象可以發送多個topic,多個tag的消息。 * //發生消息時必須聲明區分Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());: */ for (int I = 1; I <=1; I++) { try { { Message msg = new Message("TopicModel", // topic:必填,線下環境不需要申請,線上環境需要申請后才能使用 "TagA", // tag:設置的標簽,方便服務器過濾使用。目前只支持每個消息設置一個tag "OrderID001",// key :保證key唯一,訂單號,商品Id等,代表這條消息的業務關鍵詞,可以在Console系統根據Topic、Keys來查詢消息 ("產品:內容......" + I).getBytes()// body:序列化應用,Producer與Consumer要協商好序列化形式。 ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicModel", "TagB", "OrderID002", ("特定信息:......" + I).getBytes() ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicMode2", "TagA", "OrderID002", ("訂單:......" + I).getBytes() ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /** * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從MetaQ服務器上注銷自己 * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法 */ // producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { producer.shutdown(); System.out.println("\nProducer========>運行...\n"); } })); System.exit(0); } }
⦁ 消息產生者Producer /** * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例 * 注意:ProducerGroupName需要由應用來保證唯一 * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵, * 因為服務器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer"); /** * 1、負責查找NameServer,多個NameServer地址用分號隔開 */ producer. setNamesrvAddr ("127.0.0.1:9876"); /** * 2、客戶端實例名稱(這個實例包含網絡連接、線程資源等) */ producer. setInstanceName("QuickStartProducer"); /** * 3、Producer對象在使用之前必須要調用start初始化,初始化一次即可 * 注意:切記不可以在每次發送消息時,都調用start方法 */ producer. start ();