Java 实现RabbitMq延时队列和死信队列


延时队列:实际是不存在直接可用的延时队列,可通过死信消息和死信队列来实现延时队列的功能。

死信交换机: DLX 全称(Dead-Letter-Exchange)。其实它是个普通的交换机,但它是设置在队列上某个参数的值对应的交换机。

死信队列如果某个队列上存在参数:x-dead-letter-exchange, 当这个队列里的消息变成死信消息(dead message)后会被重新Pushlish到 x-dead-letter-exchange 所对应参数值的交换机上,跟这个交换机所绑定的队列就是死信队列。

死信消息

  1. 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  2. 消息TTL过期
  3. 队列达到了最大的长度时

过期消息:RabbitMq 有两种设置消息过期的方式:

  1. 创建队列时通过 x-message-ttl 参数指定该队列消息的过期时间,这种队列里的消息过期时间全部相同。
  2. 生产者Pushlish消息时,通过设置消息的 expiration 参数指定过期时间,每个消息的过期时间都不一样。

  如果两者同时使用,过期时间按照小的一方为准,两种方式设置的时间都是 毫秒。

 

应用场景:延时队列的应用场景很多,在我的项目开发中也涉及到很多,例如:订单五分钟未支付自动取消、订单准备超时30分钟推送提醒给门店、订单完成后两小时推送评价邀请给用户等等,这些间隔指定时间后的操作都可以使用延时队列。

上一篇文章:Java 简单操作 RabbitMq 介绍了RabbitMq的基本操作,要引入的包和配置可以参考上一篇文章。这里就利用RabbitMq的死信队列直接来实现延时队列的功能。

 

首先创建一个自动加载类利用Bean在项目启动时,自动创建延时和死信交换机/延时和死信队列,并将创建好的队列绑定在对应的交换机上。如果交换机和队列存在的情况下,则不会创建或更新。 这一步可减少手动或忘记创建队列带来的麻烦:

package com.demo.www.rabbitmq.config;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * RabbitMq 延时队列实现
 * @author AnYuan
 */

@Slf4j
@Configuration
public class DelayQueueConfig {

    /**
     * 延迟队列
     */
    public static final String DELAY_EXCHANGE = "delay.queue.business.exchange";
    public static final String DELAY_QUEUE = "delay.queue.business.queue";
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey";

    /**
     * 死信队列
     */
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange";
    public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey";
    public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";

    /**
     * 声明 死信交换机
     * @return deadLetterExchange
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明 死信队列 用于接收死信消息
     * @return deadLetterQueueA
     */
    @Bean
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     *  将 死信队列 绑定到死信交换机上
     * @return deadLetterBindingA
     */
    @Bean
    public Binding deadLetterBindingA() {
        return BindingBuilder
                .bind(deadLetterQueueA())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_QUEUE_ROUTING_KEY);
    }

    /**
     * 声明 延时交换机
     * @return delayExchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**
     * 将 延时队列 绑定参数
     * @return Queue
     */
    @Bean
    public Queue delayQueueA() {
        Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3);
        // 队列绑定DLX参数(关键一步)
        maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 队列绑定 死信RoutingKey参数
        maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
        // 消息过期采用第一种设置队列的 ttl 时间,消息过期时间全部相同。 单位:毫秒,这里设置为8秒
        maps.put("x-message-ttl", 8000);
        return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
    }

    /**
     * 将 延时队列 绑定到延时交换机上面
     * @return delayBindingA
     */
    @Bean
    public Binding delayBindingA() {
        return BindingBuilder
                .bind(delayQueueA())
                .to(directExchange())
                .with(DELAY_QUEUE_ROUTING_KEY);
    }
}

 这里我们定义一个RabbitMq服务接口:

package com.demo.www.service;

/**
 * rabbiMq服务
 * @author AnYuan
 */

public interface RabbitMqService {

    /**
     * 统一发送mq
     *
     * @param exchange   交换机
     * @param routingKey 路由key
     * @param msg       消息
     * @param ttl       过期时间
     */
    void send(String exchange, String routingKey, String msg, Integer ttl);
}

服务接口的实现类:

package com.demo.www.service.impl;

import com.demo.www.service.RabbitMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * rabbitmq服务
 * @author AnYuan
 */

@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(String exchange, String routingKey, String msg, Integer ttl) {
        MessageProperties messageProperties = new MessageProperties();
        // 第二种方式设置消息过期时间
        messageProperties.setExpiration(ttl.toString());
        // 构建一个消息对象
        Message message = new Message(msg.getBytes(), messageProperties);
        // 发送RabbitMq消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

接着创建一个测试类进行接口测试:

 

package com.demo.www.service.impl;

import com.google.common.collect.Maps;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import com.demo.www.service.RabbitMqService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.Map;

@Slf4j
@SpringBootTest class RabbitMqServiceImplTest {

    @Autowired private RabbitMqService rabbitMqService;

    @Test public void sendTest() {
      // 手动指定消息过期时间
    int ttl = 10000;

    Map<String, Object> msgMap = Maps.newHashMapWithExpectedSize(3);
    msgMap.put("msg", "Hello RabbitMq");
    msgMap.put("time", LocalDateTime.now());
    msgMap.put("ttl", ttl);

    // 注意这里发送的交换机是 延时交换机
    rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY, JSONObject.toJSONString(msgMap), ttl);

    log.info("消息发送成功:{}", JSONObject.toJSONString(msgMap));
    
    }
}

 

 以上准备就绪后,延时队列其实已经实现了,来看一下项目启动后的情况

在RabbitMq的管理后台,可以看到自动创建的交换机

自动创建的队列,在延时队列的Features栏可以看到有: TTl、DLX、DLK。它们分别代表:(x-message-ttl):设置队列中的所有消息的生存周期,也就是过期时间;(x-dead-letter-exchange)绑定了死信交换机,死信消息会重新推送到指定交换机上而不是丢掉;(x-dead-letter-routing-key):死信消息推送到交换机上指定路由键的队列中,也就是说绑定了RoutingKey;

当运行测试类后会显示发送成功:

首先会看到延时队列里面产生了一条数据:

 

8秒后消息变成死信消息,同时会推送到死信队列里面:

这样就实现了延时队列。最后只需要创建一个消费者,消费死信队列里面的消息,注意是消费死信队列!

package com.demo.www.rabbitmq.consumers;

import com.alibaba.fastjson.JSONObject;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延时队列消息消费者
 * @author AnYuan
 */

@Component
@Slf4j
public class DelayMsgConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
            exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
    public void queueAConsumer(Message message) {

        Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
        LocalDateTime now = LocalDateTime.now();
        Duration duration = Duration.between(msg.getTime(), now);

        log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{},  相差时间:{}秒,消息设置的ttl:{}",
                JSONObject.toJSONString(msg),
                localDateTimeToString(msg.getTime()),
                localDateTimeToString(now),
                duration.getSeconds(),
                msg.getTtl());
    }

    @Data
    public static class Msg {
        private String ttl;
        private String msg;
        private LocalDateTime time;
    }

    private String localDateTimeToString(LocalDateTime localDateTime){
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return dateTimeFormatter.format(localDateTime);
    }
}

消费者创建好后,项目启动即可看到消费的Mq消息,对比time里面的值确认为同一条消息: 

最后有一个细节:发送消息时设置的ttl为10秒,但是消息过了8秒后就变成死信消息被消费掉了,这里就是上面说的:当设置过期消息时同时使用两种方式,过期时间按照小的一方计算。

以上就是利用死信消息和死信队列实现了RabbitMq的延时队列功能,实现了间隔指定时间后做指定的逻辑,既保证了消息及时性又能将功能代码进行解耦,开发过程中可以好好利用。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM