需求分析
超過限定時間並未支付的訂單,我們需要進行超時訂單的處理:先調用微信支付api,查詢該訂單的支付狀態。如果未支付調用關閉訂單的api,並修改訂單狀態為已關閉,並回滾庫存數。如果該訂單已經支付,則做補償操作(修改訂單狀態和記錄)。
實現思路
如何獲取超過限定時間的訂單?我們可以使用延遲消息隊列(死信隊列)來實現。
所謂延遲消息隊列,就是消息的生產者發送的消息並不會立刻被消費,而是在設定的時間之后才可以消費。
我們可以在訂單創建時發送一個延遲消息,消息為訂單號,系統會在限定時間之后取出這個消息,然后查詢訂單的支付狀態,根據結果做出相應的處理。
rabbitmq延遲消息
使用RabbitMQ來實現延遲消息必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,通過這兩者的組合來實現上述需求。
消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
我們創建一個隊列queue.temp,在Arguments 中添加x-message-ttl 為5000 (單位是毫秒),那每一個進入這個隊列的消息在5秒后會消失。
死信交換器 Dead Letter Exchanges
一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。
(1) 一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
(2)上面的消息的TTL到了,消息過期了。
(3)隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信交換機上。
Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

我們現在可以測試一下延遲隊列。
(1)創建死信交換器 exchange.ordertimeout (fanout)
(2)創建隊列queue.ordertimeout
(3)建立死信交換器 exchange.ordertimeout 與隊列queue.ordertimeout 之間的綁定
(4)創建隊列queue.ordercreate,Arguments添加
x-message-ttl=10000
x-dead-letter-exchange: exchange.ordertimeout
(5)測試:向queue.ordercreate隊列添加消息,等待10秒后消息從queue.ordercreate隊列消失
代碼實現

微信支付-關閉訂單
(1)WxPayController新增方法
/** * 關閉微信訂單 * @param orderId * @return */ @PutMapping("/close/{orderId}") public Result closeOrder(@PathVariable String orderId){ Map map = wxPayService.closeOrder( orderId ); return new Result( true,StatusCode.OK,"",map ); }
(2)changgou_service_pay的WxPayService新增方法定義
/** * 關閉訂單 * @param orderId * @return */ Map closeOrder(String orderId);
(3)changgou_service_pay的 WxPayServiceImpl實現該方法
@Override public Map closeOrder(String orderId) { Map map=new HashMap( ); map.put( "out_trade_no",orderId ); try { return wxPay.closeOrder( map ); } catch (Exception e) { e.printStackTrace(); return null; } }
(4)changgou_service_pay_api的WxPayFeign新增方法
/** * 關閉微信訂單 * @param orderId * @return */ @PutMapping("/wxpay/close/{orderId}") public Result closeOrder(@PathVariable("orderId") String orderId);
微信支付-查詢訂單
(1)WxPayController新增方法
/** * 查詢微信訂單 * @param orderId * @return */ @GetMapping("/query/{orderId}") public Result queryOrder(@PathVariable String orderId){ Map map = wxPayService.queryOrder( orderId ); return new Result( true,StatusCode.OK,"",map ); }
(2)WxPayFeign新增方法
/** * 查詢微信訂單 * @param orderId * @return */ @GetMapping("/wxpay/query/{orderId}") public Result queryOrder(@PathVariable("orderId") String orderId);
訂單關閉邏輯
如果為未支付,查詢微信訂單
如果確認為未支付,調用關閉本地訂單( 修改訂單表的訂單狀態、記錄訂單日志、恢復商品表庫存)和微信訂單的邏輯。
如果為已支付進行狀態補償。
(1)changgou_service_order新增依賴
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_pay_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
(2)changgou_service_order的OrderService新增方法定義
/** * 關閉訂單 * @param orderId */ void closeOrder(String orderId);
(3)OrderServiceImpl實現該方法
實現邏輯:
1)根據id查詢訂單信息,判斷訂單是否存在,訂單支付狀態是否為未支付
2)基於微信查詢訂單支付狀態
2.1)如果為success,則修改訂單狀態
2.2)如果為未支付,則修改訂單,新增日志,恢復庫存,關閉訂單
@Autowired private WxPayFeign wxPayFeign; @Override @Transactional public void closeOrder(String orderId) { System.out.println("關閉訂單開啟:"+orderId); Order order = orderMapper.selectByPrimaryKey( orderId ); if(order==null){ throw new RuntimeException( "訂單不存在!" ); } if(!"0".equals( order.getOrderStatus() )){ System.out.println("此訂單不用關閉"); return; } System.out.println("關閉訂單通過校驗:"+orderId); //調用微信訂單查詢,檢測支付狀態 Map wxQueryMap = (Map)wxPayFeign.queryOrder( orderId ).getData(); System.out.println("查詢微信支付訂單:"+wxQueryMap); if("SUCCESS".equals( wxQueryMap.get( "trade_state" ) ) ){ //如果支付狀態是成功,進行補償 updatePayStatus( orderId, (String)wxQueryMap.get( "transaction_id" ) ); System.out.println("補償"); } if("NOTPAY".equals( wxQueryMap.get( "trade_state" ) ) ){ //如果是未支付,關閉訂單 System.out.println("執行關閉!"); order.setCloseTime( new Date( ) );//關閉時間 order.setOrderStatus( "4" );//關閉狀態 orderMapper.updateByPrimaryKeySelective( order );//更新 //記錄訂單變動日志 OrderLog orderLog=new OrderLog(); orderLog.setId( idWorker.nextId()+"" ); orderLog.setOperater("system");// 系統 orderLog.setOperateTime(new Date());//當前日期 orderLog.setOrderStatus("4"); orderLog.setOrderId(order.getId()); orderLogMapper.insert(orderLog); //恢復庫存和銷量 OrderItem _orderItem=new OrderItem(); _orderItem.setOrderId( orderId ); List<OrderItem> orderItemList = orderItemMapper.select( _orderItem ); for(OrderItem orderItem:orderItemList){ skuFeign.resumeStockNum(orderItem.getSkuId(),orderItem.getNum()); } //關閉微信訂單 wxPayFeign.closeOrder( orderId ); } }
延遲消息處理
從消息隊列queue.ordertimeout 中提取消息
(1)修改OrderServiceImpl的add方法,追加代碼,實現mq發送
rabbitTemplate.convertAndSend( "","queue.ordercreate", orderId);
(2)changgou_service_order新建監聽類
@Component public class OrderTimeoutListener { @Autowired private OrderService orderService; /** * 更新支付狀態 * @param orderId */ @RabbitListener(queues = "queue.ordertimeout") public void closeOrder(String orderId){ System.out.println("接收到關閉訂單消息:"+orderId); try { orderService.closeOrder( orderId ); } catch (Exception e) { e.printStackTrace(); } } }
