最新RocketMq與SpringBoot整合


最新版的RocketMq與SpringBoot2.X進行整合時,可以利用rocketmq-spring-boot-starter來簡化配置。本文採用了最新版的jar包來整合,並且略微做了封裝,以便於其他模塊引用,適合於多生產者多消費者的情況。

項目依賴

主要用到了rocketmq的包和lombok的包,具體的依賴如下所示:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

配置項

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.enable=true

rocketmq.producer.group=test-group-e
rocketmq.producer.topic=test-topic-e

rocketmq.producer.group2=test-group-f
rocketmq.producer.topic2=test-topic-f



rocketmq.consume.group=consumer-group-3
rocketmq.consume.topic=test-topic-e

rocketmq.consume.group2=consumer-group-4
rocketmq.consume.topic2=test-topic-f

最重要的是rocketmq.name-server和rocketmq.producer.group這兩個屬性一定要配置,否則項目無法啟動。在封裝成中間件的時候,rocketmq.producer.group可以隨意指定一個,親測即便指定了也不會在啟動的時候就生成這個生產者的group。rocketmq.producer.enable這個屬性是我自己加的,為了讓rocketmq能按需加載:因為封裝成中間件後會被其他模塊引入,而有的模塊可能用不到,所以bean需要按條件加載。剩下的group和topic我分別都指定了兩個,為了模擬多消費者和多生產者的情況。這裡沒有用tag來區分,因為tag使用不當會引來不必要的麻煩,不同的功能嚴格按照topic和group來區分。

封裝的消息類

package com.rocket.mq.demo.controller;

import lombok.Data;

import java.io.Serializable;

/**
 * Envelope for a payload sent through RocketMQ. It carries the business
 * content together with the message key and the topic, group and tag the
 * sender supplied.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @param <T> type of the business payload
 * @author : zhangwei
 * @date: 2020-08-20 11:03
 */
@Data
public class RocketMqMessage<T> implements Serializable {

    /**
     * Explicit serialization version so that the serialized form does not
     * depend on a compiler-computed default.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Message content (the business payload).
     */
    private T content;

    /**
     * Message key.
     */
    private String msgKey;

    /**
     * Topic supplied by the producer.
     */
    private String producerTopic;

    /**
     * Group supplied by the producer.
     */
    private String producerGroup;

    /**
     * Tag supplied by the producer; may be {@code null} when no tag is used.
     */
    private String producerTag;
}

發送消息的公共方法

package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Shared helper for publishing messages to RocketMQ through
 * {@link RocketMQTemplate}.
 *
 * <p>The bean is registered only when {@code rocketmq.producer.enable} is set
 * to {@code true}, so modules that include this code but do not produce
 * messages are not forced to load it.
 *
 * @author : zhangwei
 * @date: 2020-08-21 09:54
 */
@Component
@Slf4j
@ConditionalOnProperty(name = "rocketmq.producer.enable", havingValue = "true")
public class MqSendService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Synchronously sends a message, optionally qualified by a tag.
     *
     * <p>The payload is wrapped in a {@link RocketMqMessage} together with the
     * given topic, group, tag and a freshly generated message key. The same key
     * is also set as the {@code KEYS} header of the outgoing message.
     *
     * @param msg   business payload to send
     * @param topic destination topic; must not be blank
     * @param group producer group recorded in the envelope; must not be blank.
     *              It is only stored in the envelope and is not passed to the
     *              template call.
     * @param tag   optional tag; when not blank the destination becomes
     *              {@code topic:tag}, otherwise just {@code topic}
     * @param <T>   payload type
     * @return the result returned by {@link RocketMQTemplate#syncSend}
     * @throws IllegalArgumentException if {@code topic} or {@code group} is blank
     */
    public <T> SendResult send(T msg, String topic, String group, String tag) {
        if (StringUtils.isBlank(topic) || StringUtils.isBlank(group)) {
            // The original code only constructed the exception and never threw it,
            // so invalid arguments slipped through silently.
            throw new IllegalArgumentException("發送方topic或者group不能為空");
        }

        // Dash-free UUID used both as the envelope key and as the KEYS header.
        String msgKey = UUID.randomUUID().toString().replace("-", "");

        RocketMqMessage<T> message = new RocketMqMessage<>();
        message.setProducerTopic(topic);
        message.setProducerGroup(group);
        message.setProducerTag(tag);
        message.setContent(msg);
        message.setMsgKey(msgKey);

        Message<RocketMqMessage<T>> messageFinal =
                MessageBuilder.withPayload(message).setHeader("KEYS", msgKey).build();

        // A tag, when present, is appended to the topic as "topic:tag".
        String destination = StringUtils.isNotBlank(tag) ? topic + ":" + tag : topic;

        SendResult result = rocketMQTemplate.syncSend(destination, messageFinal);
        log.info("成功發送消息,消息內容為:{},返回值為:{}", message, result);
        return result;
    }

    /**
     * Synchronously sends a message without a tag.
     *
     * @param msg   business payload to send
     * @param topic destination topic; must not be blank
     * @param group producer group recorded in the envelope; must not be blank
     * @param <T>   payload type
     * @return the result returned by {@link RocketMQTemplate#syncSend}
     * @throws IllegalArgumentException if {@code topic} or {@code group} is blank
     */
    public <T> SendResult send(T msg, String topic, String group) {
        return this.send(msg, topic, group, null);
    }
}

多生產者發送消息示例

package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST endpoint that publishes a sample message using the first
 * producer topic/group pair ({@code rocketmq.producer.topic} and
 * {@code rocketmq.producer.group}).
 *
 * <p>{@code @RestController} already registers the class as a Spring bean, so
 * the additional {@code @Service} stereotype was redundant and has been dropped.
 *
 * @author : zhangwei
 * @date: 2020-08-20 19:11
 */
@Slf4j
@RestController
public class Producer {

    /** Topic to publish to, read from {@code rocketmq.producer.topic}. */
    @Value(value = "${rocketmq.producer.topic}")
    private String topic;

    /** Producer group, read from {@code rocketmq.producer.group}. */
    @Value(value = "${rocketmq.producer.group}")
    private String group;

    @Autowired
    private MqSendService mqSendService;

    /**
     * Sends a fixed three-element list ("1", "2", "3") as the message payload.
     *
     * @return a constant confirmation string once the send call has returned
     */
    @GetMapping("/test-rocketmq/sendMsg")
    public String testSendMsg() {
        List<String> payload = new ArrayList<>();
        payload.add("1");
        payload.add("2");
        payload.add("3");
        mqSendService.send(payload, topic, group);
        return "send message success";
    }
}

package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST endpoint that publishes a sample message using the second
 * producer topic/group pair ({@code rocketmq.producer.topic2} and
 * {@code rocketmq.producer.group2}).
 *
 * <p>{@code @RestController} already registers the class as a Spring bean, so
 * the additional {@code @Service} stereotype was redundant and has been dropped.
 *
 * @author : zhangwei
 * @date: 2020-08-20 19:11
 */
@Slf4j
@RestController
public class Producer2 {

    /** Topic to publish to, read from {@code rocketmq.producer.topic2}. */
    @Value(value = "${rocketmq.producer.topic2}")
    private String topic;

    /** Producer group, read from {@code rocketmq.producer.group2}. */
    @Value(value = "${rocketmq.producer.group2}")
    private String group;

    @Autowired
    private MqSendService mqSendService;

    /**
     * Sends a fixed three-element list ("1", "2", "3") as the message payload.
     *
     * @return a constant confirmation string once the send call has returned
     */
    @GetMapping("/test-rocketmq/sendMsg2")
    public String testSendMsg() {
        List<String> payload = new ArrayList<>();
        payload.add("1");
        payload.add("2");
        payload.add("3");
        mqSendService.send(payload, topic, group);
        return "send message success";
    }
}

多消費者消費消息示例

package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * Listener for the first consumer topic/group pair
 * ({@code rocketmq.consume.topic} and {@code rocketmq.consume.group}); it
 * logs every message it receives.
 *
 * @author : zhangwei
 * @date: 2020-08-20 16:29
 */
@Slf4j
@Component
// The topic must match the producer's topic. consumerGroup is mandatory,
// but its value can be chosen freely.
@RocketMQMessageListener(
        nameServer = "${rocketmq.name-server}",
        topic = "${rocketmq.consume.topic}",
        consumerGroup = "${rocketmq.consume.group}")
public class Consumer implements RocketMQListener<RocketMqMessage> {

    /**
     * Logs the received message.
     *
     * @param message the message envelope delivered to this listener
     */
    @Override
    public void onMessage(RocketMqMessage message) {
        log.info("======我收到了消息,消息內容為:{}", message);
    }
}

package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * Listener for the second consumer topic/group pair
 * ({@code rocketmq.consume.topic2} and {@code rocketmq.consume.group2}); it
 * logs every message it receives.
 *
 * @author : zhangwei
 * @date: 2020-08-20 16:29
 */
@Slf4j
@Component
// The topic must match the producer's topic. consumerGroup is mandatory,
// but its value can be chosen freely.
@RocketMQMessageListener(
        nameServer = "${rocketmq.name-server}",
        topic = "${rocketmq.consume.topic2}",
        consumerGroup = "${rocketmq.consume.group2}")
public class Consumer2 implements RocketMQListener<RocketMqMessage> {

    /**
     * Logs the received message.
     *
     * @param message the message envelope delivered to this listener
     */
    @Override
    public void onMessage(RocketMqMessage message) {
        log.info("======我收到了消息,消息內容為:{}", message);
    }
}

鏡像地址

https://www.zhangwei.wiki/#/posts/16

pay


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM