Java 實現RabbitMq延時隊列和死信隊列


延時隊列:實際是不存在直接可用的延時隊列,可通過死信消息和死信隊列來實現延時隊列的功能。

死信交換機: DLX 全稱(Dead-Letter-Exchange)。其實它是個普通的交換機,但它是設置在隊列上某個參數的值對應的交換機。

死信隊列如果某個隊列上存在參數:x-dead-letter-exchange, 當這個隊列里的消息變成死信消息(dead message)后會被重新Pushlish到 x-dead-letter-exchange 所對應參數值的交換機上,跟這個交換機所綁定的隊列就是死信隊列。

死信消息

  1. 消息被拒絕(basic.reject / basic.nack),並且requeue = false
  2. 消息TTL過期
  3. 隊列達到了最大的長度時

過期消息:RabbitMq 有兩種設置消息過期的方式:

  1. 創建隊列時通過 x-message-ttl 參數指定該隊列消息的過期時間,這種隊列里的消息過期時間全部相同。
  2. 生產者Pushlish消息時,通過設置消息的 expiration 參數指定過期時間,每個消息的過期時間都不一樣。

  如果兩者同時使用,過期時間按照小的一方為准,兩種方式設置的時間都是 毫秒。

 

應用場景:延時隊列的應用場景很多,在我的項目開發中也涉及到很多,例如:訂單五分鍾未支付自動取消、訂單准備超時30分鍾推送提醒給門店、訂單完成后兩小時推送評價邀請給用戶等等,這些間隔指定時間后的操作都可以使用延時隊列。

上一篇文章:Java 簡單操作 RabbitMq 介紹了RabbitMq的基本操作,要引入的包和配置可以參考上一篇文章。這里就利用RabbitMq的死信隊列直接來實現延時隊列的功能。

 

首先創建一個自動加載類利用Bean在項目啟動時,自動創建延時和死信交換機/延時和死信隊列,並將創建好的隊列綁定在對應的交換機上。如果交換機和隊列存在的情況下,則不會創建或更新。 這一步可減少手動或忘記創建隊列帶來的麻煩:

package com.demo.www.rabbitmq.config;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * RabbitMq 延時隊列實現
 * @author AnYuan
 */

@Slf4j
@Configuration
public class DelayQueueConfig {

    /**
     * 延遲隊列
     */
    public static final String DELAY_EXCHANGE = "delay.queue.business.exchange";
    public static final String DELAY_QUEUE = "delay.queue.business.queue";
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey";

    /**
     * 死信隊列
     */
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange";
    public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey";
    public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";

    /**
     * 聲明 死信交換機
     * @return deadLetterExchange
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 聲明 死信隊列 用於接收死信消息
     * @return deadLetterQueueA
     */
    @Bean
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     *  將 死信隊列 綁定到死信交換機上
     * @return deadLetterBindingA
     */
    @Bean
    public Binding deadLetterBindingA() {
        return BindingBuilder
                .bind(deadLetterQueueA())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_QUEUE_ROUTING_KEY);
    }

    /**
     * 聲明 延時交換機
     * @return delayExchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**
     * 將 延時隊列 綁定參數
     * @return Queue
     */
    @Bean
    public Queue delayQueueA() {
        Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3);
        // 隊列綁定DLX參數(關鍵一步)
        maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 隊列綁定 死信RoutingKey參數
        maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
        // 消息過期采用第一種設置隊列的 ttl 時間,消息過期時間全部相同。 單位:毫秒,這里設置為8秒
        maps.put("x-message-ttl", 8000);
        return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
    }

    /**
     * 將 延時隊列 綁定到延時交換機上面
     * @return delayBindingA
     */
    @Bean
    public Binding delayBindingA() {
        return BindingBuilder
                .bind(delayQueueA())
                .to(directExchange())
                .with(DELAY_QUEUE_ROUTING_KEY);
    }
}

 這里我們定義一個RabbitMq服務接口:

package com.demo.www.service;

/**
 * rabbiMq服務
 * @author AnYuan
 */

public interface RabbitMqService {

    /**
     * 統一發送mq
     *
     * @param exchange   交換機
     * @param routingKey 路由key
     * @param msg       消息
     * @param ttl       過期時間
     */
    void send(String exchange, String routingKey, String msg, Integer ttl);
}

服務接口的實現類:

package com.demo.www.service.impl;

import com.demo.www.service.RabbitMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * rabbitmq服務
 * @author AnYuan
 */

@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(String exchange, String routingKey, String msg, Integer ttl) {
        MessageProperties messageProperties = new MessageProperties();
        // 第二種方式設置消息過期時間
        messageProperties.setExpiration(ttl.toString());
        // 構建一個消息對象
        Message message = new Message(msg.getBytes(), messageProperties);
        // 發送RabbitMq消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

接着創建一個測試類進行接口測試:

 

package com.demo.www.service.impl;

import com.google.common.collect.Maps;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import com.demo.www.service.RabbitMqService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.Map;

@Slf4j
@SpringBootTest class RabbitMqServiceImplTest {

    @Autowired private RabbitMqService rabbitMqService;

    @Test public void sendTest() {
      // 手動指定消息過期時間
    int ttl = 10000;

    Map<String, Object> msgMap = Maps.newHashMapWithExpectedSize(3);
    msgMap.put("msg", "Hello RabbitMq");
    msgMap.put("time", LocalDateTime.now());
    msgMap.put("ttl", ttl);

    // 注意這里發送的交換機是 延時交換機
    rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY, JSONObject.toJSONString(msgMap), ttl);

    log.info("消息發送成功:{}", JSONObject.toJSONString(msgMap));
    
    }
}

 

 以上准備就緒后,延時隊列其實已經實現了,來看一下項目啟動后的情況

在RabbitMq的管理后台,可以看到自動創建的交換機

自動創建的隊列,在延時隊列的Features欄可以看到有: TTl、DLX、DLK。它們分別代表:(x-message-ttl):設置隊列中的所有消息的生存周期,也就是過期時間;(x-dead-letter-exchange)綁定了死信交換機,死信消息會重新推送到指定交換機上而不是丟掉;(x-dead-letter-routing-key):死信消息推送到交換機上指定路由鍵的隊列中,也就是說綁定了RoutingKey;

當運行測試類后會顯示發送成功:

首先會看到延時隊列里面產生了一條數據:

 

8秒后消息變成死信消息,同時會推送到死信隊列里面:

這樣就實現了延時隊列。最后只需要創建一個消費者,消費死信隊列里面的消息,注意是消費死信隊列!

package com.demo.www.rabbitmq.consumers;

import com.alibaba.fastjson.JSONObject;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延時隊列消息消費者
 * @author AnYuan
 */

@Component
@Slf4j
public class DelayMsgConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
            exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
    public void queueAConsumer(Message message) {

        Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
        LocalDateTime now = LocalDateTime.now();
        Duration duration = Duration.between(msg.getTime(), now);

        log.info("DelayMsgConsumer死信隊列消費---->Msg:{}, 發送時間:{}, 當前時間:{},  相差時間:{}秒,消息設置的ttl:{}",
                JSONObject.toJSONString(msg),
                localDateTimeToString(msg.getTime()),
                localDateTimeToString(now),
                duration.getSeconds(),
                msg.getTtl());
    }

    @Data
    public static class Msg {
        private String ttl;
        private String msg;
        private LocalDateTime time;
    }

    private String localDateTimeToString(LocalDateTime localDateTime){
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return dateTimeFormatter.format(localDateTime);
    }
}

消費者創建好后,項目啟動即可看到消費的Mq消息,對比time里面的值確認為同一條消息: 

最后有一個細節:發送消息時設置的ttl為10秒,但是消息過了8秒后就變成死信消息被消費掉了,這里就是上面說的:當設置過期消息時同時使用兩種方式,過期時間按照小的一方計算。

以上就是利用死信消息和死信隊列實現了RabbitMq的延時隊列功能,實現了間隔指定時間后做指定的邏輯,既保證了消息及時性又能將功能代碼進行解耦,開發過程中可以好好利用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM