RabbitMQ-TTL-死信隊列_DLX


1. 簡介

死信隊列,簡稱:DLXDead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發送到另外一個交換機,這個交換機就是DLX

(一般會將DLX和與其binding 的 Queue,一並稱為死信隊列或DLX,習慣而已,不必糾結)

那么什么情況下會成為Dead message

  1. 隊列的長度達到閾值。
  2. 消費者拒接消費消息,basicNack/basicReject,並且不把消息重新放入原目標隊列,requeue=false
  3. 原隊列存在消息過期設置,消息到達超時時間未被消費。

流程講解,如圖所示(以第三種情況為例):

  1. Producer發送一條消息到Exchange並路由到設有過期時間(假設30分鍾)的Queue中。
  2. 當消息的存活時間超過了30分鍾后,Queue會將消息轉發給DLX
  3. DLX接收到Dead message后,將Dead message路由到與其綁定的Queue中。
  4. 此時消費者監聽此死信隊列並消費此消息。

死信隊列有什么用呢?

  1. 取消訂單(比如下單30分鍾后未付款,則取消訂單,回滾庫存),或者新用戶注冊,隔段時間進行短信問候等。
  2. 將消費者拒絕的消息發送到死信隊列,然后將消息進行持久化,后續可以做業務分析或者處理。

2. TTL

因為要實現延遲消息,我們先得知道如何設置過期時間。這里指演示

TTLTime To Live(存活時間/過期時間),當消息到達存活時間后,還沒有被消費,會被自動清除。

RabbitMQ可以對消息設置過期時間,也可以對整個隊列(Queue)設置過期時間。

  • 設置隊列過期時間使用參數:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統一過期。

  • 設置消息過期時間使用參數:expiration。單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。 例:現在有兩條消息,第一條消息過期時間為30s,而第二條消息過期時間為15s,當過了15秒后,第二條消息不會立即過期,而是要等第一條消息被消費后,第二條消息被消費時,才會判斷是否過期,所以當所有消息的過期時間一致時(比如30m后過期),最好給隊列設置過期時間,而不是消息。但是有的情況確實每個消息的過期時間不一致,比如海底撈預約,每個人預約的時間段不一致,有個可能一個小時后,有的可能三個小時等,當快到預約時間點需要給用戶進行短信通知,這就有問題了,不可能設置那么多的隊列,這時就需要使用延遲隊列來實現這個功能(下篇博文會講到)。

  • 如果兩者都進行了設置,以時間短的為准。

2.1 隊列設置TTL

2.1.1 引入所需依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

2.1.2 application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbit 用戶名密碼
    username: admin
    password: admin123

2.1.3 RabbitConfig

  1. 聲明一個過期時間為30s的Queue
  2. 聲明一個交換機(這里聲明的是主題交換機,交換機類型無所謂,只要消息能路由到Queue即可)。
  3. 設置綁定關系。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 設置過期隊列
 *
 * @author ludangxin
 * @date 2021/9/15
 */
@Configuration
public class RabbitTtlConfig {
   public static final String EXCHANGE_NAME = "TTL_EXCHANGE";
   public static final String QUEUE_NAME = "TTL_QUEUE";

   @Bean(QUEUE_NAME)
   public Queue queue() {
      return QueueBuilder.durable(QUEUE_NAME).ttl(30000).build();
   }

   @Bean(EXCHANGE_NAME)
   public Exchange exchange() {
      return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
   }

   @Bean
   public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
   }
}

2.1.4 Producer

import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 具有過期時間的消息 生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class TtlProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   public void sendMsg() {
      rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", "這是一條有生命周期的消息。");
   }
}

2.1.5 測試代碼

@Autowired
private TtlProducer ttlProducer;

@Test
public void sendMsg() {
    ttlProducer.sendMsg();
}

2.1.6 啟動測試

運行測試代碼后,到RabbitMQ 控制台中查看隊列即消息情況。

如圖所示,消息存活30s未被消費后,消息被遺棄。

2.2 消息設置TTL

2.2.1 Producer

我們將Producer代碼稍加修改,給消息設置10s的過期時間,觀察消息到底是存活30s還是10s。

import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 具有過期時間的消息 生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class TtlProducer {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   public void sendMsg() {
      MessageProperties mp = new MessageProperties();
      mp.setExpiration("10000");
      Message message = new Message("這是一條有生命周期的消息。".getBytes(), mp);
      rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", message);
   }
}

2.2.2 啟動測試

如圖所示,消息只存活了10s。

我們將過期時間設置成40s后,但消息還是只存活了30s。說明當同時設置了過期時間時,是以時間短的為准

3. TTL + DLX

接下來我們通過設置過期時間和死信隊列來實現延遲隊列的功能。

首先羅列下實現步驟:

  1. 聲明一個ExchangeTTl Queue,並且綁定關系,實現生成死信的邏輯。
  2. 聲明一個DLXQueue,此步驟的Queue是為了接收死信並讓Consumer進行監聽消費的。
  3. TTl QueueDLX進行綁定,使消息成為死信后能轉發給DLX

3.1 RabbitConfig

其實DLX與普通的Exchange沒有什么區別,只不過是“生產”死信的Queue指定了消息成為死信后轉發到DLX。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 死信隊列配置
 *
 * @author ludangxin
 * @date 2021/9/15
 */
@Configuration
public class RabbitDeadLetterConfig {

   public static final String QUEUE_NAME_TTL = "QUEUE_NAME_TTL_1";
   public static final String EXCHANGE_NAME_TTL = "EXCHANGE_NAME_TTL_1";
   public static final String QUEUE_NAME_DEAD_LETTER = "QUEUE_NAME_DEAD_LETTER";
   public static final String EXCHANGE_NAME_DLX = "EXCHANGE_NAME_DLX";
   public static final String ROUTING_KEY_DLX = "EXPIRE.#";
   public static final String ROUTING_KEY_DEAD_LETTER = "EXPIRE.10";
   public static final String ROUTING_KEY_TTL = "EXPIRE_TTL_10";

   /**
    * 1. Queue 隊列
    */
   @Bean(QUEUE_NAME_TTL)
   public Queue queue() {
      /*
       * 1. 設置隊列的過期時間 30s
       * 2. 綁定DLX
       * 3. 設置routing key(注意:這里設置的是路由到死信Queue的路由,並不是設置binding關系的路由)
       */
      return QueueBuilder.durable(QUEUE_NAME_TTL).ttl(10000).deadLetterExchange(EXCHANGE_NAME_DLX).deadLetterRoutingKey(ROUTING_KEY_DEAD_LETTER).build();
   }

   /**
    * 2. exchange
    */
   @Bean(EXCHANGE_NAME_TTL)
   public Exchange exchange() {
      return ExchangeBuilder.directExchange(EXCHANGE_NAME_TTL).durable(true).build();
   }

   /**
    * 3. 隊列和交互機綁定關系 Binding
    */
   @Bean
   public Binding bindExchange(@Qualifier(QUEUE_NAME_TTL) Queue queue, @Qualifier(EXCHANGE_NAME_TTL) Exchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_TTL).noargs();
   }

   /**
    * 4. 死信隊列
    */
   @Bean(QUEUE_NAME_DEAD_LETTER)
   public Queue deadLetterQueue() {
      return QueueBuilder.durable(QUEUE_NAME_DEAD_LETTER).build();
   }

   /**
    * 5. dlx
    */
   @Bean(EXCHANGE_NAME_DLX)
   public Exchange exchangeDlx() {
      return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
   }

   /**
    * 6. 隊列和交互機綁定關系 Binding
    */
   @Bean
   public Binding bindDlxExchange(@Qualifier(QUEUE_NAME_DEAD_LETTER) Queue queue, @Qualifier(EXCHANGE_NAME_DLX) Exchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DLX).noargs();
   }

}

3.2 Producer

import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延遲消息生產者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class DelayProducer {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   public void sendMsg() {
      String msg = "這是一條有生命周期的消息,發送時間為:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
      Message message = new Message(msg.getBytes());
      rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.EXCHANGE_NAME_TTL, RabbitDeadLetterConfig.ROUTING_KEY_TTL, message);
   }
}

3.3 Consumer

import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延遲消息消費者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Slf4j
@Component
public class DelayConsumer {

    @RabbitListener(queues = {RabbitDeadLetterConfig.QUEUE_NAME_DEAD_LETTER})
    public void dlxQueue(Message message){
        log.info(new String(message.getBody()) + ",消息接收時間為:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }
}

3.4 測試代碼

@Autowired
private DelayProducer delayProducer;

@Test
@SneakyThrows
public void sendDlxMsg() {
    delayProducer.sendMsg();
    // 使進程阻塞,方便Consumer監聽輸出Message
    System.in.read();
}

3.5 啟動測試

輸出日志內容如下:

2021-09-15 23:51:22.795  INFO 8122 --- [ntContainer#0-1] com.ldx.rabbitmq.consumer.DelayConsumer  : 這是一條有生命周期的消息,發送時間為:2021-09-15 23:51:12,消息接收時間為:2021-09-15 23:51:22


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM