發送消息回調,消息接收確認,死信隊列,延遲隊列
RabbitMQ發送消息回調
主要是實現兩個接口,在實現之前需要加上兩個比較重要的配置。
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
然后就是實現回調方法:
@Configuration
public class RmqConfig {
@Bean
public RabbitTemplate createRmqTemplate(ConnectionFactory connectionFactory){
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMandatory(true);
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("確認回調-相關數據:"+correlationData);
System.out.println("確認回調-確認情況:"+b);
System.out.println("確認回調-原因:"+s);
}
});
template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("返回回調-消息:"+message);
System.out.println("返回回調-回應碼:"+i);
System.out.println("返回回調-回應信息:"+s);
System.out.println("返回回調-交換機:"+s1);
System.out.println("返回回調-路由鍵:"+s2);
}
});
return template;
}
}
RabbitMQ接收消息確認
消息確認在防止消息丟失的時候起到很重要的作用,RabbitMQ支持消息確認ACK。ACK機制是消費者從RabbitMQ收到消息並且處理完成之后,反饋給RabbitMQ,RabbitMQ收到反饋后再將消息從隊列中刪除。消費者如果沒有正常接收消息,沒有進行ACK反饋,那么RMQ就不會將這個消息刪除,而是認為此消息沒有被正常消費,將消息重新放到隊列里面。當然,如果是集群模式下,這個消息會被推送到其他消費者,ACK機制在RMQ中是默認開啟的。
自動確認
這也是默認的消息確認情況,RabbitMQ成功將信息發出,即成功將消息寫入到TCP連接中則認為這次消息投遞已經被正確處理,而不在乎消費者端是否正確處理了。所以這種模式下消費者如果報異常了等情況沒有正確地處理這個消息,那么RabbitMQ也不能知道,所以這條消息就丟失了。
spring.rabbitmq.listener.simple.acknowledge=auto
不確認
spring.rabbitmq.listener.simple.acknowledge=none
手動確認
spring.rabbitmq.listener.simple.acknowledge=manual
主要涉及的是三個API,也就是Channel里面的basicAck,baseNack和basicReject。三個方法調用都是代表消息被正確投遞,但是只有basicAck表示消息被正確處理。值得一提的是basicReject,在實際生產中難免需要將消息重回隊列的場景,那么這時候需要在第二個參數傳入true,表示將消息重新回到隊列。而設置為false則代表這條消息消費者端已經收到但是並不想下次還收到,所以這條消息不進入隊列而是直接丟掉。然后是basicNack,這個API代表不消費某條消息。三個方法的參數列表和其意義為:
void basicAck(deliveryTag,multiple)
deliveryTag 當前消息的唯一ID
multiple 是否批量,即是否一次性ack所有小於當前消息deliveryTag的消息
void basicNack(deliveryTag,multiple,requeue)
deliveryTag 當前消息的唯一ID。
multiple 是否批量,即一次性針對當前channel的消息的tagID小於當前這條消息的,都拒絕確認。
requeue 是指是否重新入列,也就是指不確定的消息是否重新丟回到隊列里去。
void basicReject(deliveryTag,requeue)
deliveryTag 當前消息的唯一ID。
requeue 是指是否重新入列,也就是被拒絕的消息是否重新丟回到隊列里去。
代碼實現
手動確認需要加入相關配置來配置對哪些隊列進行手動確認,然后添加回調。
//比如這里對DirectQueue,fanout.A,fanout.B,fanout.C進行手動確認
@Configuration
public class AckConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private AckReceiver ackReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(){
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(connectionFactory);
smlc.setConcurrentConsumers(1);
smlc.setMaxConcurrentConsumers(1);
smlc.setAcknowledgeMode(AcknowledgeMode.MANUAL);
smlc.addQueues(new Queue("DirectQueue",true));
smlc.addQueues(new Queue("fanout.A",true));
smlc.addQueues(new Queue("fanout.B",true));
smlc.addQueues(new Queue("fanout.C",true));
smlc.setMessageListener(ackReceiver);
return smlc;
}
}
//實現ChannelAwareMessageListener
@Component
public class AckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
String text = message.toString();
String[] msgArr = text.split("'");
Map<String, String> msgMap = mapStringToMap(msgArr[1].trim(),3);
String messageId=msgMap.get("msgId");
String messageData=msgMap.get("msgData");
String createTime=msgMap.get("createTime");
System.out.println("MessageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("Message from :"+message.getMessageProperties().getConsumerQueue());
//手動確認
channel.basicAck(deliveryTag, true);
}catch (Exception e){
channel.basicReject(deliveryTag,false);
e.printStackTrace();
}
}
private Map<String, String> mapStringToMap(String str,int entryNum ) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",",entryNum);
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
這里我采用的postman來調用生產者的發送信息的接口,代碼就沒啥好說的了,上一篇有講。
然后怎么樣才能看出有沒有確認的效果呢?不要忘了,RMQ有自帶的管理頁面的插件,上面可以瀏覽隊列的相關信息。首先將上面的配置暫時注釋,然后調用生產者發送信息,打開管理頁面的Queue選項。可以看到:
這四個隊列都有一條沒有ack的消息,然后將之前的注釋去掉使得代碼生效。再查看控制台輸出和管理頁面的情況。
綜上就可以得出這些消息的的確確被確認收到了。
死信隊列
概念
死信即DeadLetter,是RMQ中的一種消息機制,死信消息會被RMQ特殊處理,如果配置了死信隊列,那么死信消息會被扔進死信隊列,如果沒有,這條消息會被丟棄。
死信出現條件
1,消息被否定確認,即上面的basicNack,basicReject,並且requeue屬性設置為false,即不重新進入隊列。
2,消息在隊列中的存活時間超過設置TTL。
3,消息隊列的消息數量已經超過了最大隊列長度。
配置
業務隊列與死信交換機的綁定是在構建業務隊列時,通過參數(x-dead-letter-exchange和x-dead-letter-routing-key)的形式進行指定。具體過程分為三步:
1,配置業務隊列並綁定在業務交換機上。
2,為業務隊列配置死信交換機和routingkey。
3,為死信交換機配置死信隊列。
demo
死信隊列一般是在業務隊列的基礎上進行配置的。這里設置兩個業務隊列(A,B)和兩個死信隊列(A,B),綁定各自的交換機。同時為了顯現出死信隊列的作用,兩個業務隊列其中一個拋出異常,從而讓沒有被正常消費的信息進入死信隊列。然后死信隊列的消費者進行消費。
以下是隊列申明和配置。
@Configuration
public class DLConfig {
public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
public static final String BUSINESS_QUEUE_A_NAME = "business.queue_a";
public static final String BUSINESS_QUEUE_B_NAME = "business.queue_b";
public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dl_queue_a.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dl_queue_b.routingkey";
public static final String DEAD_LETTER_QUEUE_A_NAME = "queue_a";
public static final String DEAD_LETTER_QUEUE_B_NAME = "queue_b";
// 聲明業務Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明業務隊列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
//聲明當前隊列綁定的“死信交換機”
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//聲明當前隊列的死信“路由key”
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_A_NAME).withArguments(args).build();
}
// 聲明業務隊列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
//聲明當前隊列綁定的“死信交換機”
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//聲明當前隊列的“死信路由key”
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_B_NAME).withArguments(args).build();
}
// 聲明死信隊列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE_A_NAME);
}
// 聲明死信隊列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUE_B_NAME);
}
// 聲明業務隊列A綁定關系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明業務隊列B綁定關系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明死信隊列A綁定關系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 聲明死信隊列B綁定關系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
}
然后對業務隊列中的消息進行消費。
@Slf4j
@Component
public class BusinessMsgReceiver {
// 監聽器處理方法(監聽業務隊列A——BUSINESS_QUEUE_A_NAME)
@RabbitListener(queues = BUSINESS_QUEUE_A_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業務消息A:{}", msg);
boolean ack = true;
Exception exception = null;
try {
// 如果消息中包含“deadletter”, 則拋出異常,對接收到的消息返回“Nack”確認——讓消息進入死信隊列中
if (msg.contains("deadletter")){
throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
}
if (!ack){
log.error("消息消費發生異常,error msg:{}", exception.getMessage(), exception);
// 進行非正常的“nack”確認
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
// 進行正常的“ack”確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 監聽器處理方法(監聽業務隊列B——BUSINESS_QUEUE_B_NAME)
@RabbitListener(queues = BUSINESS_QUEUE_B_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到業務消息B:" + new String(message.getBody()));
// 進行正常的“ack”確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
然后對A,B兩個死信隊列設置消費者。
@Component
public class DLMsgReceiver {
// 監聽器處理方法(監聽死信隊列A——DEAD_LETTER_QUEUE_A_NAME)
@RabbitListener(queues = DEAD_LETTER_QUEUE_A_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
// 進行正常的“ack”確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
// 監聽器處理方法(監聽死信隊列B——DEAD_LETTER_QUEUE_B_NAME)
@RabbitListener(queues = DEAD_LETTER_QUEUE_B_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
// 進行正常的“ack”確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
生產消息。
@GetMapping("/sendBusinessMsg")
public String sendDirectMessage4(){
//消息包含deadletter字樣從而產生異常 rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", "deadletter");
return "send ok";
}
這里在業務隊列A中拋出了異常,所以A中的消息會進入死信交換機,這里提一嘴死信隊列在RMQ中的模式圖:
A中的消息在進入死信交換機后,被路由到相應的死信隊列,上面兩個死信隊列是綁定在一個死信交換機上的,所以兩個死信隊列里都有消息,所以這兩個隊列的消費者也會有相應的輸出。
延遲隊列
物如其名,延遲隊列從字面意思來理解就是消息延遲某個時間再處理,一般用RMQ來實現延遲隊列可以使用上面所提到的死信隊列,那么延遲怎么處理呢,那自然是消息的TTL。過期的消息被放到死信隊列里,然后再由專門的消費者消費死信隊列中的消息,就可以實現延遲隊列了。說干就干,且不說這個方案有啥方案吧,先來按照上面思路實現以下延遲隊列。
1,申明隊列,交換機等信息。一共有AB兩個隊列,為了比較出效果,兩個隊列的TTL是不同的。
@Configuration
public class DelayQueueConfig {
public static final String DELAY_QUEUE_EXCHANGE_NAME = "Delay.queue.business.exchange";
public static final String DELAY_QUEUE_NAMEA = "Delay.queue.business.queue.A";
public static final String DELAY_QUEUE_NAMEB = "Delay.queue.business.queue.B";
public static final String DELAY_QUEUE_KEY_A = "Delay.queue.business.queue.a.key";
public static final String DELAY_QUEUE_KEY_B = "Delay.queue.business.queue.b.key";
public static final String DEAD_LETTER_EXCHANGE = "Delay.queue.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_KEY = "Delay.queue.deadletter.a.key";
public static final String DEAD_LETTER_QUEUEB_KEY = "Delay.queue.deadletter.b.key";
public static final String DEAD_LETTER_QUEUEA_NAME = "Delay.queue.deadletter.queue.A";
public static final String DEAD_LETTER_QUEUEB_NAME = "Delay.queue.deadletter.queue.B";
// 聲明延時Exchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_QUEUE_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明延時隊列A 延時10s
// 並綁定到對應的死信交換機
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_KEY);
// x-message-ttl 聲明隊列的TTL
args.put("x-message-ttl", 6000);
return QueueBuilder.durable(DELAY_QUEUE_NAMEA).withArguments(args).build();
}
// 聲明延時隊列B 延時 60s
// 並綁定到對應的死信交換機
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_KEY);
// x-message-ttl 聲明隊列的TTL
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(DELAY_QUEUE_NAMEB).withArguments(args).build();
}
// 聲明死信隊列A 用於接收延時10s處理的消息
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 聲明死信隊列B 用於接收延時60s處理的消息
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 聲明延時隊列A綁定關系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_KEY_A);
}
// 聲明業務隊列B綁定關系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_KEY_B);
}
// 聲明死信隊列A綁定關系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_KEY);
}
// 聲明死信隊列B綁定關系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_KEY);
}
}
2,申明消費者,這沒什么太多要說的,解析出消息打印然后手動ACK。
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
3,生產者生產消息,代碼沒啥好說的就不貼了。發兩條消息分別到AB兩個TTL不同的隊列中,同時查看控制台收消息情況:
4,雖然很愉快地實現了,但是仔細一想這個方案還是會有問題,那就是每增加一種新的TTL不同的業務場景,就需要新增一個隊列,對於TTL很多的情況,我們就需要創建很多很多隊列,這肯定是不行的。
延遲隊列 V2.0
既然問題處在隊列上,那么是否可以將思路轉換到消息本身上呢,答案自然是可以,不要忘記消息本身也有TTL屬性,先不考慮這個方案的好壞,先試着實現一下,我們增加一個不帶有TTL屬性的隊列。這個隊列用來存放各種不同TTL的消息,這樣不就可以達到我們的要求了么。
1,增加隊列C。
public static final String DELAY_QUEUEC_NAME = "Delay.queue.business.queuec";
public static final String DELAY_QUEUEC_KEY = "Delay.queue.business.queuec.routingkey";
public static final String DEAD_LETTER_QUEUEC__KEY = "Delay.queue.deadletter.c.key";
public static final String DEAD_LETTER_QUEUEC_NAME = "Delay.queue.deadletter.queue.C";
// 聲明延時隊列C 不設置TTL
// 並綁定到對應的死信交換機
@Bean("delayQueueC")
public Queue delayQueueC(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC__KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}
// 聲明死信隊列C 用於接收延時任意時長處理的消息
@Bean("deadLetterQueueC")
public Queue deadLetterQueueC(){
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}
// 聲明延時列C綁定關系
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_KEY);
}
// 聲明死信隊列C綁定關系
@Bean
public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC__KEY);
}
2,生產者需要在發送的時候來對消息的屬性進行設置,先后發送兩條信息,一條過期時間為6S,一條為60S,然后觀察死信消費者的輸出情況。
設置消息的過期時間。
@GetMapping("/sendDelayMsg")
public String sendDelayMsg(){
String msgA = "hello world,the expire is 6";
String msgB = "hello world,the expire is 60";
log.info("當前時間:{},發送,msg:{}", new Date(), msgA);
//TTL隊列 (queue的擴展參數)
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_EXCHANGE_NAME,DelayQueueConfig.DELAY_QUEUEC_KEY,msgA,msg->{
msg.getMessageProperties().setExpiration(String.valueOf(6000));
return msg;
});
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_EXCHANGE_NAME,DelayQueueConfig.DELAY_QUEUEC_KEY,msgB,msg->{
msg.getMessageProperties().setExpiration(String.valueOf(60000));
return msg;
});
return "ok";
}
3,那么上面就算完成了?並不是,再試一下將過期時間長的放在前面,過期時間短的放在后面。再觀察消費者這邊的情況。
可以看到盡管后面的消息過期時間比前面的消息短,但是仍然沒有及時地被處理,而是等到前面的消息過期才被處理。這是因為消息隊列檢查消息過期的時候不是對整個隊列中的消息進行檢查,而是對隊首的消息進行檢查,所以在第二種情況下,后面的消息盡管已經過期,但還是需要等到前面過期時間長的消息被處理之后才能得到處理。所以使用這種方案實現延遲隊列也有一定的弊端。
延遲隊列V3.0
在比較前面兩種方案之后現在來嘗試第三種方案,那就是用RabbitMQ自己的延遲隊列插件。
1,還是繼續申明相關的隊列信息。
@Configuration
public class RmqDelayQueueConfig {
public static final String DELAY_QUEUE_NAME = "Delay.queue.name.demo";
public static final String DELAY_EXCHANGE_NAME = "Delay.queue.exchange.demo";
public static final String DELAY_QUEUE_KEY = "Delay.queue.key.demo";
@Bean
public Queue delayQueue(){
return new Queue(DELAY_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange(){
Map<String,Object> args = new HashMap<>();
args.put("x-delayed-type","direct");
return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
@Bean
public Binding bindingNotify(@Qualifier("delayQueue")Queue queue,
@Qualifier("customExchange")CustomExchange customExchange){
return BindingBuilder.bind(delayQueue()).to(customExchange()).with(DELAY_QUEUE_KEY).noargs();
}
}
2,發送消息,這里需要設置MessageProperties中的屬性,注意區分setDelay()和setExpiration()。
@Component
public class DelayMsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg,int expire){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_KEY, msg, item ->{
item.getMessageProperties().setExpiration(String.valueOf(expire));
return item;
});
}
}
3,消費者還是和之前一樣,但是這里只測試V2.0中的第二種情況,看看過期時間短的信息放在后面是否能被隊列提前發現並處理。
寫一個簡單的http接口來發送消息。
@Slf4j
@RestController
public class RmqDelayController {
@Autowired
private DelayMsgSender sender;
@GetMapping("/sendDelayMsg/{msg}/{time}")
public void send(@PathVariable("msg") String msg, @PathVariable("time")Integer delayedTime){
log.info("當前時間:{},收到請求,msg:{},delayTime:{}", new Date(), msg, delayedTime);
sender.sendMsg(msg, delayedTime);
}
}
用postman測試一下,發送兩個這樣的請求。
http://localhost:9001/sendDelayMsg/helloworld,the expire is 20s/20000
http://localhost:9001/sendDelayMsg/helloworld,the expire is 2s/2000
這時候就可以看見這是我們需要的結果。
延遲隊列的一點小總結
延遲隊列在需要延遲處理的場景中是非常有用的,使用rmq的延遲隊列也有很好的rmq的支持,並且由於rmq支持集群模式,可以很好的實現高可用。但是rmq的延遲隊列不是實現延遲處理的唯一解決辦法,使用Java的延遲隊列,甚至是Redis都可以實現延遲隊列,各有各的利弊,對於方案的選用也需要綜合業務來進行考慮,希望在日后我能對這個有更多的了解吧。