【MQ中間件】RabbitMQ -- SpringBoot整合RabbitMQ(3)


1.前言說明

前面一篇博客中提到了使用原生java代碼進行測試RabbitMQ實現多種交換機類型的隊列場景。但是在項目中我們一般使用SpringBoot項目,而且RabbitMQ天生對於Spring的支持是非常良好的,所以這里基於SpringBoot我搭建了一個模擬購買商品訂單下單並發送消息使用RabbitMQ消息隊列的場景來分析實現不同模式下的場景。

也是對於SpringBoot整合RabbitMQ的一種總結。

使用到的模型如下圖所示,在下訂單處理的同時,采用消息隊列生產者向MQ消息中間件中生產消息發送給對應的隊列,創建消費者來消費隊列中的消息調用服務。

 

2.基於SpringBoot配置類構建消息隊列

項目構建我采用的是IDEA中Spring Initializr構建器創建的SpringBoot Maven項目,這部分主要是使用到了Spring RabbitMQ與SpringBoot Web的依賴組件。

由於原生支持,在IDEA中勾選對應的選項即可,非常簡單,無需考慮多余的Maven Repository引入。

創建SpringBoot項目主要有springboot-order-rabbitmq-consumer與springboot-order-rabbitmq-producer兩個Module。

 

這里還是簡單說明一下pom.xml與application.yml配置:

pom.xml

 <dependencies>
        <!--rabbitmq starter依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
View Code

application.yml

# 服務端口
server:
  port: 8080
# 配置rabbitmq服務
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 127.0.0.1  #基於本地windows RabbitMQ測試,雲服務填寫對應地址即可
    port: 5672
View Code

 

2.1.生產者配置類

RabbitMQ中消息隊列模式主要常用的模式就是:fanoutdirecttopic模式,這里我主要講解fanout與direct進行配置類構建生產者消費者。

整合生成消息隊列(交換機、Queues及綁定關系、Routing key)可以從生產者端也可從消費者端進行。

主要構建方式有兩種:

配置類生成交換機與隊列

注解形式綁定交換機隊列關系(topic使用注解方式構建)

 

這里先說第一種配置類方式:

使用配置類生成消息生產者隊列主要配置類說明:

主要配置類XxxTypeRabbitConfig

//注意:XxxType表示是交換機類型:可以是Fanout/Direct/Topic/Headers
@Configuration
public class XxxTypeRabbitConfig {
    //使用注入方式聲明對應的Queue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("email.xxxType.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.xxxType.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        return new Queue("weixin.xxxType.queue", true);
    }

    //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange
    @Bean
    public XxxTypeExchange xxxTypeOrderExchange() {
        return new XxxTypeExchange("xxxType_order_exchange", true, false);
    }

    //綁定關系:將隊列和交換機綁定, 並設置用於匹配鍵:routingKey
    @Bean
    public Binding bindingXxxType1() {
        return BindingBuilder
                .bind(weixinQueue())  //綁定哪個Queue
                .to(fanoutOrderExchange());  //是哪個交換機
    }
    @Bean
    public Binding bindingXxxType2() {
        return BindingBuilder.bind(smsQueue()).to(xxxTypeOrderExchange());
    }

    @Bean
    public Binding bindingXxxType3() {
        return BindingBuilder.bind(emailQueue()).to(xxxTypeOrderExchange());
    }
}
View Code

消息發送類,主要給創建的隊列填充消息,這里主要用到RabbitTemplate類調用convertAndSend方法進行對應交換機消息隊列的發送:

@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "";
    // 2: 路由key
    private String routeKey = "";

    //XxxType類型交換機
    public void makeOrderXxxType(Long userId, Long productId, int num) {
        exchangeName = "xxxType_order_exchange";
        routeKey = "";
        // 1: 模擬用戶下單
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根據商品id productId 去查詢商品的庫存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判斷庫存是否充足
        // if(num >  numstore ){ return  "商品庫存不足..."; }
        // 4: 下單邏輯
        // orderService.saveOrder(order);
        // 5: 下單成功要扣減庫存
        // 6: 下單完成以后
        System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
        // 發送訂單信息給RabbitMQ xxxType
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }

}
View Code

 

2.2.Fanout模式消息生產者

①創建交換機與隊列生成配置類,注意fanout這里綁定Queues的時候不要設置routing key,是采用廣播訂閱發送的方式:

/**
 * @Description:  fanout交換機類型就是對應的消息采用廣播訂閱模式,訂閱綁定交換機的隊列都應該收到消息
 * @Author: fengye
 * @Date: 2021/4/16 14:29
 */
@Configuration
public class FanoutRabbitConfig {
    //使用注入方式聲明對應的Queue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        return new Queue("weixin.fanout.queue", true);
    }

    //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange
    @Bean
    public FanoutExchange fanoutOrderExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //綁定關系:將隊列和交換機綁定, 並設置用於匹配鍵:routingKey
    @Bean
    public Binding bindingFanout1() {
        return BindingBuilder
                .bind(weixinQueue())  //綁定哪個Queue
                .to(fanoutOrderExchange());  //是哪個交換機
    }
    @Bean
    public Binding bindingFanout2() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
    }

    @Bean
    public Binding bindingFanout3() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
    }
}

 

②消息隊列發送到Queue,使用OrderService進行發送,主要用到了RabbitTemplate:

@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "";
    // 2: 路由key
    private String routeKey = "";

    //Fanout類型交換機
    public void makeOrderFanout(Long userId, Long productId, int num) {
        exchangeName = "fanout_order_exchange";
        routeKey = "";
        // 1: 模擬用戶下單
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根據商品id productId 去查詢商品的庫存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判斷庫存是否充足
        // if(num >  numstore ){ return  "商品庫存不足..."; }
        // 4: 下單邏輯
        // orderService.saveOrder(order);
        // 5: 下單成功要扣減庫存
        // 6: 下單完成以后
        System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
        // 發送訂單信息給RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

 

③生產者方啟動測試類向fanout_order_exchange交換機隊列發送消息,存儲到消息隊列中:

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void fanoutTest() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrderFanout(userId, productId, num);
        }
    }
}

 

運行結果:

 

 生成隊列並存儲10條消息。

 

2.3.Fanout模式消息消費者

①配置類實現消息消費者隊列比較簡單,主要就是使用@RabbitListener綁定對應的隊列,並使用@RabbitHandler接收消息對應中的參數信息即可,注意選擇合適的數據類型接收:

 

 對應消息隊列類配置:

//通過@RabbitListener綁定隊列接收消息
@RabbitListener(queues = {"weixin.fanout.queue"})
@Component
public class FanoutDuanxinConsumer {
    //隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("weixin fanout----接收到了訂單信息是:->" + message);
    }
}


@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email fanout----接收到了訂單信息是:->" + message);
    }
}


@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSMSConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms fanout----接收到了訂單信息是:->" + message);
    }
}

 

啟動消息接收者consumer SpringBoot項目:

 

 

 

 可以看到消息隊列存儲消息已被消費,控制台打印出了對應的消息信息。

 

2.4.Direct模式消息生產者

Direct模式消息生產者基於配置類構建與Fanout一樣,這里簡單說明一下配置類的增加的代碼就行:

修改XxxTypeConfig基類為DirectExchange:

/**
 * @Description:  direct交換機類型采用routing key與Queue進行綁定,通過key不同一對一進行消息傳遞
 * @Author: fengye
 * @Date: 2021/4/16 14:29
 */
@Configuration
public class DirectRabbitConfig {
    //使用注入方式聲明對應的Queue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        return new Queue("weixin.direct.queue", true);
    }

    //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange
    @Bean
    public DirectExchange directOrderExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }

    //綁定關系:將隊列和交換機綁定, 並設置用於匹配鍵:routingKey
    @Bean
    public Binding bindingFanout1() {
        return BindingBuilder
                .bind(weixinQueue())  //綁定哪個Queue
                .to(directOrderExchange())  //是哪個交換機
                .with("weixin");   //對應什么key
    }
    @Bean
    public Binding bindingFanout2() {
        return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms");
    }

    @Bean
    public Binding bindingFanout3() {
        return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email");
    }
}

 

對應消息發送Service類:

@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "";
    // 2: 路由key
    private String routeKey = "";

    //Direct類型交換機
    public void makeOrderDirect(Long userId, Long productId, int num) {
        exchangeName = "direct_order_exchange";
        routeKey = "weixin";
        // 1: 模擬用戶下單
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根據商品id productId 去查詢商品的庫存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判斷庫存是否充足
        // if(num >  numstore ){ return  "商品庫存不足..."; }
        // 4: 下單邏輯
        // orderService.saveOrder(order);
        // 5: 下單成功要扣減庫存
        // 6: 下單完成以后
        System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
        // 發送訂單信息給RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

 

執行測試類進行測試:

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void directTest() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrderDirect(userId, productId, num);
        }
    }
}

 

運行結果:

可以看到DirectQueue消息隊列已經生成並存儲到對應的weixin路由Key的隊列中:

 

 

 

2.5.Direct模式消息消費者

 

 ①創建對應的消息隊列消費者類,使用@RabbitListener、@RabbitHandler進行監聽並綁定消息獲取結果,這部分與上面的Fanout模式消費者是一樣的:

//通過@RabbitListener綁定隊列接收消息
@RabbitListener(queues = {"weixin.direct.queue"})
@Component
public class DirectDuanxinConsumer {
    //隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("duanxin direct queue----接收到了訂單信息是:->" + message);
    }
}

@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email direct----接收到了訂單信息是:->" + message);
    }
}

@RabbitListener(queues = {"sms.direct.queue"})
@Component
public class DirectSMSConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms direct----接收到了訂單信息是:->" + message);
    }
}

 

②啟動SpringBoot項目進行消費測試:

 

 

 

 可以看到消息隊列中綁定weixin端隊列收到了10條消息。

 

3.基於SpringBoot注解類構建消息隊列

使用注解方式實現消息隊列主要是從消費者進行交換機與Queues隊列的綁定關系建立,並使用@Component進行注入,可以比較簡單地處理交換機與隊列之間的綁定關系,隨SpringBoot項目一啟動就同時創建Exchange與Queues隊列的關系。

下面總的說一下主要的注解:

//通過@RabbitListener綁定隊列接收消息
// bindings其實就是用來確定隊列和交換機綁定關系
@RabbitListener(bindings = @QueueBinding(
    //隊列名字,綁定對應的隊列接收消息
    value = @Queue(value = "weixin.xxxType.queue", autoDelete = "false"),
    //交換機名字,必須和生產者中交換機名相同;指定綁定的交換機類型
    exchange = @Exchange(value = "xxxType_order_exchange", type = ExchangeTypes.XXXType),
    key = "com.#"
))
//隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
@RabbitHandler

 

3.1.Topic模式消息消費者

topic模式這里從消息消費者Springboot項目入手,優先創建出RabbitMQ上的消息隊列與交換機進行綁定,基於@RabbitListener與@QueueBinding會隨項目啟動自動創建消息隊列:

 

//通過@RabbitListener綁定隊列接收消息
// bindings其實就是用來確定隊列和交換機綁定關系
@RabbitListener(bindings = @QueueBinding(
    //隊列名字,綁定對應的隊列接收消息
    value = @Queue(value = "weixin.topic.queue", autoDelete = "false"),
    //交換機名字,必須和生產者中交換機名相同;指定綁定的交換機類型
    exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
    key = "com.#"
))
@Component
public class TopicDuanxinConsumer {
    //隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("duanxin topic----接收到了訂單信息是:->" + message);
    }
}


@RabbitListener(bindings = @QueueBinding(
        //隊列名字,綁定對應的隊列接收消息
        value = @Queue(value = "email.topic.queue", autoDelete = "false"),
        //交換機名字,必須和生產者中交換機名相同;指定綁定的交換機類型
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "#.order.#"
))
@Component
public class TopicEmailConsumer {
    //隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email topic----接收到了訂單信息是:->" + message);
    }

}


@RabbitListener(bindings = @QueueBinding(
        //隊列名字,綁定對應的隊列接收消息
        value = @Queue(value = "sms.topic.queue", autoDelete = "false"),
        //交換機名字,必須和生產者中交換機名相同;指定綁定的交換機類型
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "*.course.*"
))
@Component
public class TopicSMSConsumer {
    //隊列中的消息會通過@RabbitHandler注解注入到方法參數中,就可以獲取到隊列中的消息
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms topic----接收到了訂單信息是:->" + message);
    }
}

啟動SpringBoot消費者項目,進行驗證:

 

 

 

 

 

3.2.Topic模式消息生產者

 使用注解配置無需再創建對應的配置類Config來綁定Exchange與Queues的關系了。

直接使用Sevice調用服務發送消息即可。

①服務調用、向隊列中發送消息:

@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "";
    // 2: 路由key
    private String routeKey = "";


    //Topic類型交換機
    public void makeOrderTopic(Long userId, Long productId, int num) {
        exchangeName = "topic_order_exchange";
        routeKey = "com.course.user";
        // 1: 模擬用戶下單
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根據商品id productId 去查詢商品的庫存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判斷庫存是否充足
        // if(num >  numstore ){ return  "商品庫存不足..."; }
        // 4: 下單邏輯
        // orderService.saveOrder(order);
        // 5: 下單成功要扣減庫存
        // 6: 下單完成以后
        System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
        // 發送訂單信息給RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

 

②服務測試:

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void topicTest() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrderTopic(userId, productId, num);
        }
    }
}

消息發送:

 

 消費方consumer服務(消費者服務不停止)接收消息:

 

本博客示例涉及代碼均已上傳至Github:

RabbitMQStudy


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM