【rabbitmq】之過期和死信隊列


1、過期隊列:

消息如果在隊列中一直沒有被消費且存在時間超過了ttl,消息就會變成死信,后續無法再消費。設置ttl有兩種方式,

1,聲明消息隊列的時候,這個是全局的,所有發到這個隊列的消息的過期時間是一樣的

2、發送消息的時候設置屬性,可以每條消息設置不同的ttl

假如你兩種都設置了,以小的ttl為准。

兩者的區別:

queue的全局ttl,消息過期立刻就會被刪掉;如果是發送消息時設置的ttl,過期之后並不會立刻刪掉,這時候消息是否過期是需要投遞給消費者的時候判斷的。

原因:

queue的全局ttl,隊列的有效期都一樣,先入隊列的隊列頭部,頭部也是最早過期的消息,rabbitmq會有一個定時任務從隊列的頭部開始掃描是否有過期消息即可。而每條設置不同的ttl,只有遍歷整個隊列才可以篩選出來過期的消息,這樣的效率實在是太低,而且如果消息量大了根本不可行,所以rabbitmq在等到消息投遞給消費者的時候判斷當前消息是否過期,雖然刪除的不及時但是不影響功能。

注意,ttl隊列一般需要設置監聽者,因為過期之后我們會有一些通用處理邏輯比如轉發到死信隊列。

 

2、死信隊列

死信隊列和普通隊列並沒有什么特殊之處,它的作用主要是用來接收死信消息(dead message),什么是死信消息呢?一個正常的消息變成死信消息有以下集中情況:

1、消息過期

2、消息被拒絕

3、隊列達到最大長度

 

這兩個隊列可以做什么?

最常見的就是延遲關單,比如下單15分鍾沒有支付訂單關閉。本文將模擬訂單來演示這個功能。

image

原代碼:https://www.cnblogs.com/gyjx2016/p/13622097.html ,我們將在這個代碼基礎上增加一些功能。

 

 

orderDto

@Data
public class OrderDto {

    private String orderNo;

    private String title;

    private String body;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

 

死信隊列標識符

public class DlxConstant {

    /**
     * dlx 死信交換機標識符
     */
    public static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";

    /**
     * dlx 死信路由key標識符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-routing-key";



}

 

死信隊列配置

@Configuration
public class DlxConfig {

    /*************************ttl dlx 配置 *************************************/
    public static final String DLX_TTL_QUEUE = "dlx_ttl_queue";

    public static final String DLX_TTL_ROUT_KEY = "dlx.ttl.key";

    @Bean("dlxTtlQueue")
    public Queue dlxTtlQueue() {
        return new Queue(DLX_TTL_QUEUE, true, false, false);
    }

    @Bean("dlxTtlBind")
    public Binding dlxTtlBind(@Autowired @Qualifier("directExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(dlxTtlQueue()).to((directExchange)).with(DLX_TTL_ROUT_KEY);
    }

}

 

訂單過期隊列配置

@Configuration
public class OrderTtlQueueConfig {

    public static final String TTL_ORDER_QUEUE="ttl_order_queue";

    public static final String TTL_ROUT_KEY="#.ttl";

    /**
     * ttlqueue 里的消息過期之后轉移到死信隊列中
     * @return
     */
    @Bean("ttlOrderQueue")
    public Queue ttlOrderQueue(){
        Map<String,Object> params=new HashMap<>();
        params.put(DlxConstant.DEAD_LETTER_EXCHANGE,RabbitMQExchangeConfig.DIRECT_EXCHANGE);
        params.put(DlxConstant.DEAD_LETTER_QUEUE_KEY,DlxConfig.DLX_TTL_ROUT_KEY);
        return new Queue(TTL_ORDER_QUEUE,true,false,false,params);
    }

    @Bean("ttlOrderBind")
    public Binding ttlOrderBind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(ttlOrderQueue()).to(topicExchange).with(TTL_ROUT_KEY);
    }
}

 

發送創建訂單mq消息

 

/**
     * 創建訂單
     * @return
     */
    @GetMapping("createOrder")
    public String createOrder(){
        OrderDto orderDto=new OrderDto();
        orderDto.setOrderNo("123456789");
        orderDto.setTitle("小米手機");
        orderDto.setBody("黑色的小米手機");
        rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE,"order.ttl",orderDto,message -> {
            //過期時間10s
            message.getMessageProperties().setExpiration("10000");
            message.getMessageProperties().setMessageId(orderDto.getOrderNo());
            message.getMessageProperties().setCorrelationId(orderDto.getOrderNo());
            return message;
        });

        log.info("send ok");

        return "ok";

    }

 

 

監聽訂單死信隊列

/**
     * 消費訂單死信隊列
     *
     * @param channel
     * @param orderDto
     * @param message
     * @throws Exception
     */
    @RabbitListener(queues = {"#{dlxTtlQueue.name}"})
    public void orderDlxTtl(@Header(AmqpHeaders.CHANNEL) Channel channel, OrderDto orderDto, Message message) throws Exception {
        log.info("orderDlxTtl,orderDto:{},mq.message:{}", orderDto.toString(), message.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

 

 

mq管控台

lALPDgtYus6sDQTNAdrNBMM_1219_474

 

 

控制輸出結果:

image

 

 

 

參考文獻:

1、https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#broker-configuration


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM