1.源碼獲取地址
文章末尾有源代碼地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章節主要實現消息的延遲消費,在學習延遲消費之前必須先了解RabbitMQ兩個基本概念,消息的TTL和死信Exchange,通過這兩者的組合來實現消息的延遲消費。
不想看原理講解的,直接通過標題6看代碼實現
2.消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
3.死信交換器 Dead Letter Exchanges
- 一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
- 一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
- 上面的消息的TTL到了,消息過期了
- 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。 死信交換器(Dead Letter Exchange)其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
4.實現延遲消費原理

- 大概原理:首先發送消息到死信隊列,死信隊列設置ttl過期時間,到期之后會自動將消息發送到一般隊列實現消息的消費
- 實現步驟如下
- 創建死信交換器
- 創建死信隊列
- 將死信隊列與死信交換機綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)消息的發送方在向 Exchange發送消息時,也必須指定消息的RoutingKey。Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的Routing key完全一致,才會接收到消息.
- 創建正常交換器
- 創建正常隊列
- 將正常隊列綁定到正常交換器
5.基於案例實現消息的延遲消費
這里我們以最熟悉的12306購票為例進行案例場景的分析,12306購票步驟如下:
- 首先登錄12306根據日期和起點站等條件進行搶票下訂單
- 搶到票訂單處於未支付狀態,並提示支付時間30分鍾內

- 這里就可以使用到延時隊列,在下訂單完成的時候將訂單號發送到MQ的死信隊列,並設置30分鍾過期,30分鍾以后死信隊列的數據會轉發到正常隊列,從正常隊列中獲取到下訂單的訂單號,然后我們根據訂單號查詢訂單的支付狀態,如果已經支付我們不做任何操作,如果未支付取消訂單,關閉支付狀態,將票回滾到票池供其他用戶購買
6.代碼實現
-
在RabbitMQConfig中創建隊列、交換機以及綁定關系
@Configuration public class RabbitMQConfig { /** * 測試發送消息到MQ * @return */ @Bean public Queue testHello() { return new Queue(SysConstant.QUEUE_TEST_HELLO); } /** * 死信交換機 * @return */ @Bean public DirectExchange sysOrderDelayExchange() { return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE); } /** * 死信隊列 * @return */ @Bean public Queue sysOrderDelayQueue() { Map<String, Object> map = new HashMap<String, Object>(16); map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交換機 map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map); } /** * 給死信隊列綁定死信交換機 * @return */ @Bean public Binding sysOrderDelayBinding() { return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY); } /** * 死信接收交換機,用於接收死信隊列的消息 * @return */ @Bean public DirectExchange sysOrderReceiveExchange() { return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); } /** * 死信接收隊列 * @return */ @Bean public Queue sysOrderReceiveQueue() { return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE); } /** * 死信接收交換機綁定接收死信隊列消費隊列 * @return */ @Bean public Binding sysOrdeReceiveBinding() { return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY); } } -
發送延時消息到死信交換器方法 @Service public class MsgService { @Autowired private RabbitTemplate rabbitTemplate; /** * 發送延時消息到mq * @param exchange 死信交換機 * @param routeKey 路由key * @param data 發送數據 * @param delayTime 過期時間,單位毫秒 */ public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) { rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> { message.getMessageProperties().setExpiration(delayTime + ""); return message; }); } } -
監聽隊列消息ReceiveMsgListener類
/** * 獲取到的延時消息 * 這里接收到消息進行對應的業務處理(例如:取消訂單,關閉支付,回滾庫存等 ...) * @param msg */ @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE) @RabbitHandler public void getdelayMsg(String msg) { log.info("MQ接收消息時間:{},消息內容:{}", DateUtil.formatDateTime(DateUtil.date()),msg); log.info("------->這里實現訂單關閉、支付關閉、回滾庫存業務邏輯..."); } -
創建Controller向隊列發送消息,設置過期時間10秒 @RestController @RequestMapping("mq") @Slf4j public class MQController { @Autowired private MsgService msgService; @GetMapping("sendMsg") public String sendMsg() { log.info("發送延時消息時間:" + DateUtil.formatDateTime(DateUtil.date())); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(IdUtil.fastSimpleUUID()); orderInfo.setOrderState("待支付"); orderInfo.setPayMoney(999.88); msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分鍾 return JSONUtil.toJsonStr("發送延時消息成功"); } } -
啟動服務,可以看到MQ中創建對應的隊列和交換器


- 控制台日志可以看到發送消息與消費消息間隔時間是10s

7.更多MQ技術文檔獲取
https://www.sunnyblog.top/index.html?tagId=1264009609236971520
詳細開發技術文檔盡在 點擊這里查看技術文檔 ;更多技術文章: https://www.sunnyblog.top;任何疑問加QQ群咨詢:534073451
