Spring Boot 集成 RocketMQ


1、導入依賴

        <!-- RocketMQ Spring Boot starter; note that this is pinned to a SNAPSHOT version -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>spring-boot-starter-rocketmq</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <!-- RocketMQ client library, version 4.4.0 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2、配置生產者

package com.example.demo.mq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * Spring-managed holder for a single RocketMQ {@link DefaultMQProducer}.
 *
 * <p>The producer is configured and started in the constructor, so an injected
 * instance is immediately usable via {@link #getProducer()}.
 */
@Component
public class MyProducer {
    /**
     * Producer group; a producer must belong to a producer group.
     */
    private final String producerGroup = "newTest_group";

    /**
     * Name server address in {@code host:port} form.
     */
    private final String nameSrv = "****:9876";

    /**
     * The wrapped RocketMQ producer.
     */
    private final DefaultMQProducer producer;

    /**
     * Set once the wrapped producer has been started successfully, so that
     * later {@link #start()} calls become no-ops.
     */
    private boolean started;

    /**
     * Creates the producer for {@code producerGroup}, points it at the name
     * server and starts it.
     *
     * @throws IllegalStateException if the producer cannot be started
     */
    public MyProducer() {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameSrv);
        // Call the private helper rather than the public start(): a constructor
        // must not invoke a method that a subclass could override.
        startProducer();
    }

    /**
     * Returns the wrapped producer.
     *
     * @return the producer started by this bean
     */
    public DefaultMQProducer getProducer() {
        return producer;
    }

    /**
     * Starts the producer if it has not been started yet.
     *
     * <p>The producer must be started once before use and may only be
     * initialised once; because the constructor already starts it, repeated
     * calls are ignored instead of attempting a second start.
     *
     * @throws IllegalStateException if the producer cannot be started
     */
    public void start() {
        startProducer();
    }

    /**
     * Shuts the producer down. Typically called from an application-context
     * listener when the application stops.
     */
    public void shutdown() {
        producer.shutdown();
    }

    /**
     * Starts the wrapped producer at most once and surfaces start-up failures
     * to the caller instead of only printing them.
     */
    private void startProducer() {
        if (started) {
            return;
        }
        try {
            producer.start();
            started = true;
        } catch (MQClientException e) {
            // Fail fast: a producer that did not start cannot send anything.
            throw new IllegalStateException(
                    "Failed to start RocketMQ producer (group=" + producerGroup
                            + ", namesrv=" + nameSrv + ")", e);
        }
    }
}

3、發送消息Controller

package com.example.demo.controller;

import com.example.demo.mq.MyProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

/**
 * REST endpoint that publishes a test message to RocketMQ through
 * {@link MyProducer}.
 */
@RestController
public class MQController {
    @Autowired
    private MyProducer producer;

    /**
     * Topic the test message is published to.
     */
    private static final String TOPIC = "my_topic_test01";

    /**
     * Sends {@code "hello RocketMQ" + text} to {@code TOPIC} with tag
     * {@code "lcl"} and prints the send result to standard output.
     *
     * @param text optional request parameter appended to the message body;
     *             treated as empty when absent
     * @return an empty map
     * @throws InterruptedException if the sending thread is interrupted
     * @throws RemotingException    propagated from the producer's {@code send}
     * @throws MQClientException    propagated from the producer's {@code send}
     * @throws MQBrokerException    propagated from the producer's {@code send}
     */
    @GetMapping(value = "/mq/test1")
    public Object test1(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // Without this guard a missing parameter would be sent as the literal "null".
        String suffix = (text == null) ? "" : text;

        // Encode explicitly as UTF-8 instead of relying on the platform default
        // charset. The fully-qualified name avoids needing an extra import.
        byte[] body = ("hello RocketMQ" + suffix).getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // Message constructor arguments: topic, tag (sub-category), body bytes.
        Message message = new Message(TOPIC, "lcl", body);

        SendResult result = producer.getProducer().send(message);

        System.out.println(result);

        return new HashMap<>();
    }
}

4、配置消費者

package com.example2.demo2.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

/**
 * Spring-managed RocketMQ push consumer that subscribes to {@code TOPIC} and
 * prints every received message to standard output.
 */
@Component
public class MyConsumer {
    /**
     * Consumer group passed to the {@link DefaultMQPushConsumer} constructor.
     */
    private final String consumerGroup = "newTest_group";

    /**
     * Name server address in {@code host:port} form.
     */
    private final String nameSrv = "118.190.215.76:9876";

    /**
     * Topic this consumer subscribes to.
     */
    private static final String TOPIC = "my_topic_test01";

    private final DefaultMQPushConsumer consumer;

    /**
     * Configures the consumer (name server, start offset, subscription to all
     * tags of {@code TOPIC}), registers the message listener and starts it.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public MyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameSrv);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                // Handle every message in the batch, not only the first one, so
                // nothing is acknowledged without having been processed.
                for (Message message : msgs) {
                    // Decode explicitly as UTF-8 rather than with the platform
                    // default charset. Fully-qualified to avoid an extra import.
                    String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    // printf so the placeholders are actually substituted.
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), body);
                    String topic = message.getTopic();
                    String tags = message.getTags();
                    String keys = message.getKeys();
                    System.out.println("topic=" + topic + ",tags=" + tags + ",keys=" + keys + ",body=" + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                // Any failure asks the broker to redeliver the batch later.
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
    }

    /**
     * Shuts the consumer down. Nothing in this class calls it automatically;
     * invoke it from an application-context listener when the application stops.
     */
    public void shutdown() {
        consumer.shutdown();
    }
}

5、測試

 

 

 

 

注意:發送消息之前必須先創建 topic。可以通過控制台創建、通過 Linux 命令創建,也可以由程序自動創建;無論採用哪種方式,只要 topic 不存在,發送消息時就會報錯。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM