最簡單的SpringBoot集成RocketMQ(一)基礎


1.准備

首先要在本地安裝RocketMQ,安裝教程參考 RocketMQ安裝

2.創建SpringBoot項目集成RocketMQ

首先創建SpringBoot項目

1)pom引入依賴如下

<dependencies>
        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

2)application.properties配置文件如下

server.port=8088 rocketmq.producer.groupName=ProducerGroup rocketmq.producer.namesrvAddr=127.0.0.1:9876 rocketmq.producer.instanceName=ProducerGroup rocketmq.producer.topic=topic2020 rocketmq.producer.tag=test rocketmq.producer.maxMessageSize=131072 rocketmq.producer.sendMsgTimeout=10000 rocketmq.consumer.namesrvAddr=127.0.0.1:9876 rocketmq.consumer.groupName=ConsumerGroup rocketmq.consumer.topic=topic2020 rocketmq.consumer.tag=test rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64

3)創建MessageProcessor消息處理接口

package com.springboot.message; import org.apache.rocketmq.common.message.MessageExt; public interface MessageProcessor { boolean handle(MessageExt messageExt); }

4)實現MessageProcessorImpl消息處理類

package com.springboot.message.impl; import com.springboot.message.MessageProcessor; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Service; /** * Description:監聽消息處理類 */ @Service public class MessageProcessorImpl implements MessageProcessor { @Override public boolean handle(MessageExt messageExt) { // 收到的body(消息體),字節類型,需轉為String
        String result = new String(messageExt.getBody()); System.out.println("監聽到了消息,消息為:"+ result); return true; } }

5)創建MessageListen消息監聽類

package com.springboot.message; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * Description:監聽類 */ @Component public class MessageListen implements MessageListenerConcurrently { @Autowired private MessageProcessor messageProcessor; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt ext = list.get(0); boolean result = messageProcessor.handle(ext); if (!result) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }

6)創建消息生產者RocketMQProducer

package com.springboot.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description:生產者配置
 */
@Configuration
@Slf4j
public class RocketMQProducer {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;

    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;

    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameserAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);
        log.info("================>生產者創建完成,ProducerGroupName{}<================", groupName);
        return producer;
    }

}

(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中開啟/銷毀生成者)

7)創建消息消費者RocketMQConsumer

package com.springboot.consumer;

import com.springboot.message.MessageListen;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description:消費者配置
 */
@Configuration
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private MessageListen messageListen;

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.groupName}")
    private String groupName;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.tag}")
    private String tag;

    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;

    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer getRocketMQConsumer()
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        // 我們自己實現的監聽類
        consumer.registerMessageListener(messageListen);
        try {
            consumer.subscribe(topic,tag);
            log.info("================>消費者創建完成,ConsumerGroupName{}<================",groupName);
            log.info("============>消費者監聽開始,groupName:{},topic:{}<============",groupName,topic);
        } catch (MQClientException e) {
            log.error("消費者啟動失敗");
            e.printStackTrace();
        }
        return consumer;
    }

}

(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中開啟/銷毀消費者)

8)創建controller類,RocketMqController

package com.springboot.controller;

import com.springboot.consumer.RocketMQProducer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Description:
 */
@RestController
public class RocketMqController {

    @Autowired
    @Qualifier("rocketMQProducer")
    RocketMQProducer rocketMQProducer;

    @GetMapping("/test")
    public void TestSend() {
        DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String body = "hi RocketMQ, now is  " + sdf.format(new Date()) + ".";
        Message message = new Message("topic2020", "test", body.getBytes());
        try {
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

3.測試

1)首先啟動本地RocketMQ(啟動參考

2)啟動主啟動類,在瀏覽器輸入 localhost:8088/test,可以看到消費者已經創建並開始監聽,並且已經消費消息

3)可視化圖看出,在瀏覽器輸入 localhost:7777

 

源碼:https://gitee.com/niugit_admin/springboot-rocketmq


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM