Spring Boot 實現 RabbitMQ 延遲消費和延遲重試隊列


本文主要摘錄自:詳細介紹Spring Boot + RabbitMQ實現延遲隊列

並增加了自己的一些理解,記錄下來,以便日后查閱。

項目源碼:

背景

何為延遲隊列?

顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。
延遲隊列能做什么?延遲隊列多用於需要延遲工作的場景。最常見的是以下兩種場景:

  • 延遲消費。比如:用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單;用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。
  • 延遲重試。比如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。

如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。

實現思路

在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒后“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那么較小的那個值會被取用。更多資料請查閱官方文檔

Dead Letter Exchange

剛才提到了,被設置了TTL的消息在過期后會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的“死亡”形式:

  • 消息被拒絕。通過調用basic.reject或者basic.nack並且設置的requeue參數為false。
  • 消息因為設置了TTL而過期。
  • 消息進入了一條已經達到最大長度的隊列。

如果隊列設置了Dead Letter Exchange(DLX),那么這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱官方文檔

流程圖

聰明的你肯定已經想到了,如何將RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲隊列。

針對於上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:

延遲消費

延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩沖隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之后,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

延遲重試

延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。

如下圖所示,消費者發現該消息處理出現了異常,比如是因為網絡波動引起的異常。那么如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那么我們可以將其先放在緩沖隊列中(圖中紅色隊列),等消息經過一段的延遲時間后再次進入實際消費隊列中(圖中藍色隊列),此時由於已經過了“較長”的時間了,異常的一些波動通常已經恢復,這些消息可以被正常地消費。

代碼實現

配置隊列

從上述的流程圖中我們可以看到,一個延遲隊列的實現,需要一個緩沖隊列以及一個實際的消費隊列。又由於在RabbitMQ中,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:

  • delay_queue_per_message_ttl:TTL配置在消息上的緩沖隊列。
  • delay_queue_per_queue_ttl:TTL配置在隊列上的緩沖隊列。
  • delay_process_queue:實際消費隊列。

我們通過Java Config的方式將上述的隊列配置為Bean。由於我們添加了spring-boot-starter-amqp擴展,Spring Boot在啟動時會根據我們的配置自動創建這些隊列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的消息都會通過DLX轉發到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

@Bean
Queue delayQueuePerMessageTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter發送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .build();
}

其中,x-dead-letter-exchange聲明了隊列里的死信轉發到的DLX名稱,x-dead-letter-routing-key聲明了這些死信在轉發時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置代碼:

@Bean
Queue delayQueuePerQueueTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設置隊列的過期時間
                       .build();
}

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個x-message-ttl,該配置用來設置隊列的過期時間。

delay_process_queue

delay_process_queue的配置最為簡單:

@Bean
Queue delayProcessQueue() {
    return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                       .build();
}

配置Exchange

配置DLX

首先,我們需要配置DLX,代碼如下:

@Bean
DirectExchange delayExchange() {
    return new DirectExchange(DELAY_EXCHANGE_NAME);
}

然后再將該DLX綁定到實際消費隊列即delay_process_queue上。這樣所有的死信都會通過DLX被轉發到delay_process_queue:

@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
    return BindingBuilder.bind(delayProcessQueue)
                         .to(delayExchange)
                         .with(DELAY_PROCESS_QUEUE_NAME);
}

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之后,我們需要將消息轉發到緩沖隊列,所以緩沖隊列也需要綁定一個Exchange。在本例中,我們將delay_process_per_queue_ttl作為延遲重試里的緩沖隊列。

定義消費者

我們創建一個最簡單的消費者ProcessReceiver,這個消費者監聽delay_process_queue隊列,對於接受到的消息,他會:

  • 如果消息里的消息體不等於FAIL_MESSAGE,那么他會輸出消息體。
  • 如果消息里的消息體恰好是FAIL_MESSAGE,那么他會模擬拋出異常,然后將該消息重定向到緩沖隊列(對應延遲重試場景)。

另外,我們還需要新建一個監聽容器用於存放消費者,代碼如下:

@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監聽delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
}

至此,我們前置的配置代碼已經全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲隊列。

編寫測試用例

延遲消費場景

首先我們編寫用於測試TTL設置在消息上的測試代碼。

我們借助spring-rabbit包下提供的RabbitTemplate類來發送消息。由於我們添加了spring-boot-starter-amqp擴展,Spring Boot會在初始化時自動地將RabbitTemplate當成bean加載到容器中。

解決了消息的發送問題,那么又該如何為每個消息設置TTL呢?這里我們需要借助MessagePostProcessor。MessagePostProcessor通常用來設置消息的Header以及消息的屬性。我們新建一個ExpirationMessagePostProcessor類來負責設置消息的TTL屬性:

/**
 * 設置消息的失效時間
 */
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl; // 毫秒

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
               .setExpiration(ttl.toString()); // 設置per-message的失效時間
        return message;
    }
}

然后在調用RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊列中發送3條消息,過期時間依次為1秒,2秒和3秒。具體的代碼如下所示:

@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        long expiration = i * 1000;
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
    }
    ProcessReceiver.latch.await();
}

細心的朋友一定會問,為什么要在代碼中加一個CountDownLatch呢?這是因為如果沒有latch阻塞住測試方法的話,測試用例會直接結束,程序退出,我們就看不到消息被延遲消費的表現了。

那么類似地,測試TTL設置在隊列上的代碼如下:

@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
    }
    ProcessReceiver.latch.await();
}

我們向緩沖隊列中發送3條消息。理論上這3條消息會在4秒后同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景。

@Test
public void testFailMessage() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(6);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
    }
    ProcessReceiver.latch.await();
}

我們向delay_process_queue發送3條會觸發FAIL的消息,理論上這3條消息會在4秒后自動重試。

我的理解

延遲消費過程(每個消息可以單獨設置失效時間):

  • 1. 聲明 delay_queue_per_message_ttl 隊列:死信隊列,設置 DLX 參數,包含 x-dead-letter-exchange 表示失效后進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效后的路由鍵(值為 delay_process_queue,即實際消費隊列)。
  • 2. 聲明 delay_process_queue 隊列:實際消費隊列。
  • 3. 聲明 delay_exchange 交換機:實際消費交換機,類型為 Direct(一一對應)。
  • 4. 聲明 dlx_binding 綁定:將實際消費隊列和實際消費交換機綁定(路由鍵規則值為 delay_process_queue)。
  • 5. 發布一個消息,路由鍵為 delay_queue_per_message_ttl(發送到死信隊列),並通過 header 單獨設置每個消息的過期時間:當過期時間生效后,消息會轉到實際消費隊列。
  • 6. 聲明一個消費者,監聽 delay_process_queue 隊列(即實際消費隊列):消息正常被消費掉,達到延遲消費的目的。

延遲消費過程(所有消息統一設置失效時間):

  • 1. 聲明 delay_queue_per_queue_ttl 隊列:死信隊列,設置 DLX 參數,包含 x-dead-letter-exchange 表示失效后進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效后的路由鍵(值為 delay_process_queue,即實際消費隊列)、x-message-ttl 表示隊列消息過期時間。
  • 2. 聲明 delay_process_queue 隊列:實際消費隊列。
  • 3. 聲明 delay_exchange 交換機:實際消費交換機,類型為 Direct(一一對應)。
  • 4. 聲明 dlx_binding 綁定:將實際消費隊列和實際消費交換機綁定(路由鍵規則值為 delay_process_queue)。
  • 5. 發布一個消息,路由鍵為 delay_queue_per_queue_ttl(發送到死信隊列):當過期時間生效后,消息會轉到實際消費隊列。
  • 6. 聲明一個消費者,監聽 delay_process_queue隊列(即實際消費隊列):消息正常被消費掉,達到延遲消費的目的。

延遲重試過程

  • 1. 聲明 delay_process_queue 隊列:實際消費隊列。
  • 2. 聲明 delay_queue_per_queue_ttl 隊列:死信隊列,設置 DLX 參數,包含 x-dead-letter-exchange 表示失效后進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效后的路由鍵(值為 delay_process_queue,即實際消費隊列)、x-message-ttl 表示隊列消息過期時間。
  • 3. 聲明 delay_exchange 交換機:實際消費交換機,類型為 Direct(一一對應)。
  • 4. 聲明 per_queue_ttl_exchange 交換機:死信交換機,類型為 Direct(一一對應)。
  • 5. 聲明 dlx_binding 綁定:將實際消費隊列和實際消費交換機綁定(路由鍵規則值為 delay_process_queue)。
  • 6. 聲明 queue_ttl_binding 綁定:將死信隊列和死信交換機綁定(路由鍵規則值為 delay_queue_per_queue_ttl)。
  • 7. 發布一個消息,路由鍵為 delay_process_queue(發送到實際消費隊列)。
  • 8. 聲明一個消費者,監聽 delay_process_queue 隊列(即實際消費隊列):消費者監聽到消息,當處理過程中發生異常,消息重新發送到私信隊列,然后等待過期時間生效后,消息再轉到實際消費隊列,重新消費,以達到延遲重試的目的。

需要注意:在延遲消費的過程中,我們是沒有創建死信交換機的,那為什么還可以發布消息呢?原因是 RabbitMQ 會使用默認的 Exchange,並且創建一個默認的 Binding(類型為 Direct),通過rabbitmqadmin list bindings命令,可以看到結果。

Spring Cloud Stream RabbitMQ DLX 的實現:_rabbitmq_consumer_properties


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM