springboot2 整合 rocketmq


  引入依賴pom:

<!-- TCP Java SDK ,生產者com.aliyun.openservices.ons.api.Producer,消費者,以及消息,消息內容Message message, ConsumeContext context等類 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.0.Final</version>
        </dependency>

初始化生產者,和消費者,直接上代碼

package com.example.demo.rocketmq.init;

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer;

/** 
 * 初始化生產者
* @author zel 
* @date 2019年3月31日 上午11:37:27 
*  
*/
@Component
public class RocketmqProducerInit {
    private static final Logger logger = LoggerFactory.getLogger(RocketmqProducerInit.class);
    
//    @Value("${rocketmq.topic}")
//    private String topic;
    
    @Value("${rocketmq.groupid}")
    private String groupId;

    @Value("${rocketmq.accessKey}")
    private String accessKey;

    @Value("${rocketmq.secretKey}")
    private String secretKey;

    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private static Producer producer;

    private static class SingletonHolder {
        private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit();
    }

    private RocketmqProducerInit() {
    }

    public static final RocketmqProducerInit getInstance() {
        return SingletonHolder.INSTANCE;
    }

    @PostConstruct
    public void init() {
        // producer 實例配置初始化
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        // 設置發送超時時間,單位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, String.valueOf(1000*3));//超時時間
        // 設置 TCP 接入域名,到控制台的實例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,namesrvAddr);
        producer = ONSFactory.createProducer(properties);
        producer.start();
        //獲取是否運行狀態
//        if (producer instanceof ProducerImpl) {
//            System.out.println("它是DefaultMQProducerImpl實例");
//            ProducerImpl producerImpl = (ProducerImpl) producer;
//            DefaultMQProducerImpl defaultMQProducerImpl = producerImpl.getDefaultMQProducer().getDefaultMQProducerImpl();
//            logger.info("start前狀態字段值:" + defaultMQProducerImpl.getServiceState());
//            producer.start();
//            logger.info("start后狀態字段值:" + defaultMQProducerImpl.getServiceState());
//        }
    }

    /**
     * @param type simple:無序消息;order:順序消息;Transaction:事務消息;
     * @return
     */
    public Producer getProducer() {
        return producer;
    }
    
}
package com.example.demo.rocketmq.init;

import java.util.Properties;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.example.demo.controller.RocketmqEvaluateListener;
import com.example.demo.controller.RocketmqSMSReminderListener;
import com.example.demo.controller.RocketmqWWlogListener;

/** 
 * 初始化消費者
* @author zel 
* @date 2019年3月31日 上午11:38:16 
*  
*/
@Component
//@Configuration
//@ConditionalOnProperty(value = "rocketmqConsumerInit.enabled", havingValue = "true", matchIfMissing = true)
public class RocketmqConsumerInit {
      private Logger logger = LoggerFactory.getLogger(RocketmqConsumerInit.class);
      
      private static Consumer consumer;
      
          @Value("${rocketmq.topic}")
        private String topic;
        @Value("${rocketmq.topicsms}")
        private String topicsms;
          @Value("${rocketmq.tag}")
        private String tag;
        @Value("${rocketmq.groupid}")
        private String groupId;
//        @Value("${rocketmq.topicevaluate}")
//        private String topicevaluate;
        
        
        @Value("${rocketmq.accessKey}")
        private String accessKey;
    
        @Value("${rocketmq.secretKey}")
        private String secretKey;
    
        @Value("${rocketmq.namesrvAddr}")
        private String namesrvAddr;

