原理
生產者把帶有 ttl(Time-To-Live過期時間) 的消息發送到一個臨時隊列(DelayQueue),該隊列沒有消費者;
該消息在DelayQueue中停留直至過期,同時該消息沒有ReQueue(重新入隊),就變成了死信(Dead-letter或Dead-message),死信自動地被發送給了配置好的DLX(Dead-Letter-Exchange);
DLX根據路由規則把消息路由到了配置好的隊列中(DeadLetterQueue),隊列中的消息被消費者消費。
maven依賴
引入amqp的依賴, 生產者和消費者都需要
<!--amqp 適用rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
boostrap配置
同樣,生產者和消費者都需要
properties.yaml:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
使用配置類創建各組件
rabbitmq的基本的組件是Exchange(交換機)、Queue(隊列)、Binding(綁定)。實現延時隊列需要定義如下組件:
- 一個帶ttl的Queue臨時隊列;
- 一個普通的用於業務的Queue,即死信隊列;
- 定義一個Exchange,即死信交換器DLX(Dead-Letter-Exchange);
- 再定義一個Bingding把死信隊列和死信交換器綁定在一起。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueueConfig {
/**為了更貼合業務,參數名不使用DeadQueue之類的*/
/**延遲隊列名*/
private static String DELAY_QUEUE = "delay.queue";
/**延遲隊列(死信隊列)交換器名*/
private static String DELAY_EXCHANGE = "delay.exchange";
/**處理業務的隊列(死信隊列)*/
private static String PROCESS_QUEUE = "process.queue";
/**ttl(10秒)*/
private static int DELAY_EXPIRATION = 10000;
/**
* 創建延遲隊列
* "x-dead-letter-exchange"參數定義死信隊列交換機
* "x-dead-letter-routing-key"定義死信隊列中的消息重定向時的routing-key
* "x-message-ttl"定義消息的過期時間
*/
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE)
.withArgument("x-dead-letter-routing-key", PROCESS_QUEUE)
.withArgument("x-message-ttl", DELAY_EXPIRATION)
.build();
}
/**創建用於業務的隊列*/
@Bean
public Queue processQueue(){
return QueueBuilder.durable(PROCESS_QUEUE)
.build();
}
/**創建一個DirectExchange*/
@Bean
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE);
}
/**綁定Exchange和queue,把消息重定向到業務queue*/
@Bean
Binding dlxBinding(DirectExchange directExchange, Queue processQueue){
return BindingBuilder.bind(processQueue)
.to(directExchange)
.with(PROCESS_QUEUE);
//綁定,以PROCESS_QUEUE為routing key
}
}
生產者創建發送消息的組件
@Component
public class MessageSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String routingKey, String msg) {
//amqpTemplate.convertAndSend("process.queue", msg);
amqpTemplate.convertAndSend(routingKey, msg);
}
}
消費者自動接收並處理消息的組件
@Component
//注意監聽的Queue是用於業務的ProcessQueue, 而不是臨時存放消息的DelayQueue
@RabbitListener(queues = "process.queue")
public class MessageReceiver {
@RabbitHandler()
public void doSth(String msg) {
//TODO
}
}
拓展
該方案存在的缺點:消息會堆積在隊列中,如果隊列已滿,新的消息會變為死信,會直接重發送到死信隊列,此時就沒有“延遲”的效果。