限時訂單
應用場景
用支付寶購買電影票,搶到座位了,通常需要在15分鍾內付錢,否則訂單就會被取消。
解決方法一-----DelayQueue
思路
下訂單的時候,首先保存到數據庫,並同時將訂單數據保存到delayQueue中,開啟一個線程監控delayQueue,利用delayQueue的特性,先過期的數據會被take出來,若發現此時訂單未支付,那就是過期未支付,更改訂單狀態。
代碼
1、SaveOrder
package cn.enjoyedu.service.busi; import cn.enjoyedu.dao.OrderExpDao; import cn.enjoyedu.model.OrderExp; import cn.enjoyedu.service.delay.IDelayOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Date; import java.util.List; import java.util.Random; /** *類說明:訂單相關的服務 */ @Service public class SaveOrder { private Logger logger = LoggerFactory.getLogger(SaveOrder.class); public final static short UNPAY = 0; public final static short PAYED = 1; public final static short EXPIRED = -1; @Autowired private OrderExpDao orderExpDao; @Autowired @Qualifier("dq") //@Qualifier("mq") private IDelayOrder delayOrder; /** * 接收前端頁面參數,生成訂單 * @param orderNumber 訂單個數 */ public void insertOrders(int orderNumber){ Random r = new Random(); OrderExp orderExp ; for(int i=0;i<orderNumber;i++) { //訂單的超時時長,單位秒 long expireTime = r.nextInt(20)+5; orderExp = new OrderExp(); String orderNo = "DD00_"+expireTime+"S"; orderExp.setOrderNo(orderNo); orderExp.setOrderNote("享學訂單——"+orderNo); orderExp.setOrderStatus(UNPAY); orderExpDao.insertDelayOrder(orderExp,expireTime); logger.info("保存訂單到DB:"+orderNo); delayOrder.orderDelay(orderExp, expireTime); } } /** * 應用重啟帶來的問題: * 1、保存在Queue中的訂單會丟失,這些丟失的訂單會在什么時候過期,因為隊列里已經沒有這個訂單了,無法檢查了,這些訂單就得不到處理了。 * 2、已過期的訂單不會被處理,在應用的重啟階段,可能會有一部分訂單過期,這部分過期未支付的訂單同樣也得不到處理,會一直放在數據庫里, * 過期未支付訂單所對應的資源比如電影票所對應的座位,就不能被釋放出來,讓別的用戶來購買。 * 解決之道 :在系統啟動時另行處理 */ @PostConstruct public void initDelayOrder() { logger.info("系統啟動,掃描表中過期未支付的訂單並處理........."); int counts = orderExpDao.updateExpireOrders(); logger.info("系統啟動,處理了表中["+counts+"]個過期未支付的訂單!"); List<OrderExp> orderList = orderExpDao.selectUnPayOrders(); logger.info("系統啟動,發現了表中還有["+orderList.size() +"]個未到期未支付的訂單!推入檢查隊列准備到期檢查...."); for(OrderExp order:orderList) { long expireTime = order.getExpireTime().getTime()-(new Date().getTime()); delayOrder.orderDelay(order, expireTime); } } }
2、IDelayOrder
package cn.enjoyedu.service.delay; import cn.enjoyedu.model.OrderExp; /** *類說明:延時處理訂單的的接口 */ public interface IDelayOrder { /** * 進行延時處理的方法 * @param order 要進行延時處理的訂單 * @param expireTime 延時時長,單位秒 */ public void orderDelay(OrderExp order, long expireTime); }
3、DqMode
package cn.enjoyedu.service.delay.impl; import cn.enjoyedu.model.OrderExp; import cn.enjoyedu.service.busi.DlyOrderProcessor; import cn.enjoyedu.service.delay.IDelayOrder; import cn.enjoyedu.vo.ItemVo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.DelayQueue; /** *類說明:阻塞隊列的實現 */ @Service @Qualifier("dq") public class DqMode implements IDelayOrder { private Logger logger = LoggerFactory.getLogger(DqMode.class); @Autowired private DlyOrderProcessor processDelayOrder; private Thread takeOrder; private static DelayQueue<ItemVo<OrderExp>> delayOrder = new DelayQueue<ItemVo<OrderExp>>(); public void orderDelay(OrderExp order, long expireTime) { ItemVo<OrderExp> itemOrder = new ItemVo<OrderExp>(expireTime*1000,order); delayOrder.put(itemOrder); logger.info("訂單[超時時長:"+expireTime+"秒]被推入檢查隊列,訂單詳情:"+order); } private class TakeOrder implements Runnable{ private DlyOrderProcessor processDelayOrder; public TakeOrder(DlyOrderProcessor processDelayOrder) { super(); this.processDelayOrder = processDelayOrder; } public void run() { logger.info("處理到期訂單線程已經啟動!"); while(!Thread.currentThread().isInterrupted()) { try { ItemVo<OrderExp> itemOrder = delayOrder.take(); if (itemOrder!=null) { processDelayOrder.checkDelayOrder(itemOrder.getData()); } } catch (Exception e) { logger.error("The thread :",e); } } logger.info("處理到期訂單線程准備關閉......"); } } @PostConstruct public void init() { takeOrder = new Thread(new TakeOrder(processDelayOrder)); takeOrder.start(); } @PreDestroy public void close() { takeOrder.interrupt(); } }
4、ItemVo
package cn.enjoyedu.vo; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** *類說明:存放到延遲隊列的元素,對業務數據進行了包裝 */ public class ItemVo<T> implements Delayed{ //到期時間,但傳入的數值代表過期的時長,傳入單位毫秒 private long activeTime; private T data;//業務數據,泛型 public ItemVo(long activeTime, T data) { super(); this.activeTime = activeTime + System.currentTimeMillis(); this.data = data; } public long getActiveTime() { return activeTime; } public T getData() { return data; } /* * 這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。 */ public long getDelay(TimeUnit unit) { long d = unit.convert(this.activeTime - System.currentTimeMillis(), unit); return d; } /* *Delayed接口繼承了Comparable接口,按剩余時間排序,實際計算考慮精度為納秒數 */ public int compareTo(Delayed o) { long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } }
5、DlyOrderProcessor
package cn.enjoyedu.service.busi; import cn.enjoyedu.dao.OrderExpDao; import cn.enjoyedu.model.OrderExp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** *類說明:處理延期訂單的服務 */ @Service public class DlyOrderProcessor { private Logger logger = LoggerFactory.getLogger(DlyOrderProcessor.class); @Autowired private OrderExpDao orderExpDao; /**檢查數據庫中指定id的訂單的狀態,如果為未支付,則修改為已過期*/ public void checkDelayOrder(OrderExp record) { OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId()); if(dbOrder.getOrderStatus()==SaveOrder.UNPAY) { logger.info("訂單【"+record+"】未支付已過期,需要更改為過期訂單!"); orderExpDao.updateExpireOrder(record.getId()); }else { logger.info("已支付訂單【"+record+"】,無需修改!"); } } }
DelayQueue: 阻塞隊列(先進先出)
- 1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。
延遲期滿時才能從中提取元素(光隊列里有元素還不行)。
Delayed接口使對象成為延遲對象,它使存放在DelayQueue類中的對象具有了激活日期。該接口強制實現下列兩個方法。
- CompareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個方法。讓元素按激活日期排隊
- getDelay(TimeUnit unit):這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。
解決方案二-----MQ(activeMq)
1、修改SaveOrder
@Autowired //@Qualifier("dq") @Qualifier("mq") private IDelayOrder delayOrder;
2、ActiveMQ的延遲和定時投遞
修改配置文件(activemq.xml),增加延遲和定時投遞支持-----schedulerSupport="true"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
3、MqProducer
package cn.enjoyedu.service.mq; import cn.enjoyedu.model.OrderExp; import cn.enjoyedu.service.delay.IDelayOrder; import com.google.gson.Gson; import org.apache.activemq.ScheduledMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** *類說明:消息隊列的實現 */ @Service @Qualifier("mq") public class MqProducer implements IDelayOrder { private Logger logger = LoggerFactory.getLogger(MqProducer.class); @Autowired private JmsTemplate jmsTemplate; /** *類說明:創建消息的類 */ private static class CreateMessage implements MessageCreator{ private OrderExp order; private long expireTime; public CreateMessage(OrderExp order, long expireTime) { super(); this.order = order; this.expireTime = expireTime; } public Message createMessage(Session session) throws JMSException { Gson gson = new Gson(); String txtMsg = gson.toJson(order); Message message = session.createTextMessage(txtMsg); /** * 需要把幾個描述消息定時調度方式的參數作為屬性添加到消息,broker端的調度器就會按照我們想要的行為去處理消息。 * 一共有4個屬性 * 1:AMQ_SCHEDULED_DELAY :延遲投遞的時間 * 2:AMQ_SCHEDULED_PERIOD :重復投遞的時間間隔 * 3:AMQ_SCHEDULED_REPEAT:重復投遞次數 * 4:AMQ_SCHEDULED_CRON:Cron表達式 * ActiveMQ也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage,可以使用這個類來輔助設置 */ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, expireTime); return message; } } public void orderDelay(OrderExp order, long expireTime) { logger.info("訂單[超時時長:"+expireTime+"秒] 將被發送給消息隊列,詳情:"+order); jmsTemplate.send("order.delay", new CreateMessage(order,expireTime*1000)); } }
4、MqConsume
package cn.enjoyedu.service.mq; import cn.enjoyedu.model.OrderExp; import cn.enjoyedu.service.busi.DlyOrderProcessor; import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** *類說明:處理消息隊列返回的延時訂單 */ @Service public class MqConsume implements MessageListener { private Logger logger = LoggerFactory.getLogger(MqConsume.class); @Autowired private DlyOrderProcessor processDlyOrder; public void onMessage(Message message) { try { String txtMsg = ((TextMessage)message).getText(); logger.info("接收到消息隊列發出消息:"+txtMsg); Gson gson = new Gson(); OrderExp order = (OrderExp)gson.fromJson(txtMsg, OrderExp.class); processDlyOrder.checkDelayOrder(order); } catch (Exception e) { logger.error("處理消費異常!",e); } } }