SpringBoot集成RocketMQ


實戰,用案例來說話

前面已經說了JMS和RocketMQ一些概念和安裝,下面使用SpringBoot來親身操作一下.

生產者的操作

  1. SpringBoot項目創建完成,引入依賴是第一步:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>    
    <version>4.3.0</version>
</dependency>
  1. 創建生產者是第二步,生產者必須依賴於生產組,而且需要指定nameServer
@Component
public class PayProducer {

    /**
     * 生產組,生產者必須在生產組內
     */
    private String producerGroup = "pay_group";

    /**
     * 端口
     */
    private String nameServer = "39.106.214.179:9876";


    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        // 指定nameServer地址,多個地址之間以 ; 隔開
        producer.setNamesrvAddr(nameServer);
        start();
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }

    /**
     * 對象在使用之前必須調用一次,並且只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown() {
        producer.shutdown();
    }

}
  1. 創建Controller進行測試發送消息,必須要指定topic,消息依賴於主題
@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    /**
     * topic,消息依賴於topic
     */
    private static final String topic = "pay_test_topic";


    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // 創建消息  主題   二級分類   消息內容好的字節數組
        Message message = new Message(topic, "taga", ("hello rocketMQ " + text).getBytes());

        SendResult send = payProducer.getProducer().send(message);

        System.out.println(send);

        return new HashMap<>();
    }

}
  1. 采坑記錄

    • 上面完成就可以啟動項目了,訪問之后報錯了:
    MQClientException: No route info of this topic, TopicTest1
    這個的原因就是Broker禁止自動創建Topic且用戶沒有通過手動方式創建Topic, 或者是broker與Nameserver網絡不通
    解決:
        使用手動創建Topic,在RocketMQ控制台的主題中創建就好,最主要的是指定topic name,如下圖
            出現創建不了的情況往下看
        
        如果還出現這個問題,請關閉防火牆
    

    • 這次說下上面可能創建不了的問題,前面說了安裝開放安全組,這次就是因為rocketMQ虛擬的端口問題,需要開放10909,也就是說ECS最終開放的端口號: 8080,10911,9876,10909

    • 繼續采坑

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    
    這個問題是阿里雲服務器存在多個網卡,rocketMQ會根據當前網卡選擇一個IP使用,我們需要制定一個IP:
        路徑是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq
        
        vim ./conf/broker.conf
        
        添加配置: brokerIP1=公網IP
        
        重新啟動: 
            nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
            tail -f nohup.out
    
    • 其他問題
        https://blog.csdn.net/qq_14853889/article/details/81053145
        https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
        https://www.jianshu.com/p/bfd6d849f156
        https://blog.csdn.net/wangmx1993328/article/details/81588217
    

消費者操作

  1. 在前一個項目的基礎上,將公共內容提取出來,創建一個JsmConfig的類,來聲明公共內容:

    public class JmsConfig {
    
        /**
         * 端口
         */
        public static final String NAME_SERVER = "39.106.214.179:9876";
    
        /**
         * topic,消息依賴於topic
         */
        public static final String TOPIC = "pay_test_topic";
    }
    
  2. 生產者內容變為

    @Component
    public class PayProducer {
    
        /**
         * 生產組,生產者必須在生產組內
         */
        private String producerGroup = "pay_producer_group";
        
        private DefaultMQProducer producer;
    
        public PayProducer() {
            producer = new DefaultMQProducer(producerGroup);
            // 指定nameServer地址,多個地址之間以 ; 隔開
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            start();
        }
    
        public DefaultMQProducer getProducer() {
            return producer;
        }
    
        /**
         * 對象在使用之前必須調用一次,並且只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在應用上下文,使用上下文監聽器,進行關閉
         */
        public void shutdown() {
            producer.shutdown();
        }
    
    }
    
  3. 創建消費者

    @Component
    public class PayConsumer {
    
        private DefaultMQPushConsumer consumer;
    
        private String consumerGroup = "pay_consumer_group";
    
        public PayConsumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            // 設置消費地點,從最后一個進行消費(其實就是消費策略)
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 訂閱主題的哪些標簽
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // 注冊監聽器
            consumer.registerMessageListener((MessageListenerConcurrently)
                    (msgs, context) -> {
                        try {
                            // 獲取Message
                            Message msg = msgs.get(0);
                            System.out.printf("%s Receive New Messages: %s %n",
                                    Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                            String topic = msg.getTopic();
                            String body = new String(msg.getBody(), "utf-8");
                            // 標簽
                            String tags = msg.getTags();
                            String keys = msg.getKeys();
                            System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    });
            consumer.start();
            System.out.println("Consumer Listener");
        }
    
    }
    
  4. Controller的變化:

    @RestController
    public class PayController {
    
        @Autowired
        private PayProducer payProducer;
    
    
    
        @RequestMapping("/api/v1/pay_cb")
        public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // 創建消息  主題   二級分類   消息內容好的字節數組
            Message message = new Message(JmsConfig.TOPIC, "taga", ("hello rocketMQ " + text).getBytes());
    
            SendResult send = payProducer.getProducer().send(message);
    
            System.out.println(send);
    
            return new HashMap<>();
        }
    
    }
    

梳理一下整個流程,生產者存在於生產組,所以生產組很重要,創建生產者需要指定生產組.消費者同理,創建消費者也需要指定消費組. 並且二者都需要指定NameServer. 有了生產者就要發送消息,也就是Message,創建Message需要指定Topic,二級分類和消息體等信息. 那消費者如何獲取呢? 無非就是綁定Topic和二級分類就可以,這就是整個流程. 中間少說了消息的存放,消息是在broker中,這個相當於倉庫,所以就是生產者生產消息到Broker,Consumer從Broker中獲取消息進行消費.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM