TTL過期時間
我們在RabbitMQ中發布消息時,有兩種方法設置某個隊列的消息過期時間:
1、針對隊列來說,可以使用x-message-ttl參數設置當前隊列中所有消息的過期時間,即當前隊列中所有的消息過期時間都一樣;
2、針對單個消息來說,在發布消息時,可以使用Expiration參數來設置單個消息的過期時間。
以上兩個參數的單位都是毫秒,即1000毫秒為1秒。如果以上兩個都設置,則以當前消息最短的那個過期時間為准。
死信隊列有其特殊的應用場景,例如用戶在商城下單成功並點擊去支付的時候,如果在指定的時間內未支付,那么就可以將該下單消息投遞到死信隊列中,至於后續怎么處理死信隊列需要結合具體的應用場景。
1.設置隊列中所有消息的過期時間
通過以下方式可以完成TTL隊列的創建
@Configuration public class RabbitExchangeAndQueueConfig { /** * 創建交換機 * durable:是否持久化 * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ //創建direct交換機 @Bean public DirectExchange ttlDirectExchange(){ return new DirectExchange("ttl-direct-exchange",true,false); } /** * 創建隊列 * durable:是否持久化 * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ //創建ttl隊列 @Bean public Queue ttlQueue(){ Map<String, Object> map = new HashMap<>(); //設置過期時間為10s map.put("x-message-ttl",10000); return new Queue("ttlQueue",true,false,false,map); } //ttl隊列綁定交換機 @Bean public Binding ttlDirectBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); } }
帶有TTL的隊列在我們的管理頁面中會有顯示
發送消息測試
@SpringBootTest class RabbitmqProviderApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void testTTLDirect() { String msg = "hello"; rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg); } }
然后可以自行通過rabbitmq管理頁面觀察隊列中消息的存活時間
2.針對單個消息的過期時間
通過以下方式創建一個正常的隊列
@Configuration public class RabbitExchangeAndQueueConfig { /** * 創建交換機 * durable:是否持久化 * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ @Bean public DirectExchange directExchange(){ return new DirectExchange("direct-exchange",true,false); } /** * 創建隊列 * durable:是否持久化 * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ @Bean public Queue smsQueue(){ return new Queue("smsQueue",true); } /** * 綁定交換機和隊列 */ //sms隊列綁定direct交換機 @Bean public Binding smsDirectBinding(){ return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms"); } }
在發送消息的時候設置TTL過期時間
@SpringBootTest class RabbitmqProviderApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void testDirect() { MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //在此進行TTL設置 message.getMessageProperties().setExpiration("10000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; String msg = "hello"; rabbitTemplate.convertAndSend("direct-exchange","sms",msg,messagePostProcessor); } }
這里需要注意一點,因為我們該隊列不是TTL隊列,又因為隊列中的消息都是順序排序的,所以當如果我們該隊列之前存在沒有設置TTL的消息,未被消費,那么我們設置了的TTL消息就會一直存在該隊列中
死信隊列
死信隊列其實也是一個正常的隊列,可以被消費,只不過,它是專門用來存放一些被丟棄了的消息。因為如果消息直接被丟棄了其實是個很危險的事情,所以我們使用一個隊列來存放這些消息,這個隊列就被稱為死信隊列。
通常什么消息會放置我們的死信隊列呢?
1.消息TTL過期
2.隊列達到了最大長度,無法再添加信息到MQ中
3.消息被拒絕,並且沒有重新入隊
死信隊列的實現
1.創建死信交換機和隊列
這里我們可以新創建一個交換機和隊列用來接收我們的死信隊列
@Configuration public class RabbitExchangeAndQueueConfig { /** * 創建交換機 * durable:是否持久化 * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ //創建死信direct交換機 @Bean public DirectExchange deadDirectExchange(){ return new DirectExchange("dead-direct-exchange",true,false); } /** * 創建隊列 * durable:是否持久化 * exclusive:默認false,只能在當前創建連接時使用,連接關閉后隊列自動刪除,該優先級高於durable * autoDelete:是否自動刪除,當沒有生產者或消費者使用該交換機時,會自動刪除 */ //創建死信的隊列 @Bean public Queue deadQueue(){ return new Queue("deadQueue",true); } /** * 綁定交換機和隊列 */ //死信交換機與死信隊列綁定 @Bean public Binding deadDirectBinding(){ return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead"); } }
2.基於TTL過期時間實現死信隊列
然后重新創建一個TTL隊列,這里不可直接修改我們之前上方所創建的TTL隊列參數的,若需要繼續使用我們之前上方所創建的隊列,則需要先把該隊列進行刪除,否則會報錯。當然生產環境的話,不建議直接去刪除我們的隊列。
#這里因為是測試,為了方便.......我將我之前創建的TTL隊列進行刪除,然后修改TTL隊列代碼
基於之前的TTL隊列做以下修改
@Bean public Queue ttlQueue(){ Map<String, Object> map = new HashMap<>(); //設置過期時間為10s map.put("x-message-ttl",10000); //下方設置死信隊列的交換機名稱,因為我是direct的交換機,所以還需要設置其routingKey,若是fanout模式則不需要設置routingKey map.put("x-dead-letter-exchange","dead-direct-exchange"); map.put("x-dead-letter-routing-key","dead"); return new Queue("ttlQueue",true,false,false,map); }
發送消息測試
@SpringBootTest class RabbitmqProviderApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void testTTLDirect() { String msg = "hello"; rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg); } }
觀察rabbitmq管理頁面,可以發現,配置了死信隊列的話,會多出兩個參數DLX,DLK。DLX代表死信交換機,DLK代表死信routingKey
可以看到,當ttl隊列消息過期后,就會存放至了我們的死信隊列中
3.基於隊列最大長度實現死信隊列
還是基於剛剛的ttl隊列進行修改
基於之前的TTL隊列做以下修改
@Bean public Queue ttlQueue(){ Map<String, Object> map = new HashMap<>(); //設置過期時間為10s //map.put("x-message-ttl",10000); //設置隊列最大長度為5 map.put("x-max-length",5); //下方設置死信隊列的交換機名稱,因為我是direct的交換機,所以還需要設置其routingKey,若是fanout模式則不需要設置routingKey map.put("x-dead-letter-exchange","dead-direct-exchange"); map.put("x-dead-letter-routing-key","dead"); return new Queue("ttlQueue",true,false,false,map); }
然后把存在的ttlQueue隊列在頁面上再次刪除
發送消息測試
@Test void testLengthDirect(){ String msg = ""; for(int i = 0; i< 11 ;i++){ msg = "第" + i; rabbitTemplate.convertAndSend("ttl-direct-exchange","ttl",msg); } }
觀察rabbitmq頁面,可以看到由於我們ttlQueue隊列最大長度只有5,所以其余7項都被轉移到了死信隊列中,Lim標簽代表設置了最大長度