        @PostConstruct
      public void consumerFactory(){//不同消費者 這里不能重名
        Properties consumerProperties = new Properties();
     // 您在控制台創建的 Group ID
        consumerProperties.put(PropertyKeyConst.GROUP_ID, groupId);
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        consumerProperties.put(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        consumerProperties.put(PropertyKeyConst.SecretKey, secretKey);
        // 設置 TCP 接入域名,到控制台的實例基本信息中查看
        consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR,namesrvAddr);
        // 集群訂閱方式 (默認)
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        consumer = ONSFactory.createConsumer(consumerProperties);
        logger.info("topic:"+topic+",tag:"+tag);
        consumer.subscribe(topic, tag, new RocketmqWWlogListener());
//        consumer.subscribe("testtopic", "TagA2BPublishTest", new RocketmqTest1Listener());
        //訂閱另外一個 狀態發送變化發送短信Topic
        consumer.subscribe(topicsms, "*", new RocketmqSMSReminderListener());
      //訂閱另外一個 自動評價 Topic
        //consumer.subscribe(topicevaluate, "*", new RocketmqEvaluateListener());
//        consumer.subscribe(topicsms, tagSMS, new MessageListener() { //訂閱多個 Tag
//            public Action consume(Message message, ConsumeContext context) {
//                logger.info("短信消息隊列消費監聽Receive: " + new String(message.getBody()));
//                //調用發送短信接口
//                return Action.CommitMessage;
//            }
//        });
        //在發送消息前,必須調用start方法來啟動consumer,只需調用一次即可,當項目關閉時,自動shutdown
        consumer.start();
        logger.info("ConsumerConfig start success.");
      }
        /**
         * 初始化消費者
         * @return
         */
        public Consumer getconsumer(){
            return consumer;
        }
}

生產者測試類:使用測試的topic【testtopic】測試

  1. 設置基本屬性【GroupId、AccessKey 、SecretKey、TCP 接入域名等】
  2. 創建生產者
  3. 並.Start(),
  4. 發送消息.send(msg【topic、tag、Message Body任何二進制形式的數據】)
  5. 根據Message類型(不同topic、tag)發送不同消息給不同訂閱者
package com.example.demo.rocketmq;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Date;
import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_testgroup");
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.AccessKey,"LTAId2oS8TzH82lH");
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.SecretKey, "abHRyVHZYHmeBCEmM6uXQaQXbbWHtk");
        //設置發送超時時間,單位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設置 TCP 接入域名,到控制台的實例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "http://MQ_INST_1356501_BaSQ7VE8.mq-internet-access.mq-internet.aliyuncs.com:80");

        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
        producer.start();

        //循環發送消息
        for (int i = 0; i < 100; i++){
            Message msg = new Message( //
                // Message 所屬的 Topic
                "testtopic",
                // Message Tag 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
                "TagA",
                // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
                // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                "Hello MQ".getBytes());
            // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
            // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制台查詢消息並補發
            // 注意:不設置也不會影響消息正常收發
            msg.setKey("ORDERID_" + i);

            try {
                SendResult sendResult = producer.send(msg);
                // 同步發送消息,只要不拋異常就是成功
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        // 在應用退出前,銷毀 Producer 對象
        // 注意:如果不銷毀也沒有問題
        producer.shutdown();
    }
}
View Code

消費者測試類:

  1. 設置基本屬性【GroupId、AccessKey 、SecretKey、TCP 接入域名、訂閱方式等】,
  2. 創建消費者,
  3. 並訂閱subscribe(topic, tag, MessageListener接口實現類實例【內部實現consume方法】)
  4. 訂閱多個不同Message(不同topic、tag),不同consume方法。
package com.example.demo.rocketmq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台創建的 Group ID
        properties.put(PropertyKeyConst.GROUP_ID, "GID_testgroup");
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.AccessKey, "LTAId2oS8TzH82lH");
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        properties.put(PropertyKeyConst.SecretKey, "abHRyVHZYHmeBCEmM6uXQaQXbbWHtk");
        // 設置 TCP 接入域名,到控制台的實例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
            "http://MQ_INST_1356501_BaSQ7VE8.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集群訂閱方式 (默認) 分攤消費,即:生產20條消息,啟動兩個消費者,兩個消費者消費加起來一共消費20條(11+9條,10+10條)
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式,啟動兩個消費者,理論上是各自消費二十條數據,消費端的消費方式是多線程消費的
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("testtopic", "TagA||TagB", new MessageListener() { //訂閱多個 Tag
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        //訂閱另外一個 Topic(訂閱多個topic,同時要保證consumer里的多個消費者實例訂閱關系一致性)
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //訂閱全部 Tag
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}
View Code

 可根據需要發送不同類型消息到消息隊列

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM