SpringBoot2.3整合RabbitMQ實現延遲消費消息


1.源碼獲取地址

文章末尾有源代碼地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章節主要實現消息的延遲消費,在學習延遲消費之前必須先了解RabbitMQ兩個基本概念,消息的TTL和死信Exchange,通過這兩者的組合來實現消息的延遲消費。
不想看原理講解的,直接通過標題6看代碼實現

2.消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。

3.死信交換器 Dead Letter Exchanges

  • 一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
  • 一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
  • 上面的消息的TTL到了,消息過期了
  • 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。 死信交換器(Dead Letter Exchange)其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

4.實現延遲消費原理

file

  • 大概原理:首先發送消息到死信隊列,死信隊列設置ttl過期時間,到期之后會自動將消息發送到一般隊列實現消息的消費
  • 實現步驟如下
  • 創建死信交換器
  • 創建死信隊列
  • 將死信隊列與死信交換機綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)消息的發送方在向 Exchange發送消息時,也必須指定消息的RoutingKey。Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的Routing key完全一致,才會接收到消息.
  • 創建正常交換器
  • 創建正常隊列
  • 將正常隊列綁定到正常交換器

5.基於案例實現消息的延遲消費

這里我們以最熟悉的12306購票為例進行案例場景的分析,12306購票步驟如下:

  • 首先登錄12306根據日期和起點站等條件進行搶票下訂單
  • 搶到票訂單處於未支付狀態,並提示支付時間30分鍾內
    file
  • 這里就可以使用到延時隊列,在下訂單完成的時候將訂單號發送到MQ的死信隊列,並設置30分鍾過期,30分鍾以后死信隊列的數據會轉發到正常隊列,從正常隊列中獲取到下訂單的訂單號,然后我們根據訂單號查詢訂單的支付狀態,如果已經支付我們不做任何操作,如果未支付取消訂單,關閉支付狀態,將票回滾到票池供其他用戶購買

6.代碼實現

  • 在RabbitMQConfig中創建隊列、交換機以及綁定關系

      @Configuration
      public class RabbitMQConfig {
    
      		/**
      		 * 測試發送消息到MQ
      		 * @return
      		 */
      		@Bean
      		public Queue testHello() {
      				return new Queue(SysConstant.QUEUE_TEST_HELLO);
      		}
    
    
      		/**
      		 * 死信交換機
      		 * @return
      		 */
      		@Bean
      		public DirectExchange sysOrderDelayExchange() {
      				return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE);
      		}
    
      		/**
      		 * 死信隊列
      		 * @return
      		 */
      		@Bean
      		public Queue sysOrderDelayQueue() {
      				Map<String, Object> map = new HashMap<String, Object>(16);
      				map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交換機
      				map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey
      				return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map);
      		}
    
      		/**
      		 * 給死信隊列綁定死信交換機
      		 * @return
      		 */
      		@Bean
      		public Binding sysOrderDelayBinding() {
      				return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY);
      		}
    
      		/**
      		 * 死信接收交換機,用於接收死信隊列的消息
      		 * @return
      		 */
      		@Bean
      		public DirectExchange sysOrderReceiveExchange() {
      				return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE);
      		}
    
      		/**
      		 * 死信接收隊列
      		 * @return
      		 */
      		@Bean
      		public Queue sysOrderReceiveQueue() {
      				return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE);
      		}
    
      		/**
      		 * 死信接收交換機綁定接收死信隊列消費隊列
      		 * @return
      		 */
      		@Bean
      		public Binding sysOrdeReceiveBinding() {
      				return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY);
      		}
      }
    
  •   發送延時消息到死信交換器方法
    
      		@Service
      		public class MsgService {
    
      				@Autowired
      				private RabbitTemplate rabbitTemplate;
      				/**
      				 * 發送延時消息到mq
      				 * @param exchange 死信交換機
      				 * @param routeKey 路由key
      				 * @param data 發送數據
      				 * @param delayTime 過期時間,單位毫秒
      				 */
      				public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) {
      						rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> {
      								message.getMessageProperties().setExpiration(delayTime + "");
      								return message;
      						});
      				}
      		}
    
  • 監聽隊列消息ReceiveMsgListener類

       /**
      		 * 獲取到的延時消息
      		 * 這里接收到消息進行對應的業務處理(例如:取消訂單,關閉支付,回滾庫存等 ...)
      		 * @param msg
      		 */
      		@RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE)
      		@RabbitHandler
      		public void getdelayMsg(String msg) {
      				log.info("MQ接收消息時間:{},消息內容:{}", DateUtil.formatDateTime(DateUtil.date()),msg);
      				log.info("------->這里實現訂單關閉、支付關閉、回滾庫存業務邏輯...");
      		}
    
  •   		創建Controller向隊列發送消息,設置過期時間10秒
    
      				@RestController
      				@RequestMapping("mq")
      				@Slf4j
      				public class MQController {
    
      						@Autowired
      						private MsgService msgService;
    
      						@GetMapping("sendMsg")
      						public String sendMsg() {
      								log.info("發送延時消息時間:" + DateUtil.formatDateTime(DateUtil.date()));
    
      								OrderInfo orderInfo = new OrderInfo();
      								orderInfo.setOrderId(IdUtil.fastSimpleUUID());
      								orderInfo.setOrderState("待支付");
      								orderInfo.setPayMoney(999.88);
      								msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分鍾
      								return JSONUtil.toJsonStr("發送延時消息成功");
      						}
      				}
    
  • 啟動服務,可以看到MQ中創建對應的隊列和交換器

file
file

  • 控制台日志可以看到發送消息與消費消息間隔時間是10s

file

7.更多MQ技術文檔獲取

https://www.sunnyblog.top/index.html?tagId=1264009609236971520

詳細開發技術文檔盡在 點擊這里查看技術文檔 ;更多技術文章: https://www.sunnyblog.top;任何疑問加QQ群咨詢:534073451


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM