Springboot+死信實現RabbitMQ延遲隊列


原理


  生產者把帶有 ttl(Time-To-Live過期時間) 的消息發送到一個臨時隊列(DelayQueue),該隊列沒有消費者;

  該消息在DelayQueue中停留直至過期,同時該消息沒有ReQueue(重新入隊),就變成了死信(Dead-letter或Dead-message),死信自動地被發送給了配置好的DLX(Dead-Letter-Exchange);

  DLX根據路由規則把消息路由到了配置好的隊列中(DeadLetterQueue),隊列中的消息被消費者消費。

maven依賴

引入amqp的依賴, 生產者和消費者都需要

<!--amqp 適用rabbitmq-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

boostrap配置

同樣,生產者和消費者都需要
properties.yaml:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456

使用配置類創建各組件

rabbitmq的基本的組件是Exchange(交換機)、Queue(隊列)、Binding(綁定)。實現延時隊列需要定義如下組件:

  1. 一個帶ttl的Queue臨時隊列;
  2. 一個普通的用於業務的Queue,即死信隊列;
  3. 定義一個Exchange,即死信交換器DLX(Dead-Letter-Exchange);
  4. 再定義一個Bingding把死信隊列和死信交換器綁定在一起。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayQueueConfig {

    /**為了更貼合業務,參數名不使用DeadQueue之類的*/
    /**延遲隊列名*/
    private static String DELAY_QUEUE = "delay.queue";
    /**延遲隊列(死信隊列)交換器名*/
    private static String DELAY_EXCHANGE = "delay.exchange";
    /**處理業務的隊列(死信隊列)*/
    private static String PROCESS_QUEUE = "process.queue";
    /**ttl(10秒)*/
    private static int DELAY_EXPIRATION = 10000;

    /**
    * 創建延遲隊列
    * "x-dead-letter-exchange"參數定義死信隊列交換機
    * "x-dead-letter-routing-key"定義死信隊列中的消息重定向時的routing-key
    * "x-message-ttl"定義消息的過期時間
    */
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE)
                .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", PROCESS_QUEUE)
                .withArgument("x-message-ttl", DELAY_EXPIRATION)
                .build();
    }

    /**創建用於業務的隊列*/
    @Bean
    public Queue processQueue(){
        return QueueBuilder.durable(PROCESS_QUEUE)
                .build();
    }

    /**創建一個DirectExchange*/
    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**綁定Exchange和queue,把消息重定向到業務queue*/
    @Bean
    Binding dlxBinding(DirectExchange directExchange, Queue processQueue){
        return BindingBuilder.bind(processQueue)
                .to(directExchange)
                .with(PROCESS_QUEUE);
                //綁定,以PROCESS_QUEUE為routing key
    }
}

生產者創建發送消息的組件

@Component
public class MessageSender {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String routingKey, String msg) {
        //amqpTemplate.convertAndSend("process.queue", msg);
        amqpTemplate.convertAndSend(routingKey, msg);
    }
}

消費者自動接收並處理消息的組件

@Component
//注意監聽的Queue是用於業務的ProcessQueue, 而不是臨時存放消息的DelayQueue
@RabbitListener(queues = "process.queue")
public class MessageReceiver {

    @RabbitHandler()
    public void doSth(String msg) {
        //TODO
    }
}

拓展

該方案存在的缺點:消息會堆積在隊列中,如果隊列已滿,新的消息會變為死信,會直接重發送到死信隊列,此時就沒有“延遲”的效果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM