RabbitMQ-基礎


1. 簡介

MQ(Message Queue)消息隊列,是基礎數據結構中“FIFO(先進先出)”的一種數據結構。

一般用來解決應用解耦異步消息流量削峰等問題,實現高性能,高可用,可伸縮和最終一致性架構。

應用解耦

MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。

異步消息

將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。

流量削峰

如訂單系統,在下單的時候就會往數據庫寫數據。但是數據庫只能支撐每秒1000左右的並發寫入,並發量再高就容易宕機。低峰期的時候並發也就100多個,但是在高峰期時候,並發量會突然激增到5000以上,這個時候數據庫肯定卡死了。

這時候我們可以使用MQ將消息保存起來,然后系統就可以按照自己的消費能力來消費,比如每秒1000個數據,這樣慢慢寫入數據庫,這樣就不會卡死數據庫了。

但是使用了MQ之后,限制消費消息的速度為1000,但是這樣一來,高峰期產生的數據勢必會被積壓在MQ中,高峰就被“削”掉了。但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000QPS,直到消費完積壓的消息,這就叫做“填谷”。

2. RabbitMQ

RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列。

RabbitMQ 其實是一個消息代理:它接受和轉發消息。可以將其視為郵局:當你把 要投遞的郵件放入郵箱時,你可以確定郵遞員最終會將郵件遞送給你的收件人。在這個比喻中,RabbitMQ 是一個郵箱、一個郵局和一個信件載體。 RabbitMQ 和郵局之間的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊 - 消息。

3. 模式

官方網站

這里僅介紹了常用的模式,最近看官網又多個模式Publisher Confirms,完了有時間再補充上。

關於官網中提到的第六種模式RPC,由於RPC通信一般不使用RabbitMQ,所以這里也沒有講。

3.1 簡單模式

如圖所示:只有一個生產者(P)一個隊列(紅色塊)和 一個消費者(C)。

應用場景:可以實現對應用程序的解耦,並且可以實現對業務的異步處理。事實上這是mq最基本的功能。

3.2 工作模式

如圖所示:一個生產者對應多個消費者。多個消費者功能消費一個隊列(負載均衡)。

每個消息只能被其中的一個消費者消費。

應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

3.3 發布訂閱模式

如圖所示:在生產者和隊列之間多了個交換機(X),此時的交換機類型為:扇形交換機(Fanout Exchange)

事實上,簡單模式和工作模式也都有自己的Exchange,只不過不用顯性的聲明,因為默認使用default Exchange

即:一個發送到Exchange的消息都會被轉發到與該交換機綁定的所有隊列上。

每一個消息能被多個消費者都消費。

Fanout Exchange消息路由規則如圖所示:

應用場景:顧名思義,一個消息想被多個訂閱者消費。

3.4 路由模式

如圖所示:相比發布訂閱模式,Exchange和Queue之間多了個路由關系,此時的交換機類型為:直連交換機(Direct Exchange)

  • 隊列和交換機不是任意綁定了,而是要指定一個Routingkey

  • 生產者在向Exchange發送消息時,也必須指定消息的RoutingKey

  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息。

Direct Exchange消息路由規則如圖所示:

3.5 通配符模式/主題模式

如圖所示:相比路由模式,Exchange和Queue之間不只是通過固定的RoutingKey進行綁定,還支持通配符的方式,此時的交換機類型為:主題交換機/通配符交換機(Topic Exchange)

Topic Exchange消息路由規則如圖所示:

3. 安裝RabbitMQ

version: '2'
services:
    rabbitmq:
       hostname: rabbitmq
       image: rabbitmq:3.8.3-management
       restart: always
       environment:
         # 默認的用戶名
         - RABBITMQ_DEFAULT_USER=admin
         # 默認的密碼
         - RABBITMQ_DEFAULT_PASS=admin123
       volumes:
         - ./data:/var/lib/rabbitmq
         - ./log:/var/log/rabbitmq/log
       ports:
         # rabbit ui 默認端口
         - "15672:15672"
         # Epmd 是 Erlang Port Mapper Daemon 的縮寫,
         # 在 Erlang 集群中相當於 dns 的作用,綁定在4369端口上
         - "4369:4369"
         # rabbit 默認的端口
         - "5672:5672"
         # 25672端口用於節點間和CLI工具通信(Erlang分發服務器端口),
         # 並從動態范圍分配(默認情況下僅限於單個端口,
         # 計算方式為AMQP 0-9-1和AMQP 1.0端口+20000),
         # 默認情況下通過 RABBITMQ_NODE_PORT 計算是25672
         - "25672:25672"

4. 各種模式的簡單實現

4.1 項目搭建

4.1.1 引入依賴

我們這里使用spring-boot-starter-amqp操作RabbitMQ。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ldx</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

4.1.2 application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbitmq 安裝時指定的超管信息
    username: admin
    password: admin123

4.2 簡單模式

4.2.1 聲明一個簡單隊列

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbit 快速開始
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Configuration
public class RabbitSimpleConfig {

    /**
     * 設置一個簡單的隊列
     */
    @Bean
    public Queue queue() {
        return new Queue("helloMQ");
    }
}

4.2.2 創建生產者

package com.ldx.rabbitmq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 生產者
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Component
public class SimpleProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String context = "helloMQ " + System.currentTimeMillis();
        rabbitTemplate.convertAndSend("helloMQ", context);
    }
}

4.2.3 創建消費者

package com.ldx.rabbitmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消費者
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer {
  
    @RabbitHandler
    public void process(String hello) {
        log.info("Message:{} ", hello);
    }
  
}

4.2.4 創建測試類

package com.ldx.rabbitmq;

import com.ldx.rabbitmq.producer.SimpleProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private SimpleProducer simpleSender;

    @Test
    public void hello() throws Exception {
        // 每秒發送一條消息
        for (int i = 0; i < 10; i++) {
            simpleSender.send();
            Thread.sleep(1000);
        }
    }
}

4.2.5 啟動測試

啟動測試類,輸出內容如下:

每秒消費一條消息。

2021-09-08 23:58:01.837  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116681827 
2021-09-08 23:58:02.839  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116682833 
2021-09-08 23:58:03.842  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116683838 
2021-09-08 23:58:04.852  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116684843 
2021-09-08 23:58:05.853  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116685844 
2021-09-08 23:58:06.853  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116686847 
2021-09-08 23:58:07.857  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116687850 
2021-09-08 23:58:08.863  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116688855 
2021-09-08 23:58:09.868  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116689858 
2021-09-08 23:58:10.870  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116690862 

4.2.6 小節

簡單模式,顧名思義,很簡單,相當於Hello World程序。我們在編寫的時候

  1. 指定了一個Queue並且名稱為helloMQ
  2. 消息生產者通過SpringBoot 提供的RabbitTemplate發送消息,我們在發送時指定了QueuehelloMQ且發送了指定內容。
  3. 消息消費者通過@RabbitListener注解監聽了指定QueuehelloMQ,且使用@RabbitHandler注解指定消費方法SimpleConsumer::process()
  4. 最后編寫測試類循環調用生產者消息發送邏輯,實現了消息的生產與消費。

4.3 工作模式

首先分析:其實工作模式和簡單模式相比,僅僅是又一個消費者變成了多個消費者。ok,很好辦,我們通過代碼再多加一個消費者即可。

4.3.1 添加消費者

package com.ldx.rabbitmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消費者
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer2 {

    @RabbitHandler
    public void process(String hello) {
        log.info("Message2:{} ", hello);
    }

}

4.3.2 啟動測試

我們再次執行test方法,查看消息消費情況。

輸出日志如下:

SimpleConsumerSimpleConsumer2交替消費隊列中的消息(消費者之間消費消息是通過輪詢的關系)。

2021-09-09 20:24:35.043  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190275019 
2021-09-09 20:24:36.038  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190276029 
2021-09-09 20:24:37.036  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190277032 
2021-09-09 20:24:38.046  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190278036 
2021-09-09 20:24:39.049  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190279041 
2021-09-09 20:24:40.049  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190280042 
2021-09-09 20:24:41.054  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190281047 
2021-09-09 20:24:42.060  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190282051 
2021-09-09 20:24:43.062  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190283055 
2021-09-09 20:24:44.074  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190284057 

4.3.3 小節

在一個隊列中如果有多個消費者,那么消費者之間是輪詢的關系。

4.4 發布訂閱模式

首先分析:發布訂閱模式其實是將消息先發送給扇形交換機,交換機再將消息轉發給其綁定到此交換機的隊列上。

這里,我們聲明一個交換機,給交換機綁定兩個隊列,並且使用兩個消費者分別綁定到兩個隊列上(其實就是為了和3.3保持一致)。

4.4.1 聲明交換機和隊列

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 扇形交換機配置
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Configuration
public class RabbitFanoutConfig {

    public static final String EXCHANGE_NAME = "FANOUT_EXCHANGE";
    public static final String QUEUE_NAME = "FANOUT_QUEUE";
    public static final String QUEUE_NAME_1 = "FANOUT_QUEUE_1";

    /**
     * 1.交換機
     */
    @Bean(EXCHANGE_NAME)
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 2.Queue 隊列
     */
    @Bean(QUEUE_NAME)
    public Queue fanoutQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 2.1 Queue 隊列
     */
    @Bean(QUEUE_NAME_1)
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(QUEUE_NAME_1).build();
    }

    /**
     * 3. 隊列和交互機綁定關系 Binding
     */
    @Bean
    public Binding bindFanoutExchange(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        // fanout :routing key 默認為 "",指定了別的值也沒用
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

    /**
     * 3.1 隊列和交互機綁定關系 Binding
     */
    @Bean
    public Binding bindFanoutExchange1(@Qualifier(QUEUE_NAME_1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        // fanout :routing key 默認為 "",指定了別的值也沒用,我們這里隨便寫個值,看會不會有影響
        return BindingBuilder.bind(queue).to(exchange).with("aaabbb").noargs();
    }
}

4.4.2 創建生產者

package com.ldx.rabbitmq.producer;

import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 扇形交換機消息生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class FanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   public void sendWithFanout() {
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "", "fanout mq hello~~~");
        // 指定一個routingKey 看消費方能不能正常接收消息
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "abc", "fanout2 mq hello~~~");
    }
}

4.4.3 創建消費者

package com.ldx.rabbitmq.consumer;

import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 扇形交換機消息消費者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Slf4j
@Component
public class FanoutConsumer {

    @RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME})
    public void process(String message){
        log.info("queue === " + message);
    }

    @RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME_1})
    public void process1(String message){
        log.info("queue1 === " + message);
    }
}

4.4.4 創建測試代碼

    @Autowired
    private FanoutProducer producer;

    @Test
    @SneakyThrows
    public void sendWithFanout(){
        producer.sendWithFanout();
        // 為了阻塞進程,使消費者能正常消費。
        System.in.read();
    }

4.4.5 啟動測試

執行測試方法,輸出內容如下:

生產者發送的兩條消息,被兩個消費者共同消費了。實現了消息的廣播。

2021-09-09 21:59:17.538  INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue1 === fanout mq hello~~~
2021-09-09 21:59:17.538  INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue === fanout mq hello~~~
2021-09-09 21:59:17.539  INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue1 === fanout2 mq hello~~~
2021-09-09 21:59:17.539  INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue === fanout2 mq hello~~~

4.4.6 小節

本節代碼中我們創建了一個fanout Exchange,並且創建了兩個隊列與其綁定,其中一個隊列進行綁定的時候還指定了routing key,但程序執行時消息正常被消費,說明fanout Exchange不用指定routing key

發布訂閱模式與工作隊列模式的區別

1、工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。

2、發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。

3、發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁定到默認的交換機 。

4.5 路由模式

首先分析:路由模式其實就是將 發布訂閱模式中的 fanout Exchange 換成了 direct Exchange 從而指定相應的路由規則即可。

4.5.1 聲明交換機和隊列

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 直連交換機配置
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Configuration
public class RabbitDirectConfig {

    public static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
    public static final String QUEUE_NAME_INSERT = "DIRECT_QUEUE_INSERT";
    public static final String QUEUE_NAME_UPDATE = "DIRECT_QUEUE_UPDATE";

    /**
     * 1.交換機
     */
    @Bean(EXCHANGE_NAME)
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 2.Queue insert隊列
     */
    @Bean(QUEUE_NAME_INSERT)
    public Queue bootQueueInsert() {
        return QueueBuilder.durable(QUEUE_NAME_INSERT).build();
    }

    /**
     * 2.Queue update隊列
     */
    @Bean(QUEUE_NAME_UPDATE)
    public Queue bootQueueUpdate() {
        return QueueBuilder.durable(QUEUE_NAME_UPDATE).build();
    }

    /**
     * 3. 綁定insert 隊列
     * 3. routing key: insert
     */
    @Bean
    public Binding bindInsertDirectExchange(@Qualifier(QUEUE_NAME_INSERT) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("insert").noargs();
    }

    /**
     * 3. 綁定update 隊列
     * 3. routing key: update
     */
    @Bean
    public Binding bindUpdateDirectExchange(@Qualifier(QUEUE_NAME_UPDATE) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("update").noargs();
    }
}

4.5.2 創建生產者

package com.ldx.rabbitmq.producer;

import com.ldx.rabbitmq.config.RabbitDirectConfig;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 直連交換機消息生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class DirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   public void sendWithDirect() {
      rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "insert", "diect insert mq hello~~~");
      rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "update", "diect update mq hello~~~");
      // 指定一個沒有配置的routingKey 看消費方能不能接收消息
      rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "delete", "diect update mq hello~~~");
   }
}

4.5.3 創建消息者

package com.ldx.rabbitmq.consumer;

import com.ldx.rabbitmq.config.RabbitDirectConfig;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 直連交換機消息消費者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Slf4j
@Component
public class DirectConsumer {
		/**
		 * @param message message 為springboot 封裝的消息存儲的實例對象,其對象中不僅封裝了生產者發送的消息
		 *    而且也封裝了很多消息的元數據,例如:headers contentType receivedRoutingKey ...
		 */
    @RabbitListener(queues = {RabbitDirectConfig.QUEUE_NAME_INSERT, RabbitDirectConfig.QUEUE_NAME_UPDATE})
    public void directQueue(Message message){
        log.info(message.toString());
        log.info(new String(message.getBody()));
    }

}

4.5.4 創建測試代碼

    @Autowired
    private DirectProducer directProducer;

    @Test
    @SneakyThrows
    public void sendWithDirect() {
        directProducer.sendWithDirect();
        System.in.read();
    }

4.5.5 啟動測試

執行測試代碼,輸出內容如下:

insert 和 update 對應的消息都被正常消費,其中值得注意的是指定routing key=delete的消息丟失了,因為隊列與交換機綁定時根本沒有此routing key,而交換機之所以叫交換機,因為其不存儲消息,只是轉發消息,其沒有持久化消息的能力,所以消息還沒有到queue,然后嗝屁。

2021-09-09 22:38:49.451  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : (Body:'diect insert mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=insert, deliveryTag=1, consumerTag=amq.ctag-WJmYhQDljkKkM1pFeW99Yg, consumerQueue=DIRECT_QUEUE_INSERT])
2021-09-09 22:38:49.451  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : diect insert mq hello~~~
2021-09-09 22:38:49.452  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : (Body:'diect update mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=update, deliveryTag=2, consumerTag=amq.ctag-guzpfaF0BdII70w85ywiCg, consumerQueue=DIRECT_QUEUE_UPDATE])
2021-09-09 22:38:49.452  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : diect update mq hello~~~

4.5.6 小節

路由模式特點:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個rutingKey(路由key)。
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 routingKey
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的routing Key進行判斷,只有隊列的routingkey與消息的 routing key完全一致,才會接收到消息。

4.6 主題模式

首先分析:通配符模式其實就是將 路由模式中的 direct Exchange 換成了 topic Exchange, 使其不僅可以將exchangequeuerouting key全匹配的方式進行綁定,而且還支持通配符

routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規則:

#:匹配一個或多個單詞

*:匹配一個單詞

舉例:

item.#:能夠匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

圖解:

  • 紅色Queue:綁定的是usa.# ,因此凡是以 usa.開頭的routing key 都會被匹配到
  • 黃色Queue:綁定的是#.news ,因此凡是以 .news結尾的 routing key 都會被匹配

4.6.1 聲明交換機和隊列

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 主題交換機配置
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Configuration
public class RabbitTopicConfig {

    public static final String EXCHANGE_NAME = "TOPIC_EXCHANGE";
    public static final String QUEUE_NAME1 = "TOPIC_QUEUE1";
    public static final String QUEUE_NAME2 = "TOPIC_QUEUE2";

    /**
     * 1.交換機
     * topicExchange:通配符,把消息交給符合routing pattern(路由模式) 的隊列
     */
    @Bean(EXCHANGE_NAME)
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 2.Queue 隊列
     */
    @Bean(QUEUE_NAME1)
    public Queue bootQueue1() {
        return QueueBuilder.durable(QUEUE_NAME1).build();
    }

    /**
     * 2.Queue 隊列
     */
    @Bean(QUEUE_NAME2)
    public Queue bootQueue2() {
        return QueueBuilder.durable(QUEUE_NAME2).build();
    }

    /**
     * 3. 隊列和交互機綁定關系 Binding
     * 匹配 routing key 以 insert 開頭的 如 insert.user ; insert.user.log
     */
    @Bean
    public Binding bindTopicExchange1(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("insert.#").noargs();
    }

    /**
     * 3. 隊列和交互機綁定關系 Binding
     * routing key 中的 * 只能匹配單個單詞
     * 匹配 routing key 以 update 開頭的 如 update.user
     * 不能匹配 如 update.user.log
     */
    @Bean
    public Binding bindTopicExchange2(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("update.*").noargs();
    }

    /**
     * 3. 隊列和交互機綁定關系 Binding
     * routing key 中的 * 只能匹配單個單詞
     * 匹配 routing key 以 . 分割的
     * 不能匹配 如 update.user.log
     */
    @Bean
    public Binding bindTopicExchange3(@Qualifier(QUEUE_NAME2) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
    }
}

4.6.2 創建生產者

package com.ldx.rabbitmq.producer;

import com.ldx.rabbitmq.config.RabbitTopicConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 主題交換機消息生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   public void sendWithTopic() {
      rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "insert.user.log", "topic mq hello~~~ routing is insert.user.lo");
      rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user", "topic mq hello~~~ routing is update.user");
      rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user.log", "topic mq hello~~~ routing is update.user.log");
      rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user", "topic mq hello~~~ routing is delete.user");
      rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user.log", "topic mq hello~~~routing is delete.user.log");
   }
}

4.6.3 創建消費者

package com.ldx.rabbitmq.consumer;

import com.ldx.rabbitmq.config.RabbitTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 主題交換機消息消費者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Slf4j
@Component
public class TopicConsumer {

    @RabbitListener(queues = {RabbitTopicConfig.QUEUE_NAME1, RabbitTopicConfig.QUEUE_NAME2})
    public void topicQueue(Message message){
        log.info(message.toString());
        log.info(new String(message.getBody()));
    }
}

4.6.4 創建測試代碼

@Autowired
private TopicProducer topicProducer;

@Test
@SneakyThrows
public void sendWithTopic() {
    topicProducer.sendWithTopic();
    System.in.read();
}

4.6.5 啟動測試

執行測試代碼,輸入內容如下:

其中符合通配符條件的消息均已消費。

2021-09-09 23:02:57.131  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is insert.user.lo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=insert.user.log, deliveryTag=1, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is insert.user.lo
2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=2, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is update.user
2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=3, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is update.user
2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is delete.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=delete.user, deliveryTag=4, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is delete.user

4.6.6 小節

Topic主題模式可以實現 Publish/Subscribe發布與訂閱模式 Routing路由模式 的功能;只是Topic在配置routing key 的時候可以使用通配符,顯得更加靈活。

5. 模式總結

RabbitMQ工作模式:
1、簡單模式 HelloWorld
一個生產者、一個消費者,不需要設置交換機(使用default Exchange)。

2、工作隊列模式 Work Queue
一個生產者、多個消費者(平均分配消息),不需要設置交換機(使用default Exchange)。

3、發布訂閱模式 Publish/subscribe
需要設置交換機類型為fanout Exchange,並且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列。

4、路由模式 Routing
需要設置交換機類型為direct Exchange,交換機和隊列進行綁定,並且指定routing key,發送消息時也要指定對應的routing key到交換機,交換機會根據routing key將消息發送到對應的隊列。

5、主題模式 Topic
需要設置交換機類型為topic Exchange,,交換機和隊列進行綁定,並且指定通配符方式的routing key,發送消息時指定routing key到交換機后,交換機會根據routing key規則將消息發送到對應的隊列。主題模式比上面四類更靈活。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM