延時隊列:實際是不存在直接可用的延時隊列,可通過死信消息和死信隊列來實現延時隊列的功能。
死信交換機: DLX 全稱(Dead-Letter-Exchange)。其實它是個普通的交換機,但它是設置在隊列上某個參數的值對應的交換機。
死信隊列:如果某個隊列上存在參數:x-dead-letter-exchange, 當這個隊列里的消息變成死信消息(dead message)后會被重新Pushlish到 x-dead-letter-exchange 所對應參數值的交換機上,跟這個交換機所綁定的隊列就是死信隊列。
死信消息:
- 消息被拒絕(basic.reject / basic.nack),並且requeue = false
- 消息TTL過期
- 隊列達到了最大的長度時
過期消息:RabbitMq 有兩種設置消息過期的方式:
- 創建隊列時通過 x-message-ttl 參數指定該隊列消息的過期時間,這種隊列里的消息過期時間全部相同。
- 生產者Pushlish消息時,通過設置消息的 expiration 參數指定過期時間,每個消息的過期時間都不一樣。
如果兩者同時使用,過期時間按照小的一方為准,兩種方式設置的時間都是 毫秒。
應用場景:延時隊列的應用場景很多,在我的項目開發中也涉及到很多,例如:訂單五分鍾未支付自動取消、訂單准備超時30分鍾推送提醒給門店、訂單完成后兩小時推送評價邀請給用戶等等,這些間隔指定時間后的操作都可以使用延時隊列。
上一篇文章:Java 簡單操作 RabbitMq 介紹了RabbitMq的基本操作,要引入的包和配置可以參考上一篇文章。這里就利用RabbitMq的死信隊列直接來實現延時隊列的功能。
首先創建一個自動加載類利用Bean在項目啟動時,自動創建延時和死信交換機/延時和死信隊列,並將創建好的隊列綁定在對應的交換機上。如果交換機和隊列存在的情況下,則不會創建或更新。 這一步可減少手動或忘記創建隊列帶來的麻煩:
package com.demo.www.rabbitmq.config;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* RabbitMq 延時隊列實現
* @author AnYuan
*/
@Slf4j
@Configuration
public class DelayQueueConfig {
/**
* 延遲隊列
*/
public static final String DELAY_EXCHANGE = "delay.queue.business.exchange";
public static final String DELAY_QUEUE = "delay.queue.business.queue";
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey";
/**
* 死信隊列
*/
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange";
public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey";
public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";
/**
* 聲明 死信交換機
* @return deadLetterExchange
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 聲明 死信隊列 用於接收死信消息
* @return deadLetterQueueA
*/
@Bean
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUE);
}
/**
* 將 死信隊列 綁定到死信交換機上
* @return deadLetterBindingA
*/
@Bean
public Binding deadLetterBindingA() {
return BindingBuilder
.bind(deadLetterQueueA())
.to(deadLetterExchange())
.with(DEAD_LETTER_QUEUE_ROUTING_KEY);
}
/**
* 聲明 延時交換機
* @return delayExchange
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
/**
* 將 延時隊列 綁定參數
* @return Queue
*/
@Bean
public Queue delayQueueA() {
Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3);
// 隊列綁定DLX參數(關鍵一步)
maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 隊列綁定 死信RoutingKey參數
maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
// 消息過期采用第一種設置隊列的 ttl 時間,消息過期時間全部相同。 單位:毫秒,這里設置為8秒
maps.put("x-message-ttl", 8000);
return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
}
/**
* 將 延時隊列 綁定到延時交換機上面
* @return delayBindingA
*/
@Bean
public Binding delayBindingA() {
return BindingBuilder
.bind(delayQueueA())
.to(directExchange())
.with(DELAY_QUEUE_ROUTING_KEY);
}
}
這里我們定義一個RabbitMq服務接口:
package com.demo.www.service;
/**
* rabbiMq服務
* @author AnYuan
*/
public interface RabbitMqService {
/**
* 統一發送mq
*
* @param exchange 交換機
* @param routingKey 路由key
* @param msg 消息
* @param ttl 過期時間
*/
void send(String exchange, String routingKey, String msg, Integer ttl);
}
服務接口的實現類:
package com.demo.www.service.impl;
import com.demo.www.service.RabbitMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* rabbitmq服務
* @author AnYuan
*/
@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send(String exchange, String routingKey, String msg, Integer ttl) {
MessageProperties messageProperties = new MessageProperties();
// 第二種方式設置消息過期時間
messageProperties.setExpiration(ttl.toString());
// 構建一個消息對象
Message message = new Message(msg.getBytes(), messageProperties);
// 發送RabbitMq消息
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
接着創建一個測試類進行接口測試:
package com.demo.www.service.impl;
import com.google.common.collect.Maps;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import com.demo.www.service.RabbitMqService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
import java.util.Map;
@Slf4j
@SpringBootTest class RabbitMqServiceImplTest {
@Autowired private RabbitMqService rabbitMqService;
@Test public void sendTest() {
// 手動指定消息過期時間
int ttl = 10000;
Map<String, Object> msgMap = Maps.newHashMapWithExpectedSize(3);
msgMap.put("msg", "Hello RabbitMq");
msgMap.put("time", LocalDateTime.now());
msgMap.put("ttl", ttl);
// 注意這里發送的交換機是 延時交換機
rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY, JSONObject.toJSONString(msgMap), ttl);
log.info("消息發送成功:{}", JSONObject.toJSONString(msgMap));
}
}
以上准備就緒后,延時隊列其實已經實現了,來看一下項目啟動后的情況
在RabbitMq的管理后台,可以看到自動創建的交換機

自動創建的隊列,在延時隊列的Features欄可以看到有: TTl、DLX、DLK。它們分別代表:(x-message-ttl):設置隊列中的所有消息的生存周期,也就是過期時間;(x-dead-letter-exchange)綁定了死信交換機,死信消息會重新推送到指定交換機上而不是丟掉;(x-dead-letter-routing-key):死信消息推送到交換機上指定路由鍵的隊列中,也就是說綁定了RoutingKey;

當運行測試類后會顯示發送成功:

首先會看到延時隊列里面產生了一條數據:

8秒后消息變成死信消息,同時會推送到死信隊列里面:

這樣就實現了延時隊列。最后只需要創建一個消費者,消費死信隊列里面的消息,注意是消費死信隊列!
package com.demo.www.rabbitmq.consumers;
import com.alibaba.fastjson.JSONObject;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 延時隊列消息消費者
* @author AnYuan
*/
@Component
@Slf4j
public class DelayMsgConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
public void queueAConsumer(Message message) {
Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
LocalDateTime now = LocalDateTime.now();
Duration duration = Duration.between(msg.getTime(), now);
log.info("DelayMsgConsumer死信隊列消費---->Msg:{}, 發送時間:{}, 當前時間:{}, 相差時間:{}秒,消息設置的ttl:{}",
JSONObject.toJSONString(msg),
localDateTimeToString(msg.getTime()),
localDateTimeToString(now),
duration.getSeconds(),
msg.getTtl());
}
@Data
public static class Msg {
private String ttl;
private String msg;
private LocalDateTime time;
}
private String localDateTimeToString(LocalDateTime localDateTime){
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return dateTimeFormatter.format(localDateTime);
}
}
消費者創建好后,項目啟動即可看到消費的Mq消息,對比time里面的值確認為同一條消息: 
最后有一個細節:發送消息時設置的ttl為10秒,但是消息過了8秒后就變成死信消息被消費掉了,這里就是上面說的:當設置過期消息時同時使用兩種方式,過期時間按照小的一方計算。
以上就是利用死信消息和死信隊列實現了RabbitMq的延時隊列功能,實現了間隔指定時間后做指定的邏輯,既保證了消息及時性又能將功能代碼進行解耦,開發過程中可以好好利用。
