消息中間件(三)-----限時訂單的實現(delayQueue、mq)


限時訂單

應用場景

用支付寶購買電影票,搶到座位了,通常需要在15分鍾內付錢,否則訂單就會被取消。

 

解決方法一-----DelayQueue

思路

 

下訂單的時候,首先保存到數據庫,並同時將訂單數據保存到delayQueue中,開啟一個線程監控delayQueue,利用delayQueue的特性,先過期的數據會被take出來,若發現此時訂單未支付,那就是過期未支付,更改訂單狀態。 

代碼

1、SaveOrder

package cn.enjoyedu.service.busi;

import cn.enjoyedu.dao.OrderExpDao;
import cn.enjoyedu.model.OrderExp;
import cn.enjoyedu.service.delay.IDelayOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 *類說明:訂單相關的服務
 */
@Service
public class SaveOrder {
    
    private Logger logger = LoggerFactory.getLogger(SaveOrder.class);
    
    public final static short UNPAY = 0;
    public final static short PAYED = 1;
    public final static short EXPIRED = -1;
    
    @Autowired
    private OrderExpDao orderExpDao;
    
    @Autowired
    @Qualifier("dq")
    //@Qualifier("mq")
    private IDelayOrder delayOrder;

    /**
     * 接收前端頁面參數,生成訂單
     * @param orderNumber 訂單個數
     */
    public void insertOrders(int orderNumber){
        Random r = new Random();
        OrderExp orderExp ;
        for(int i=0;i<orderNumber;i++) {
            //訂單的超時時長,單位秒
            long expireTime = r.nextInt(20)+5;
            orderExp = new OrderExp();
            String orderNo = "DD00_"+expireTime+"S";
            orderExp.setOrderNo(orderNo);
            orderExp.setOrderNote("享學訂單——"+orderNo);
            orderExp.setOrderStatus(UNPAY);
            orderExpDao.insertDelayOrder(orderExp,expireTime);  
            logger.info("保存訂單到DB:"+orderNo);
            delayOrder.orderDelay(orderExp, expireTime);
        }
    }

    /**
     * 應用重啟帶來的問題:
     * 1、保存在Queue中的訂單會丟失,這些丟失的訂單會在什么時候過期,因為隊列里已經沒有這個訂單了,無法檢查了,這些訂單就得不到處理了。
     * 2、已過期的訂單不會被處理,在應用的重啟階段,可能會有一部分訂單過期,這部分過期未支付的訂單同樣也得不到處理,會一直放在數據庫里,
     * 過期未支付訂單所對應的資源比如電影票所對應的座位,就不能被釋放出來,讓別的用戶來購買。
     * 解決之道 :在系統啟動時另行處理
     */
    @PostConstruct
    public void initDelayOrder() {
        logger.info("系統啟動,掃描表中過期未支付的訂單並處理.........");
        int counts = orderExpDao.updateExpireOrders();
        logger.info("系統啟動,處理了表中["+counts+"]個過期未支付的訂單!");
        List<OrderExp> orderList = orderExpDao.selectUnPayOrders();
        logger.info("系統啟動,發現了表中還有["+orderList.size() +"]個未到期未支付的訂單!推入檢查隊列准備到期檢查....");
        for(OrderExp order:orderList) {
            long expireTime = order.getExpireTime().getTime()-(new Date().getTime());
            delayOrder.orderDelay(order, expireTime);
        }
    }
}

2、IDelayOrder

package cn.enjoyedu.service.delay;

import cn.enjoyedu.model.OrderExp;

/**
 *類說明:延時處理訂單的的接口
 */
public interface IDelayOrder {

    /**
     * 進行延時處理的方法
     * @param order 要進行延時處理的訂單
     * @param expireTime 延時時長,單位秒
     */
    public void orderDelay(OrderExp order, long expireTime);
}

3、DqMode

package cn.enjoyedu.service.delay.impl;

import cn.enjoyedu.model.OrderExp;
import cn.enjoyedu.service.busi.DlyOrderProcessor;
import cn.enjoyedu.service.delay.IDelayOrder;
import cn.enjoyedu.vo.ItemVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.DelayQueue;

/**
 *類說明:阻塞隊列的實現
 */
@Service
@Qualifier("dq")
public class DqMode implements IDelayOrder {
    
    private Logger logger = LoggerFactory.getLogger(DqMode.class);
    
    @Autowired
    private DlyOrderProcessor processDelayOrder;
    private Thread takeOrder;
    
    private static DelayQueue<ItemVo<OrderExp>> delayOrder = new DelayQueue<ItemVo<OrderExp>>();

    public void orderDelay(OrderExp order, long expireTime) {
        ItemVo<OrderExp> itemOrder = new ItemVo<OrderExp>(expireTime*1000,order);
        delayOrder.put(itemOrder);
        logger.info("訂單[超時時長:"+expireTime+"秒]被推入檢查隊列,訂單詳情:"+order);
    }
    
    private class TakeOrder implements Runnable{
        
        private DlyOrderProcessor processDelayOrder;

        public TakeOrder(DlyOrderProcessor processDelayOrder) {
            super();
            this.processDelayOrder = processDelayOrder;
        }

        public void run() {
            logger.info("處理到期訂單線程已經啟動!");
            while(!Thread.currentThread().isInterrupted()) {
                try {
                    ItemVo<OrderExp> itemOrder = delayOrder.take();
                    if (itemOrder!=null) {
                        processDelayOrder.checkDelayOrder(itemOrder.getData());
                    }
                } catch (Exception e) {
                    logger.error("The thread :",e);
                }
            }
            logger.info("處理到期訂單線程准備關閉......");
        }
    }
    
    @PostConstruct
    public void init() {
        takeOrder = new Thread(new TakeOrder(processDelayOrder));
        takeOrder.start();
    }
    
    @PreDestroy
    public void close() {
        takeOrder.interrupt();
    }
}

4、ItemVo

package cn.enjoyedu.vo;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 *類說明:存放到延遲隊列的元素,對業務數據進行了包裝
 */
public class ItemVo<T> implements Delayed{
    //到期時間,但傳入的數值代表過期的時長,傳入單位毫秒
    private long activeTime;
    private T data;//業務數據,泛型
    
    public ItemVo(long activeTime, T data) {
        super();
        this.activeTime = activeTime + System.currentTimeMillis();
        this.data = data;
    }

    public long getActiveTime() {
        return activeTime;
    }

    public T getData() {
        return data;
    }
    
    /*
     * 這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。
     */
    public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.activeTime - System.currentTimeMillis(), unit);
        return d;
    }

    /*
     *Delayed接口繼承了Comparable接口,按剩余時間排序,實際計算考慮精度為納秒數
     */
    public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
}

5、DlyOrderProcessor

package cn.enjoyedu.service.busi;

import cn.enjoyedu.dao.OrderExpDao;
import cn.enjoyedu.model.OrderExp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 *類說明:處理延期訂單的服務
 */
@Service
public class DlyOrderProcessor {
    private Logger logger = LoggerFactory.getLogger(DlyOrderProcessor.class);
    
    @Autowired
    private OrderExpDao orderExpDao;
    
    /**檢查數據庫中指定id的訂單的狀態,如果為未支付,則修改為已過期*/
    public void checkDelayOrder(OrderExp record) {
        OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId());
        if(dbOrder.getOrderStatus()==SaveOrder.UNPAY) {
            logger.info("訂單【"+record+"】未支付已過期,需要更改為過期訂單!");
            orderExpDao.updateExpireOrder(record.getId());
        }else {
            logger.info("已支付訂單【"+record+"】,無需修改!");
        }
    }
}

DelayQueue: 阻塞隊列(先進先出)

  • 1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。

延遲期滿時才能從中提取元素(光隊列里有元素還不行)。

Delayed接口使對象成為延遲對象,它使存放在DelayQueue類中的對象具有了激活日期。該接口強制實現下列兩個方法。

  • CompareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個方法。讓元素按激活日期排隊
  • getDelay(TimeUnit unit):這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。

解決方案二-----MQ(activeMq)

1、修改SaveOrder

    @Autowired
    //@Qualifier("dq")
    @Qualifier("mq")
    private IDelayOrder delayOrder;

2、ActiveMQ的延遲和定時投遞

修改配置文件(activemq.xml),增加延遲和定時投遞支持-----schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

3、MqProducer

package cn.enjoyedu.service.mq;

import cn.enjoyedu.model.OrderExp;
import cn.enjoyedu.service.delay.IDelayOrder;
import com.google.gson.Gson;
import org.apache.activemq.ScheduledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 *類說明:消息隊列的實現
 */
@Service
@Qualifier("mq")
public class MqProducer implements IDelayOrder {
    
    private Logger logger = LoggerFactory.getLogger(MqProducer.class);
    
    @Autowired
    private JmsTemplate jmsTemplate;    
    
    /**
     *類說明:創建消息的類
     */
    private static class CreateMessage implements MessageCreator{
        
        private OrderExp order;
        private long expireTime;
        
        public CreateMessage(OrderExp order, long expireTime) {
            super();
            this.order = order;
            this.expireTime = expireTime;
        }

        public Message createMessage(Session session) throws JMSException {
            Gson gson = new Gson();
            String txtMsg = gson.toJson(order);
            Message message = session.createTextMessage(txtMsg);
            /**
             * 需要把幾個描述消息定時調度方式的參數作為屬性添加到消息,broker端的調度器就會按照我們想要的行為去處理消息。
             * 一共有4個屬性
             * 1:AMQ_SCHEDULED_DELAY :延遲投遞的時間
             * 2:AMQ_SCHEDULED_PERIOD :重復投遞的時間間隔
             * 3:AMQ_SCHEDULED_REPEAT:重復投遞次數
             * 4:AMQ_SCHEDULED_CRON:Cron表達式
             * ActiveMQ也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage,可以使用這個類來輔助設置
             */
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, expireTime);
            return message;
        }
    }

    public void orderDelay(OrderExp order, long expireTime) {
        logger.info("訂單[超時時長:"+expireTime+"秒] 將被發送給消息隊列,詳情:"+order);
        jmsTemplate.send("order.delay", new CreateMessage(order,expireTime*1000));
    }
}

4、MqConsume

package cn.enjoyedu.service.mq;

import cn.enjoyedu.model.OrderExp;
import cn.enjoyedu.service.busi.DlyOrderProcessor;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 *類說明:處理消息隊列返回的延時訂單
 */
@Service
public class MqConsume implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(MqConsume.class);
    
    @Autowired
    private DlyOrderProcessor processDlyOrder;
    
    public void onMessage(Message message) {
        try {
            String txtMsg = ((TextMessage)message).getText();
            logger.info("接收到消息隊列發出消息:"+txtMsg);
            Gson gson = new Gson();
            OrderExp order = (OrderExp)gson.fromJson(txtMsg, OrderExp.class);
            processDlyOrder.checkDelayOrder(order);
        } catch (Exception e) {
            logger.error("處理消費異常!",e);
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM