一、基本介紹
①延時隊列(實現定時任務)
場景:比如未付款訂單,超過一定時間后,系統自動取消訂單並釋放占有物品。
常用解決方案: spring的 schedule定時任務輪詢數據庫:
缺點:消耗系統內存、增加了數據庫的壓力、存在較大的時間誤差
解決: rabbitmqExchange的消息TTL和死信結合
②消息的TL(Time To Live)消息的TTL就是消息的存活時間。
RabbitMQ可以對隊列和消息分別設置TTL
- 對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
- 如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。可以通過設置消息的 expiration-message-字段或者x--ttl屬性來設置時間,兩者是一樣的效果。
③ Dead Letter Exchanges (DLX)
一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。(什么是死信)
- 一個消息被Consumer拒收了,並且 reject方法的參數里是。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/basic.nack) requeue=false上面的消息的TTL到了,消息過期了。-
- 一隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上
Dead Letter Exchangeexch其實就是一種普通的,和創建其他exchange沒有兩樣。只是在某一個設置 Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到 Dead Letter Exchange中去。
·我們既可以控制消息在一段時間后變成死信,又可以控制變成死信的消息被路由到某一個指定的交換機,結合二者,其實就可以實現一個延時隊列
二、推薦:給隊列設置延時時間
①:因為RabbitMQ采用惰性檢查機制
RabbitMq采用惰性檢查機制,也就是懶檢查機制:比如消息隊列中存放了多條消息,第一條是5分鍾過期,第二條是1分鍾過期,第三條是1秒鍾過期,按照正常的過期邏輯,應該是1秒過期的先排出這個隊列,進入死信隊列中,但是實際RabbitMQ是先拿第一條消息,也就是5分鍾過期的,一看5分鍾還沒到過期時間,然后等待5分鍾會將第一條消息拿出來,放入死信隊列,這里就會出現問題,第二條設置1分鍾的和第三條設置1秒鍾的消息必須要等待第一條5分鍾過期后才能過期,等待第一條消息過期5分鍾了,拿第二條、三條的時候都不需要判斷就已經過期了,直接就放入死信隊列中,所以第二條、三條需要等待第一條消息過了5分鍾才能過期,這樣的延時根本就沒產生對應的效果。
②:理論結構圖
③:項目結構圖
④:代碼實現
4.1:基礎設置
4.1.1:創建信道、隊列、路由(結構圖)
4.1.2:MyRabbit配置
@Configuration public class MyRabbitConfig { /** * 使用JSON序列化機制,進行消息轉移 */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public Exchange stockEventExchange() { TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } @Bean public Queue stockReleaseStockQueue() { return new Queue("stock.release.stock.queue", true, false, false); }
@Bean public Queue stockDelayQueue() { HashMap<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "stock-event-exchange"); args.put("x-dead-letter-routing-key", "stock.release"); args.put("x-message-ttl", 120000); //延時2min return new Queue("stock.delay.queue", true, false, false,args); } @Bean public Binding stockReleaseBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); } @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }
4.1.3:設置監聽器
@Service @RabbitListener(queues = "stock.release.stock.queue") public class StockRelaeaseListener { @Autowired WareSkuService wareskuService; @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException { System.out.println("收到解鎖信息"); try { wareskuService.unlockStock(to); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } @RabbitHandler public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException { System.out.println("訂單關閉准備解鎖庫存"); try { wareskuService.unlockStock(orderTo); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
4.2、wareskuService類:
4.2.1:orderLockStock方法:
/** * 為某個訂單鎖定庫存 * * @param vo * @return * @Transactional(rollbackFor = NoStockException.class)運行出現異常時回滾 */ @Transactional @Override public Boolean orderLockStock(WareSkuLockVo vo) { /** * 保存庫存工作單的詳情,為了追溯哪個倉庫鎖了多少 */ WareOrderTaskEntity taskEntity = new WareOrderTaskEntity(); taskEntity.setOrderSn(vo.getOrderSn()); orderTaskService.save(taskEntity); //1、按照下單的收獲地址,找到就近倉庫進行鎖定庫存 List<OrderItemVo> locks = vo.getLocks(); List<SkuWareHasStock> collect = locks.stream().map(item -> { SkuWareHasStock stock = new SkuWareHasStock(); Long skuId = item.getSkuId(); stock.setSkuId(skuId); stock.setNum(item.getCount()); //查詢這個商品在哪里有庫存 List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId); stock.setWareId(wareIds); return stock; }).collect(Collectors.toList()); for (SkuWareHasStock hasStock : collect) { Boolean skuStocked = false; Long skuId = hasStock.getSkuId(); List<Long> wareIds = hasStock.getWareId(); if (wareIds == null || wareIds.size() == 0) { //沒有庫存拋出異常 throw new NoStockException(skuId); } //如果每一個商品都鎖成功,將當前商品鎖定了幾件發送給MQ //如果鎖定失敗,前面保存的工作單信息就回滾了。 for (Long wareId : wareIds) { //成功就返回1,否則就是0 Long count = wareSkuDao.lockSkuStock(skuId, wareId, hasStock.getNum()); if (count == 1) { //TODO:表明鎖住了,發消息告訴MQ庫存鎖定成功 //在數據表wms_ware_order_task_detail中存入庫存單(*倉庫/*商品/*數量/被鎖*件)做記號 WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1); orderTaskDetailService.save(entity); StockLockedTo lockedTo = new StockLockedTo(); lockedTo.setId(taskEntity.getId()); StockDetailTo stockDetailTo = new StockDetailTo(); //拷貝屬性和數值 BeanUtils.copyProperties(entity, stockDetailTo); //防止wms_ware_order_task_detail表內數據因為回滾丟失,所以new一個StockLockedTo類記錄失敗提交的數據 lockedTo.setDetail(stockDetailTo); //將庫存工作單的詳情放入exchange中 rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo); skuStocked = true; break; } else { //鎖失敗了,重試下一個倉庫 } } if (skuStocked == false) { //當前商品所有倉庫都沒鎖住庫存數量 throw new NoStockException(skuId); } } //肯定全部都是鎖定 return true; }
4.2.1、unlockStock方法:
@Override public void unlockStock(StockLockedTo to) { StockDetailTo detail = to.getDetail(); Long detailId = detail.getId(); /** * 1、查詢數據庫wms_ware_order_task_detail表關於這個訂單的鎖定庫存信息 * ①表里有關於鎖庫存的信息,隊列設置延時時間,檢查訂單的狀態,確認是否需要進行解鎖 * 1.1:解鎖前查看訂單情況:則需要解鎖庫存 * 1.1.1查看訂單狀態,查看訂單狀態,若訂單已取消則必須解鎖庫存 * 1.1.2查看訂單狀態,訂單未取消則不能解鎖庫存 * * 1.2:如果沒有訂單情況:則必須解鎖庫存 * * ②沒有則代表整個庫存鎖定失敗,事務回滾了,這種情況無需解鎖 * * 只要解鎖庫存失敗,利用手動模式 */ WareOrderTaskDetailEntity byId = orderTaskDetailService.getById(detailId); if (byId != null) { //解鎖 Long id = to.getId(); WareOrderTaskEntity taskEntity = orderTaskService.getById(id); String orderSn = taskEntity.getOrderSn();//根據訂單號查詢訂單狀態 //查找訂單是否創建成功,此處遠程調用會因為攔截器需要先登錄,因此按4.3進行修改 R r = orderFeignService.getOrderStatus(orderSn); if (r.getCode() == 0) { //訂單數據返回成功 OrderVo data = r.getData(new TypeReference<OrderVo>() { }); //只有訂單狀態是取消狀態/或者訂單不存在才可以解鎖.4為狀態碼代表訂單是取消狀態 System.out.println("Data1:" + data); if (data == null || data.getStatus() == 4) { //當前庫存單詳情狀態1已鎖定但是未解鎖才可以解鎖 unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId); /* if (byId.getLockStatus() == 1) { System.out.println("Data2:" + data); }*/ } } else { //消息拒絕以后重新放入隊列里,讓其他人繼續消費解鎖 throw new RuntimeException("遠程服務失敗"); } } }
4.2.2:unLockStock方法:
//庫存解鎖方法 private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) { wareSkuDao.unlockStock(skuId, wareId, num); } //wareSkuDao.xml中更新代碼: /**<update id="unlockStock"> UPDATE wms_ware_sku SET stock_locked = stock_locked- #{num} WHERE sku_id= #{skuId} AND ware_id=#{wareId} </update>**/
4.2.3:unlockStock(OrderTo orderTo)方法:
/* *防止訂單服務卡頓,導致訂單消息一直更改不了,庫存優先到期,查訂單狀態新建狀態,什么都做不了就走了 *導致卡頓的訂單,永遠不能解鎖 */ @Transactional @Override public void unlockStock(OrderTo orderTo) { String orderSn = orderTo.getOrderSn(); //進行到這一步再查一下最新的狀態 WareOrderTaskEntity task = orderTaskService.getOrderTaskByOrderSn(orderSn); //獲取was_ware_order_task中的id,從而以其獲取was_ware_order_task_detail狀態為1(未解鎖)的庫存 Long id = task.getId(); List<WareOrderTaskDetailEntity> entities = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1)); //Long skuId, Long wareId, Integer num, Long taskDetailId for (WareOrderTaskDetailEntity entity:entities){ unLockStock(entity.getSkuId(),entity.getWareId(),entity.getSkuNum(),entity.getId()); } }
4.3:攔截器修改
@Component public class LoginUserInterceptor implements HandlerInterceptor { public static ThreadLocal<MemberResVo> loginUser = new ThreadLocal<>(); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //此地址下不進行攔截 String uri = request.getRequestURI(); boolean match = new AntPathMatcher().match("/order/order/status/**", uri); boolean match1 = new AntPathMatcher().match("/payed/notify", uri); if (match || match1){ return true; }
//獲取登錄用戶的鍵 MemberResVo attribute = (MemberResVo) request.getSession().getAttribute(AuthServerConstant.LONG_USER); if (attribute!=null){ loginUser.set(attribute); return true; }else { request.getSession().setAttribute("msg","請先進行登錄!"); response.sendRedirect("http://auth.gulimall.com/login.html"); return false; } } }