RabbitMQ 延時隊列


一、延時隊列概念

延時隊列最重要的特性體現在它的延時屬性上,隊列內部是有序的,延時隊列中的消息是希望在到了指定時間之前或者之后被取出處理的

 

二、延時隊列的應用場景

1、用戶下了訂單,十分鍾之內未進行支付則自動取消訂單

2、新創建的店鋪,如果在十天之內都沒有上架商品,則發送消息進行提醒

3、用戶注冊賬號成功后,如果半個月沒有登錄,則發送消息進行提醒

4、用戶發起退款,如果三天之內都沒有得到處理,則發送消息通知相關運營人員進行處理

5、預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參與

上面的這些場景都有一個特點,需要在某個時間發生之前或者之后完成某一項任務,例如發生訂單生成時間,在十分鍾之后需要檢查該訂單的支付狀態,如果訂單未進行支付,需要將該訂單關閉,理論上我們通過定時任務,一直輪詢數據,每秒都查一次,取出所有十分鍾之后未支付的訂單,然后關閉就好了,如果數據量比較少,使用定時任務確實是一個不錯的選擇,但是,如果數據量比較大怎么辦呢,輪詢大量的數據對數據庫的壓力是很大的,並且實時性也不好(輪詢大量數據需要時間),這樣就無法滿足業務要求,並且性能低下.這種情況下我們就可以使用 RabbitMQ 的延時隊列了

 

三、延遲隊列實戰一

1、原理圖

2、引入 Maven 依賴

<dependencies>
    <!--RabbitMQ 依賴-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>2.9.2</version>
    </dependency>
    <!--RabbitMQ 測試依賴-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>

3、application.yml

spring:
  rabbitmq:
    host: 192.168.59.130
    port: 5672
    username: admin
    password: admin123

4、RabbitMQ 配置類

/**
 * RabbitMQ 配置類
 */
@Configuration
public class RabbitConfig {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String NORMAL_QUEUE_1 = "normal_queue_1";
    private static final String NORMAL_QUEUE_2 = "normal_queue_2";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String NORMAL_ROUTING_KEY_1 = "nk1";
    private static final String NORMAL_ROUTING_KEY_2 = "nk2";
    private static final String DEAD_ROUTING_KEY = "dk";

    // 聲明普通交換機
    @Bean("normal_exchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE, true, false);
    }

    // 聲明普通隊列 1,當該隊列超時(30s)后會將消息傳遞到死信交換機
    @Bean("normal_queue_1")
    public Queue normalQueue1() {
        Map<String, Object> arguments = new HashMap<>();
        // 設置死信交換機
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 設置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 設置消息超時時間(ms)
        arguments.put("x-message-ttl", 30 * 1000);
        // 普通隊列 1 綁定死信交換機
        return QueueBuilder.durable(NORMAL_QUEUE_1).withArguments(arguments).build();
    }

    // 聲明普通隊列 1 綁定普通交換機
    @Bean
    public Binding normalQueue1BindingNormalExchange(@Qualifier("normal_queue_1") Queue queue, @Qualifier("normal_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_1);
    }

    // 聲明普通隊列 2,當該隊列超時(10s)后會將消息傳遞到死信交換機
    @Bean("normal_queue_2")
    public Queue normalQueue2() {
        Map<String, Object> arguments = new HashMap<>();
        // 設置死信交換機
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 設置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 設置消息超時時間(ms)
        arguments.put("x-message-ttl", 10 * 1000);

        return QueueBuilder.durable(NORMAL_QUEUE_2).withArguments(arguments).build();
    }

    // 聲明普通隊列 2 綁定普通交換機
    @Bean
    public Binding normalQueue2BindingNormalExchange(@Qualifier("normal_queue_2") Queue queue, @Qualifier("normal_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_2);
    }

    // 聲明死信交換機
    @Bean("dead_exchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    // 聲明死信隊列
    @Bean("dead_queue")
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    // 聲明死信隊列綁定死信交換機
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DEAD_QUEUE = "dead_queue";
    @RabbitListener(queues = DEAD_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到死信隊列信息{}", new Date().toString(), msg);
    }
}

6、Producer

@Slf4j
@RestController
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY_1 = "nk1";
    private static final String NORMAL_ROUTING_KEY_2 = "nk2";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/ttl/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("當前時間:{},發送一條信息給兩個 TTL 隊列:{}", new Date(), message);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY_1, "消息來自 ttl 為 30S 的隊列: " + message);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY_2, "消息來自 ttl 為 10S 的隊列: " + message);
    }
}

7、測試過程及結果

啟動項目,可以看到交換機和隊列注冊到了 RabbitMQ 服務器上

接着發送請求 http://localhost:8080/ttl/sendMsg/小毛毛變身

控制台顯示 Producer 發送消息之后,Consumer 分別在 10 s 和 30 s 后從死信隊列中消費到了消息,也就是生產者發送的兩個消息分別延時了 10 s 和 30 s

 

四、延遲隊列實戰二

從上面的案例中我們已經實現了消息延遲功能,但是上面的案例存在不足,我們是將延時的時間直接設置在了 normal_queue_1、normal_queue_2 中,如果有新的需求,需要在 1 小時、2 小時之后發送呢,按照實戰一中的邏輯,就必須要新增兩個隊列 normal_queue_3、normal_queue_4,讓它們將消息延時 1 小時、2 小時,這樣就很不友好,為了解決上述的弊端,我們可以設置一個通用的隊列,然后通過 Producer 來設置消息的延時時間,這樣就可以很靈活的控制延遲隊列的時間了

1、原理圖

2、RabbitMQ 配置類

@Configuration
public class RabbitConfig {
    private static final String COMMON_EXCHANGE = "common_exchange";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String COMMON_QUEUE = "common_queue";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String COMMON_ROUTING_KEY = "common";
    private static final String DEAD_ROUTING_KEY = "dk";

    // 聲明通用交換機
    @Bean("common_exchange")
    public DirectExchange commonExchange() {
        return new DirectExchange(COMMON_EXCHANGE, true, false);
    }

    // 聲明通用隊列,當該隊列超時(30s)后會將消息傳遞到死信交換機
    @Bean("common_queue")
    public Queue commonQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 設置死信交換機
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 設置 routing-key
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 普通隊列 1 綁定死信交換機
        return QueueBuilder.durable(COMMON_QUEUE).withArguments(arguments).build();
    }

    // 聲明通用隊列綁定通用交換機
    @Bean
    public Binding normalQueue1BindingNormalExchange(@Qualifier("common_queue") Queue queue, @Qualifier("common_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(COMMON_ROUTING_KEY);
    }

    // 聲明死信交換機
    @Bean("dead_exchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    // 聲明死信隊列
    @Bean("dead_queue")
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    // 聲明死信隊列綁定死信交換機
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY);
    }
}

3、Producer

@Slf4j
@RestController
public class Producer {
    private static final String COMMON_EXCHANGE = "common_exchange";
    private static final String COMMON_ROUTING_KEY = "common";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/ttl/sendMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        // 設置延時時間
        rabbitTemplate.convertAndSend(COMMON_EXCHANGE, COMMON_ROUTING_KEY, message, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
            return messagePostProcessor;
        });
        log.info("當前時間:{},發送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(), ttlTime, message);
    }
}

4、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DEAD_QUEUE = "dead_queue";

    @RabbitListener(queues = DEAD_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到死信隊列信息{}", new Date().toString(), msg);
    }
}

5、測試過程及結果

啟動 Springboot 項目,查看 RabbitMQ 控制台的 Exchange、Queue

瀏覽器分別發送請求

http://localhost:8080/ttl/sendMsg/小毛毛變身/30000
http://localhost:8080/ttl/sendMsg/小毛毛真機智/1000

查看 IDEA 控制台

兩個延時消息成功發送出去了,一個消息延時 30 s,另外一個消息延時 1 s,但是為什么 Consumer 接收到消息的時間是一樣的呢,延時 1 s 的消息不是應該更早消費掉嗎?

因為我們的代碼是使用在消息屬性上設置 TTL 的方式,消息可能並不會按時死亡,RabbitMQ 只會檢查第一個消息是否過期,過期則丟到死信隊列,如果第一個消息的延時很長很長,而第二個消息的延時很短,第二個消息也不會優先得到執行

 

五、延遲隊列實戰三

實戰二中出現的狀況確實是一個比較麻煩的問題,如果不能實現在消息粒度上的 TTL,並使其在設置的 TTL 時間及時死亡,就無法設計成一個通用的延時隊列,那么如何解決呢?

1、安裝延時隊列插件

從官網下載延時隊列插件,把下載好的插件上傳到 RabbitMQ 安裝目錄的 plugins 目錄下,執行如下命令即可

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后重啟 RabbitMQ,重啟完成之后查看 RabbitMQ 控制台,發現交換機類型多了一種類型 x-delayed-message

2、原理圖

3、RabbitMQ 配置類

@Configuration
public class RabbitConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //自定義交換機 我們在這里定義的是一個延遲交換機
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        // 自定義交換機的類型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4、Producer

@Slf4j
@RestController
public class Producer {
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("當前時間:{}, 發送一條延遲 {} 毫秒的信息給隊列 delayed.queue:{}", new Date(), delayTime, message);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String DELAYED_QUEUE = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到延遲隊列信息{}", new Date().toString(), msg);
    }
}

6、測試過程及結果

啟動 Springboot 項目,查看 RabbitMQ 控制台的 Exchange、Queue

瀏覽器分別發送

http://localhost:8080/sendDelayMsg/小毛毛變身/90000
http://localhost:8080/sendDelayMsg/小毛毛真可愛/10000

查看 IDEA 控制台,分別在 10 s、90 s 后消費者消費了該消息

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM