1、過期隊列:
消息如果在隊列中一直沒有被消費且存在時間超過了ttl,消息就會變成死信,后續無法再消費。設置ttl有兩種方式,
1,聲明消息隊列的時候,這個是全局的,所有發到這個隊列的消息的過期時間是一樣的
2、發送消息的時候設置屬性,可以每條消息設置不同的ttl
假如你兩種都設置了,以小的ttl為准。
兩者的區別:
queue的全局ttl,消息過期立刻就會被刪掉;如果是發送消息時設置的ttl,過期之后並不會立刻刪掉,這時候消息是否過期是需要投遞給消費者的時候判斷的。
原因:
queue的全局ttl,隊列的有效期都一樣,先入隊列的隊列頭部,頭部也是最早過期的消息,rabbitmq會有一個定時任務從隊列的頭部開始掃描是否有過期消息即可。而每條設置不同的ttl,只有遍歷整個隊列才可以篩選出來過期的消息,這樣的效率實在是太低,而且如果消息量大了根本不可行,所以rabbitmq在等到消息投遞給消費者的時候判斷當前消息是否過期,雖然刪除的不及時但是不影響功能。
注意,ttl隊列一般需要設置監聽者,因為過期之后我們會有一些通用處理邏輯比如轉發到死信隊列。
2、死信隊列
死信隊列和普通隊列並沒有什么特殊之處,它的作用主要是用來接收死信消息(dead message),什么是死信消息呢?一個正常的消息變成死信消息有以下集中情況:
1、消息過期
2、消息被拒絕
3、隊列達到最大長度
這兩個隊列可以做什么?
最常見的就是延遲關單,比如下單15分鍾沒有支付訂單關閉。本文將模擬訂單來演示這個功能。
原代碼:https://www.cnblogs.com/gyjx2016/p/13622097.html ,我們將在這個代碼基礎上增加一些功能。
orderDto
@Data public class OrderDto { private String orderNo; private String title; private String body; public String getOrderNo() { return orderNo; } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } }
死信隊列標識符
public class DlxConstant { /** * dlx 死信交換機標識符 */ public static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange"; /** * dlx 死信路由key標識符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-routing-key"; }
死信隊列配置
@Configuration public class DlxConfig { /*************************ttl dlx 配置 *************************************/ public static final String DLX_TTL_QUEUE = "dlx_ttl_queue"; public static final String DLX_TTL_ROUT_KEY = "dlx.ttl.key"; @Bean("dlxTtlQueue") public Queue dlxTtlQueue() { return new Queue(DLX_TTL_QUEUE, true, false, false); } @Bean("dlxTtlBind") public Binding dlxTtlBind(@Autowired @Qualifier("directExchange") DirectExchange directExchange) { return BindingBuilder.bind(dlxTtlQueue()).to((directExchange)).with(DLX_TTL_ROUT_KEY); } }
訂單過期隊列配置
@Configuration public class OrderTtlQueueConfig { public static final String TTL_ORDER_QUEUE="ttl_order_queue"; public static final String TTL_ROUT_KEY="#.ttl"; /** * ttlqueue 里的消息過期之后轉移到死信隊列中 * @return */ @Bean("ttlOrderQueue") public Queue ttlOrderQueue(){ Map<String,Object> params=new HashMap<>(); params.put(DlxConstant.DEAD_LETTER_EXCHANGE,RabbitMQExchangeConfig.DIRECT_EXCHANGE); params.put(DlxConstant.DEAD_LETTER_QUEUE_KEY,DlxConfig.DLX_TTL_ROUT_KEY); return new Queue(TTL_ORDER_QUEUE,true,false,false,params); } @Bean("ttlOrderBind") public Binding ttlOrderBind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(ttlOrderQueue()).to(topicExchange).with(TTL_ROUT_KEY); } }
發送創建訂單mq消息
/** * 創建訂單 * @return */ @GetMapping("createOrder") public String createOrder(){ OrderDto orderDto=new OrderDto(); orderDto.setOrderNo("123456789"); orderDto.setTitle("小米手機"); orderDto.setBody("黑色的小米手機"); rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE,"order.ttl",orderDto,message -> { //過期時間10s message.getMessageProperties().setExpiration("10000"); message.getMessageProperties().setMessageId(orderDto.getOrderNo()); message.getMessageProperties().setCorrelationId(orderDto.getOrderNo()); return message; }); log.info("send ok"); return "ok"; }
監聽訂單死信隊列
/** * 消費訂單死信隊列 * * @param channel * @param orderDto * @param message * @throws Exception */ @RabbitListener(queues = {"#{dlxTtlQueue.name}"}) public void orderDlxTtl(@Header(AmqpHeaders.CHANNEL) Channel channel, OrderDto orderDto, Message message) throws Exception { log.info("orderDlxTtl,orderDto:{},mq.message:{}", orderDto.toString(), message.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
mq管控台
控制輸出結果:
參考文獻:
1、https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#broker-configuration