延時隊列:實際是不存在直接可用的延時隊列,可通過死信消息和死信隊列來實現延時隊列的功能。
死信交換機: DLX 全稱(Dead-Letter-Exchange)。其實它是個普通的交換機,但它是設置在隊列上某個參數的值對應的交換機。
死信隊列:如果某個隊列上存在參數:x-dead-letter-exchange, 當這個隊列里的消息變成死信消息(dead message)后會被重新Pushlish到 x-dead-letter-exchange 所對應參數值的交換機上,跟這個交換機所綁定的隊列就是死信隊列。
死信消息:
- 消息被拒絕(basic.reject / basic.nack),並且requeue = false
- 消息TTL過期
- 隊列達到了最大的長度時
過期消息:RabbitMq 有兩種設置消息過期的方式:
- 創建隊列時通過 x-message-ttl 參數指定該隊列消息的過期時間,這種隊列里的消息過期時間全部相同。
- 生產者Pushlish消息時,通過設置消息的 expiration 參數指定過期時間,每個消息的過期時間都不一樣。
如果兩者同時使用,過期時間按照小的一方為准,兩種方式設置的時間都是 毫秒。
應用場景:延時隊列的應用場景很多,在我的項目開發中也涉及到很多,例如:訂單五分鍾未支付自動取消、訂單准備超時30分鍾推送提醒給門店、訂單完成后兩小時推送評價邀請給用戶等等,這些間隔指定時間后的操作都可以使用延時隊列。
上一篇文章:Java 簡單操作 RabbitMq 介紹了RabbitMq的基本操作,要引入的包和配置可以參考上一篇文章。這里就利用RabbitMq的死信隊列直接來實現延時隊列的功能。
首先創建一個自動加載類利用Bean在項目啟動時,自動創建延時和死信交換機/延時和死信隊列,並將創建好的隊列綁定在對應的交換機上。如果交換機和隊列存在的情況下,則不會創建或更新。 這一步可減少手動或忘記創建隊列帶來的麻煩:
package com.demo.www.rabbitmq.config; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Map; /** * RabbitMq 延時隊列實現 * @author AnYuan */ @Slf4j @Configuration public class DelayQueueConfig { /** * 延遲隊列 */ public static final String DELAY_EXCHANGE = "delay.queue.business.exchange"; public static final String DELAY_QUEUE = "delay.queue.business.queue"; public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey"; /** * 死信隊列 */ public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange"; public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey"; public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue"; /** * 聲明 死信交換機 * @return deadLetterExchange */ @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } /** * 聲明 死信隊列 用於接收死信消息 * @return deadLetterQueueA */ @Bean public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUE); } /** * 將 死信隊列 綁定到死信交換機上 * @return deadLetterBindingA */ @Bean public Binding deadLetterBindingA() { return BindingBuilder .bind(deadLetterQueueA()) .to(deadLetterExchange()) .with(DEAD_LETTER_QUEUE_ROUTING_KEY); } /** * 聲明 延時交換機 * @return delayExchange */ @Bean public DirectExchange directExchange() { return new DirectExchange(DELAY_EXCHANGE); } /** * 將 延時隊列 綁定參數 * @return Queue */ @Bean public Queue delayQueueA() { Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3); // 隊列綁定DLX參數(關鍵一步) maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 隊列綁定 死信RoutingKey參數 maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY); // 消息過期采用第一種設置隊列的 ttl 時間,消息過期時間全部相同。 單位:毫秒,這里設置為8秒 maps.put("x-message-ttl", 8000); return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build(); } /** * 將 延時隊列 綁定到延時交換機上面 * @return delayBindingA */ @Bean public Binding delayBindingA() { return BindingBuilder .bind(delayQueueA()) .to(directExchange()) .with(DELAY_QUEUE_ROUTING_KEY); } }
這里我們定義一個RabbitMq服務接口:
package com.demo.www.service; /** * rabbiMq服務 * @author AnYuan */ public interface RabbitMqService { /** * 統一發送mq * * @param exchange 交換機 * @param routingKey 路由key * @param msg 消息 * @param ttl 過期時間 */ void send(String exchange, String routingKey, String msg, Integer ttl); }
服務接口的實現類:
package com.demo.www.service.impl; import com.demo.www.service.RabbitMqService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * rabbitmq服務 * @author AnYuan */ @Service @Slf4j public class RabbitMqServiceImpl implements RabbitMqService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void send(String exchange, String routingKey, String msg, Integer ttl) { MessageProperties messageProperties = new MessageProperties(); // 第二種方式設置消息過期時間 messageProperties.setExpiration(ttl.toString()); // 構建一個消息對象 Message message = new Message(msg.getBytes(), messageProperties); // 發送RabbitMq消息 rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
接着創建一個測試類進行接口測試:
package com.demo.www.service.impl; import com.google.common.collect.Maps; import com.demo.www.rabbitmq.config.DelayQueueConfig; import com.demo.www.service.RabbitMqService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.time.LocalDateTime; import java.util.Map; @Slf4j @SpringBootTest class RabbitMqServiceImplTest { @Autowired private RabbitMqService rabbitMqService; @Test public void sendTest() { // 手動指定消息過期時間 int ttl = 10000; Map<String, Object> msgMap = Maps.newHashMapWithExpectedSize(3); msgMap.put("msg", "Hello RabbitMq"); msgMap.put("time", LocalDateTime.now()); msgMap.put("ttl", ttl); // 注意這里發送的交換機是 延時交換機 rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY, JSONObject.toJSONString(msgMap), ttl); log.info("消息發送成功:{}", JSONObject.toJSONString(msgMap)); } }
以上准備就緒后,延時隊列其實已經實現了,來看一下項目啟動后的情況
在RabbitMq的管理后台,可以看到自動創建的交換機
自動創建的隊列,在延時隊列的Features欄可以看到有: TTl、DLX、DLK。它們分別代表:(x-message-ttl):設置隊列中的所有消息的生存周期,也就是過期時間;(x-dead-letter-exchange)綁定了死信交換機,死信消息會重新推送到指定交換機上而不是丟掉;(x-dead-letter-routing-key):死信消息推送到交換機上指定路由鍵的隊列中,也就是說綁定了RoutingKey;
當運行測試類后會顯示發送成功:
首先會看到延時隊列里面產生了一條數據:
8秒后消息變成死信消息,同時會推送到死信隊列里面:
這樣就實現了延時隊列。最后只需要創建一個消費者,消費死信隊列里面的消息,注意是消費死信隊列!
package com.demo.www.rabbitmq.consumers; import com.alibaba.fastjson.JSONObject; import com.demo.www.rabbitmq.config.DelayQueueConfig; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * 延時隊列消息消費者 * @author AnYuan */ @Component @Slf4j public class DelayMsgConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE), exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE))) public void queueAConsumer(Message message) { Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class); LocalDateTime now = LocalDateTime.now(); Duration duration = Duration.between(msg.getTime(), now); log.info("DelayMsgConsumer死信隊列消費---->Msg:{}, 發送時間:{}, 當前時間:{}, 相差時間:{}秒,消息設置的ttl:{}", JSONObject.toJSONString(msg), localDateTimeToString(msg.getTime()), localDateTimeToString(now), duration.getSeconds(), msg.getTtl()); } @Data public static class Msg { private String ttl; private String msg; private LocalDateTime time; } private String localDateTimeToString(LocalDateTime localDateTime){ DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return dateTimeFormatter.format(localDateTime); } }
消費者創建好后,項目啟動即可看到消費的Mq消息,對比time里面的值確認為同一條消息:
最后有一個細節:發送消息時設置的ttl為10秒,但是消息過了8秒后就變成死信消息被消費掉了,這里就是上面說的:當設置過期消息時同時使用兩種方式,過期時間按照小的一方計算。
以上就是利用死信消息和死信隊列實現了RabbitMq的延時隊列功能,實現了間隔指定時間后做指定的邏輯,既保證了消息及時性又能將功能代碼進行解耦,開發過程中可以好好利用。