引入依賴pom:
<!-- TCP Java SDK ,生產者com.aliyun.openservices.ons.api.Producer,消費者,以及消息,消息內容Message message, ConsumeContext context等類 --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.0.Final</version> </dependency>
初始化生產者,和消費者,直接上代碼
package com.example.demo.rocketmq.init; import java.util.Date; import java.util.Properties; import java.util.UUID; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl; import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer; /** * 初始化生產者 * @author zel * @date 2019年3月31日 上午11:37:27 * */ @Component public class RocketmqProducerInit { private static final Logger logger = LoggerFactory.getLogger(RocketmqProducerInit.class); // @Value("${rocketmq.topic}") // private String topic; @Value("${rocketmq.groupid}") private String groupId; @Value("${rocketmq.accessKey}") private String accessKey; @Value("${rocketmq.secretKey}") private String secretKey; @Value("${rocketmq.namesrvAddr}") private String namesrvAddr; private static Producer producer; private static class SingletonHolder { private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit(); } private RocketmqProducerInit() { } public static final RocketmqProducerInit getInstance() { return SingletonHolder.INSTANCE; } @PostConstruct public void init() { // producer 實例配置初始化 Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, groupId); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.SecretKey, secretKey); // 設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, String.valueOf(1000*3));//超時時間 // 設置 TCP 接入域名,到控制台的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR,namesrvAddr); producer = ONSFactory.createProducer(properties); producer.start(); //獲取是否運行狀態 // if (producer instanceof ProducerImpl) { // System.out.println("它是DefaultMQProducerImpl實例"); // ProducerImpl producerImpl = (ProducerImpl) producer; // DefaultMQProducerImpl defaultMQProducerImpl = producerImpl.getDefaultMQProducer().getDefaultMQProducerImpl(); // logger.info("start前狀態字段值:" + defaultMQProducerImpl.getServiceState()); // producer.start(); // logger.info("start后狀態字段值:" + defaultMQProducerImpl.getServiceState()); // } } /** * @param type simple:無序消息;order:順序消息;Transaction:事務消息; * @return */ public Producer getProducer() { return producer; } }
package com.example.demo.rocketmq.init; import java.util.Properties; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.example.demo.controller.RocketmqEvaluateListener; import com.example.demo.controller.RocketmqSMSReminderListener; import com.example.demo.controller.RocketmqWWlogListener; /** * 初始化消費者 * @author zel * @date 2019年3月31日 上午11:38:16 * */ @Component //@Configuration //@ConditionalOnProperty(value = "rocketmqConsumerInit.enabled", havingValue = "true", matchIfMissing = true) public class RocketmqConsumerInit { private Logger logger = LoggerFactory.getLogger(RocketmqConsumerInit.class); private static Consumer consumer; @Value("${rocketmq.topic}") private String topic; @Value("${rocketmq.topicsms}") private String topicsms; @Value("${rocketmq.tag}") private String tag; @Value("${rocketmq.groupid}") private String groupId; // @Value("${rocketmq.topicevaluate}") // private String topicevaluate; @Value("${rocketmq.accessKey}") private String accessKey; @Value("${rocketmq.secretKey}") private String secretKey; @Value("${rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void consumerFactory(){//不同消費者 這里不能重名 Properties consumerProperties = new Properties(); // 您在控制台創建的 Group ID consumerProperties.put(PropertyKeyConst.GROUP_ID, groupId); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 consumerProperties.put(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 consumerProperties.put(PropertyKeyConst.SecretKey, secretKey); // 設置 TCP 接入域名,到控制台的實例基本信息中查看 consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR,namesrvAddr); // 集群訂閱方式 (默認) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 廣播訂閱方式 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); consumer = ONSFactory.createConsumer(consumerProperties); logger.info("topic:"+topic+",tag:"+tag); consumer.subscribe(topic, tag, new RocketmqWWlogListener()); // consumer.subscribe("testtopic", "TagA2BPublishTest", new RocketmqTest1Listener()); //訂閱另外一個 狀態發送變化發送短信Topic consumer.subscribe(topicsms, "*", new RocketmqSMSReminderListener()); //訂閱另外一個 自動評價 Topic //consumer.subscribe(topicevaluate, "*", new RocketmqEvaluateListener()); // consumer.subscribe(topicsms, tagSMS, new MessageListener() { //訂閱多個 Tag // public Action consume(Message message, ConsumeContext context) { // logger.info("短信消息隊列消費監聽Receive: " + new String(message.getBody())); // //調用發送短信接口 // return Action.CommitMessage; // } // }); //在發送消息前,必須調用start方法來啟動consumer,只需調用一次即可,當項目關閉時,自動shutdown consumer.start(); logger.info("ConsumerConfig start success."); } /** * 初始化消費者 * @return */ public Consumer getconsumer(){ return consumer; } }
生產者測試類:使用測試的topic【testtopic】測試
- 設置基本屬性【GroupId、AccessKey 、SecretKey、TCP 接入域名等】
- 創建生產者
- 並.Start(),
- 發送消息.send(msg【topic、tag、Message Body任何二進制形式的數據】)
- 根據Message類型(不同topic、tag)發送不同消息給不同訂閱者

package com.example.demo.rocketmq; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Date; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_testgroup"); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.AccessKey,"LTAId2oS8TzH82lH"); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.SecretKey, "abHRyVHZYHmeBCEmM6uXQaQXbbWHtk"); //設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設置 TCP 接入域名,到控制台的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1356501_BaSQ7VE8.mq-internet-access.mq-internet.aliyuncs.com:80"); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可 producer.start(); //循環發送消息 for (int i = 0; i < 100; i++){ Message msg = new Message( // // Message 所屬的 Topic "testtopic", // Message Tag 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾 "TagA", // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預, // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制台查詢消息並補發 // 注意:不設置也不會影響消息正常收發 msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // 同步發送消息,只要不拋異常就是成功 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在應用退出前,銷毀 Producer 對象 // 注意:如果不銷毀也沒有問題 producer.shutdown(); } }
消費者測試類:
- 設置基本屬性【GroupId、AccessKey 、SecretKey、TCP 接入域名、訂閱方式等】,
- 創建消費者,
- 並訂閱subscribe(topic, tag, MessageListener接口實現類實例【內部實現consume方法】)
- 訂閱多個不同Message(不同topic、tag),不同consume方法。

package com.example.demo.rocketmq; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在控制台創建的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "GID_testgroup"); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.AccessKey, "LTAId2oS8TzH82lH"); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建 properties.put(PropertyKeyConst.SecretKey, "abHRyVHZYHmeBCEmM6uXQaQXbbWHtk"); // 設置 TCP 接入域名,到控制台的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1356501_BaSQ7VE8.mq-internet-access.mq-internet.aliyuncs.com:80"); // 集群訂閱方式 (默認) 分攤消費,即:生產20條消息,啟動兩個消費者,兩個消費者消費加起來一共消費20條(11+9條,10+10條) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 廣播訂閱方式,啟動兩個消費者,理論上是各自消費二十條數據,消費端的消費方式是多線程消費的 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("testtopic", "TagA||TagB", new MessageListener() { //訂閱多個 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); //訂閱另外一個 Topic(訂閱多個topic,同時要保證consumer里的多個消費者實例訂閱關系一致性) consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //訂閱全部 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
可根據需要發送不同類型消息到消息隊列