activemq 的延遲隊列和冪等性檢查


一. 延遲消息隊列

1. 在提交支付之后,可以發送一個延遲檢查的隊列,來主動查詢用戶在支付寶上的支付狀態

mq的配置/conf/activeMq.xmlbroker實例上配置加上schedulerSupport="true",如下圖所示

2 延遲檢查如果失敗,則從新發送新的檢查隊列,並且要將檢查次數減一

// 沒有支付,再次發送檢查隊列
System.out.println("訂單號:"+out_trade_no+"尚未支付成功,再次發送檢查隊列,剩余檢查次數:"+count);
if(count>0){
    System.out.println("訂單號:"+out_trade_no+"檢查次數尚未用盡,再次發送檢查隊列"+count);
    count --;
    paymentService.sendDelayPaymentCheckQueue(out_trade_no,count);
}else {



    System.out.println("訂單號:"+out_trade_no+"檢查次數耗盡,停止檢查,調用關單服務");

}

二. 冪等性檢查

調用相同的服務,返回的結果始終一樣,叫做冪等性檢查.

例子:

第一步,在支付的controller層發起延遲隊列

@RequestMapping("/alipay/submit")
@ResponseBody
public String alipay(Model model, HttpServletRequest request,String orderSn){

// 查詢對應訂單信息
OmsOrder omsOrder = orderService.getOrderBySn(orderSn);

AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();//創建API對應的request
alipayRequest.setReturnUrl(AlipayConfig.return_payment_url);
alipayRequest.setNotifyUrl(AlipayConfig.notify_payment_url);//在公共參數中設置回跳和通知地址

Map<String,Object> requestMap= new HashMap<>();
requestMap.put("out_trade_no",orderSn);
requestMap.put("product_code","FAST_INSTANT_TRADE_PAY");
requestMap.put("total_amount",0.01);
requestMap.put("subject",omsOrder.getOmsOrderItems().get(0).getProductName());

alipayRequest.setBizContent(JSON.toJSONString(requestMap));//填充業務參數
String form="";
try {
form = alipayClient.pageExecute(alipayRequest).getBody(); //調用SDK生成表單
} catch (AlipayApiException e) {
e.printStackTrace();
}

// 保存支付數據
PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setOrderSn(orderSn);
paymentInfo.setPaymentStatus("0");
paymentInfo.setTotalAmount(omsOrder.getPayAmount());
paymentInfo.setSubject(omsOrder.getOmsOrderItems().get(0).getProductName());
paymentInfo.setOrderId(omsOrder.getId());
paymentService.addPayment(paymentInfo);

System.out.println(form);

// 發送一個檢查支付結果的隊列(延遲隊列)
paymentService.sendDelayPaymentCheckQueue(orderSn,7);// 調用支付寶的支付狀態接口程序

return form;
}

@Override
public void sendDelayPaymentCheckQueue(String orderSn,long count) {
// 發送orderSn的延遲檢查隊列
ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();

Connection connection = null;
Session session = null;// 開啟消息事務
Queue paymentResultQueue = null; // 隊列
MessageProducer producer = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
paymentResultQueue = session.createQueue("PAYMENT_CHECK_QUEUE");
//text文本格式,map鍵值格式
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("out_trade_no",orderSn);
mapMessage.setLong("count",count);

//消息延遲消費
mapMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000*30);

producer = session.createProducer(paymentResultQueue);// 消息的生成者
producer.send(mapMessage);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

}


@Component
public class PaymentConsumer {

@Autowired
PaymentService paymentService;

@JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_CHECK_QUEUE")
public void consumePaymentCheckQueue(MapMessage mapMessage) throws JMSException {
String out_trade_no = mapMessage.getString("out_trade_no");
long count = mapMessage.getLong("count");

System.out.println("開始檢查支付狀態。。。訂單號:"+out_trade_no);

// 系統支付的冪等性檢查
String status = paymentService.checkPayStatus(out_trade_no);
if(status!=null&&!status.equals("已支付")){
// 檢查支付狀態
Map<String,Object> map = paymentService.checkPayment(out_trade_no);

if(map!=null&&(((String)map.get("trade_status")).equals("TRADE_SUCCESS")||((String)map.get("trade_status")).equals("TRADE_FINISHED"))){
// 保存支付信息,發送支付成功隊列
System.out.println("訂單號:"+out_trade_no+"已經支付成功,繼續后續操作");
//
}else{
// 沒有支付,再次發送檢查隊列
System.out.println("訂單號:"+out_trade_no+"尚未支付成功,再次發送檢查隊列,剩余檢查次數:"+count);
if(count>0){
System.out.println("訂單號:"+out_trade_no+"檢查次數尚未用盡,再次發送檢查隊列"+count);
count --;
paymentService.sendDelayPaymentCheckQueue(out_trade_no,count);
}else {
System.out.println("訂單號:"+out_trade_no+"檢查次數耗盡,停止檢查,調用關單服務");

}
}
}


}
}


@Override
public String checkPayStatus(String order_sn) {

PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setOrderSn(order_sn);
PaymentInfo paymentInfo1 = paymentInfoMapper.selectOne(paymentInfo);

return paymentInfo1.getPaymentStatus();
}


@Override
public Map<String, Object> checkPayment(String out_trade_no) {

// 調用支付寶,檢查支付狀態
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();

Map<String,Object> map = new HashMap<>();
map.put("out_trade_no",out_trade_no);
request.setBizContent(JSON.toJSONString(map));
AlipayTradeQueryResponse response = null;
try {
response = alipayClient.execute(request);
} catch (AlipayApiException e) {
e.printStackTrace();
}


if(response.isSuccess()){// isSuccess
System.out.println("調用成功,交易已經創建");
String tradeNo = response.getTradeNo();
String tradeStatus = response.getTradeStatus();
map.put("trade_no",tradeNo);
map.put("trade_status",tradeStatus);
return map;
} else {
System.out.println("調用失敗,用戶尚未創建交易");//
}

return null;
}

以上,就完成了!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM