一、RabbitMQ的的死信隊列+ TTL
1、什么是TTL
- time to live 消息存活時間
- 如果消息在存活時間內未被消費,則會被清除
- RabbitMQ支持兩種ttl設置
- 單獨消息進行配置ttl
- 整個隊列進行配置ttl(居多)
2、什么是rabbitmq的死信隊列
- 沒有被及時消費的消息存放的隊列
3、什么是rabbitmq的死信交換機
- Dead Letter Exchange(死信交換機,縮寫: DLX)當消息成為死信后,會被重新發送到另⼀個交換機,這個交換機就是DLX死信交換機。
4、消息有哪幾種情況成為死信
- 消費者拒收消息(basic.reject/ basic.nack) ,並且沒有重新入隊 requeue=false
- 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
- 隊列的消息長度達到極限
- 結果:消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
二、RabbitMQ管控台消息TTL測試
隊列過期時間使用參數,對整個隊列消息統⼀過期
- x-message-ttl
- 單位ms(毫秒)
消息過期時間使用參數(如果隊列頭部消息未過期,隊列中級消息已經過期,消息還在隊列里面)
- expiration
- 單位ms(毫秒)
兩者都配置的話,時間短的先觸發
1、RabbitMQ Web控制台測試
新建死信交換機(和普通沒區別)
新建死信隊列 (和普通沒區別)
死信交換機和隊列綁定
新建普通隊列,設置過期時間、指定死信交換機
測試:直接web控制台往product_qeueu發送消息即可,會看到消息先是在product_qeueu隊列停留10秒(因為沒有消費者消費),然后該消息從product_qeueu移入到dead_queue。
三、RabbitMQ的延遲隊列和應用場景
1、什么是延遲隊列
⼀種帶有延遲功能的消息隊列, Producer 將消息發送到消息隊列服務端,但並不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某⼀個時間投遞到Consumer進行消費,該消息即定時消息。
2、使用場景
- 通過消息觸發⼀些定時任務,比如在某⼀固定時間點向用戶發送提醒消息
- 用戶登錄之后5分鍾給用戶做分類推送、用戶多少天未登錄給用戶做召回推送;
- 消息生產和消費有時間窗⼝要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送⼀條延時消息。這條消息將會在30分鍾以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略
四、實戰
1、背景
JD、淘系、天貓、拼多多電商平台,規定新注冊的商家,審核通過后需要在【規定時間】內上架商品,否則凍結賬號。
2、代碼開發
死信交換機和死信隊列開發,topic交換機和隊列開發,綁定死信交換機
package net.xdclass.xdclasssp.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { //死信隊列 public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue"; //死信交換機 public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange"; //進入死信隊列的路由key public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key"; //創建死信交換機 @Bean public Exchange lockMerchantDeadExchange() { return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false); } //創建死信隊列 @Bean public Queue lockMerchantDeadQueue() { return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build(); } //綁定死信交換機和死信隊列 @Bean public Binding lockMerchantBinding() { return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE, LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null); } //普通隊列,綁定的個死信交換機 public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue"; //普通的topic交換機 public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange"; //路由key public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key"; //創建普通交換機 @Bean public Exchange newMerchantExchange() { return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false); } //創建普通隊列 @Bean public Queue newMerchantQueue() { Map<String, Object> args = new HashMap<>(3); //消息過期后,進入到死信交換機 args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE); //消息過期后,進入到死信交換機的路由key args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY); //過期時間,單位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build(); } //綁定交換機和隊列 @Bean public Binding newMerchantBinding() { return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE, NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null); } }
消息生產和消費
- 消息生產
- 投遞到普通的topic交換機
- 消息過期,進入死信交換機
- 消息消費
- 消費者監聽死信交換機的隊列
MerchantAccountController 模擬請求
@RestController @RequestMapping("/api/admin/merchant") public class MerchantAccountController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("check") public Object check(){ //修改數據庫的商家賬號狀態 TODO rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家賬號通過審核"); Map<String,Object> map = new HashMap<>(); map.put("code",0); map.put("msg","賬號審核通過,請10秒內上傳1個商品"); return map; } }
MerchantMQListener消費
package net.xdclass.xdclasssp.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = "lock_merchant_dead_queue") public class MerchantMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("body="+body); //做復雜業務邏輯 TODO //告訴broker,消息已經被確認 channel.basicAck(msgTag,false); } }