RabbitMQ TTL過期時間與死信隊列說明


TTL過期時間

我們在RabbitMQ中發布消息時,有兩種方法設置某個隊列的消息過期時間:

1、針對隊列來說,可以使用x-message-ttl參數設置當前隊列中所有消息的過期時間,即當前隊列中所有的消息過期時間都一樣;

2、針對單個消息來說,在發布消息時,可以使用Expiration參數來設置單個消息的過期時間。

以上兩個參數的單位都是毫秒,即1000毫秒為1秒。如果以上兩個都設置,則以當前消息最短的那個過期時間為准。

死信隊列有其特殊的應用場景,例如用戶在商城下單成功並點擊去支付的時候,如果在指定的時間內未支付,那么就可以將該下單消息投遞到死信隊列中,至於后續怎么處理死信隊列需要結合具體的應用場景。

1.設置隊列中所有消息的過期時間

通過以下方式可以完成TTL隊列的創建

@Configuration
public class RabbitExchangeAndQueueConfig {
    /**
     * 創建交換機
     * durable:是否持久化
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    //創建direct交換機
    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl-direct-exchange",true,false);
    }
    /**
     * 創建隊列
     * durable:是否持久化
     * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    //創建ttl隊列
    @Bean
    public Queue ttlQueue(){
        Map<String, Object> map = new HashMap<>();
        //設置過期時間為10s
        map.put("x-message-ttl",10000);
        return new Queue("ttlQueue",true,false,false,map);
    }
    //ttl隊列綁定交換機
    @Bean
    public Binding ttlDirectBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

帶有TTL的隊列在我們的管理頁面中會有顯示

 發送消息測試

@SpringBootTest
class RabbitmqProviderApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testTTLDirect() {
        String msg = "hello";
        rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg);
    }
}

然后可以自行通過rabbitmq管理頁面觀察隊列中消息的存活時間  

2.針對單個消息的過期時間

通過以下方式創建一個正常的隊列

@Configuration
public class RabbitExchangeAndQueueConfig {
    /**
     * 創建交換機
     * durable:是否持久化
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct-exchange",true,false);
    }

    /**
     * 創建隊列
     * durable:是否持久化
     * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    @Bean
    public Queue smsQueue(){
        return new Queue("smsQueue",true);
    }

    /**
     * 綁定交換機和隊列
     */
    //sms隊列綁定direct交換機
    @Bean
    public Binding smsDirectBinding(){
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }
}

在發送消息的時候設置TTL過期時間

@SpringBootTest
class RabbitmqProviderApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
  
    @Test
    void testDirect() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //在此進行TTL設置
                message.getMessageProperties().setExpiration("10000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        String msg = "hello";
        rabbitTemplate.convertAndSend("direct-exchange","sms",msg,messagePostProcessor);
    }
}

這里需要注意一點,因為我們該隊列不是TTL隊列,又因為隊列中的消息都是順序排序的,所以當如果我們該隊列之前存在沒有設置TTL的消息,未被消費,那么我們設置了的TTL消息就會一直存在該隊列中

死信隊列

死信隊列其實也是一個正常的隊列,可以被消費,只不過,它是專門用來存放一些被丟棄了的消息。因為如果消息直接被丟棄了其實是個很危險的事情,所以我們使用一個隊列來存放這些消息,這個隊列就被稱為死信隊列。

通常什么消息會放置我們的死信隊列呢?

1.消息TTL過期

2.隊列達到了最大長度,無法再添加信息到MQ中

3.消息被拒絕,並且沒有重新入隊

死信隊列的實現

1.創建死信交換機和隊列

這里我們可以新創建一個交換機和隊列用來接收我們的死信隊列

@Configuration
public class RabbitExchangeAndQueueConfig {
    /**
     * 創建交換機
     * durable:是否持久化
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    //創建死信direct交換機
    @Bean
    public DirectExchange deadDirectExchange(){
        return new DirectExchange("dead-direct-exchange",true,false);
    }

    /**
     * 創建隊列
     * durable:是否持久化
     * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable
     * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除
     */
    //創建死信的隊列
    @Bean
    public Queue deadQueue(){
        return new Queue("deadQueue",true);
    }

    /**
     * 綁定交換機和隊列
     */
    //死信交換機與死信隊列綁定
    @Bean
    public Binding deadDirectBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
    }
}

2.基於TTL過期時間實現死信隊列

然后重新創建一個TTL隊列,這里不可直接修改我們之前上方所創建的TTL隊列參數的,若需要繼續使用我們之前上方所創建的隊列,則需要先把該隊列進行刪除,否則會報錯。當然生產環境的話,不建議直接去刪除我們的隊列。

#這里因為是測試,為了方便.......我將我之前創建的TTL隊列進行刪除,然后修改TTL隊列代碼

基於之前的TTL隊列做以下修改

@Bean
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<>();
    //設置過期時間為10s
    map.put("x-message-ttl",10000);
    //下方設置死信隊列的交換機名稱,因為我是direct的交換機,所以還需要設置其routingKey,若是fanout模式則不需要設置routingKey
    map.put("x-dead-letter-exchange","dead-direct-exchange");
    map.put("x-dead-letter-routing-key","dead");
    return new Queue("ttlQueue",true,false,false,map);
}

發送消息測試

@SpringBootTest
class RabbitmqProviderApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testTTLDirect() {
        String msg = "hello";
        rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg);
    }
}

觀察rabbitmq管理頁面,可以發現,配置了死信隊列的話,會多出兩個參數DLX,DLK。DLX代表死信交換機,DLK代表死信routingKey

可以看到,當ttl隊列消息過期后,就會存放至了我們的死信隊列中

3.基於隊列最大長度實現死信隊列

還是基於剛剛的ttl隊列進行修改

基於之前的TTL隊列做以下修改

@Bean
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<>();
    //設置過期時間為10s
    //map.put("x-message-ttl",10000);
    //設置隊列最大長度為5
    map.put("x-max-length",5);
    //下方設置死信隊列的交換機名稱,因為我是direct的交換機,所以還需要設置其routingKey,若是fanout模式則不需要設置routingKey
    map.put("x-dead-letter-exchange","dead-direct-exchange");
    map.put("x-dead-letter-routing-key","dead");
    return new Queue("ttlQueue",true,false,false,map);
}

然后把存在的ttlQueue隊列在頁面上再次刪除

發送消息測試

@Test
void testLengthDirect(){
    String msg = "";
    for(int i = 0; i< 11 ;i++){
        msg = "第" + i;
        rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg);
    }
}

觀察rabbitmq頁面,可以看到由於我們ttlQueue隊列最大長度只有5,所以其余7項都被轉移到了死信隊列中,Lim標簽代表設置了最大長度

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM