訂單付款倒計時實現方案


當使用 12306 搶票成功后,就會進入付款界面,這個時候就會出現一個訂單倒計時,下面我們就對付款倒計時的功能實現,進行深入學習和介紹,界面展示如下:

 如何實現付款及時呢,首先用戶下單后,存儲用戶的下單時間。下面介紹四種系統自動取消訂單的方案:

一、DelayQueue 延時無界阻塞隊列


我們的第一反應是用 數據庫輪序+任務調度 來實現此功能。但這種高效率的延遲任務用任務調度(定時器)實現就得不償失。而且對系統也是一種壓力且數據庫消耗極大。因此我們使用 Java 延遲隊列 DelayQueue 來實現,DelayQueue 是一個無界的延時阻塞隊列(BlockingQueue),用於存放實現了 Delayed 接口的對象,隊列中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,既隊頭對象的延遲到期時間最長。

 1 //加入delayQueue的對象,必須實現Delayed接口,同時實現如下:compareTo和GetDelay方法 
 2 static class DelayItem implements Delayed{
 3      //過期時間(單位:分鍾)
 4      private long expTime;
 5      private String orderCode;
 6         
 7      public DelayItem(String orderCode,long expTime,Date createTime) {
 8           super();
 9           this.orderCode=orderCode;
10           this.expTime=TimeUnit.MILLISECONDS.convert(expTime, TimeUnit.MINUTES)+createTime.getTime();
11      }
12      /**
13      * 用於延遲隊列內部比較排序,當前時間的延遲時間  -  比較對象的延遲時間
14      */
15      @Override
16      public int compareTo(Delayed o) {
17           return Long.valueOf(this.expTime).compareTo(Long.valueOf(((DelayItem)o).expTime));
18      }
19         
20      /**
21      * 獲得延遲時間,過期時間-當前時間(單位ms)
22      */
23      @Override
24      public long getDelay(TimeUnit unit) {
25           return this.expTime-System.currentTimeMillis();
26      }
27 }

將未付款的訂單都 add 到延遲隊列中,並通過線程池啟動多個線程不斷獲取延遲隊列的內容,獲取到后進行狀態的修改,進行業務邏輯處理。具體代碼如下:

 1 public class DelayQueueTest implements Runnable{
 2     //創建一個延遲隊列
 3     private    DelayQueue<Delayed> item = new DelayQueue<>();
 4 
 5     @Override
 6     public void run() {
 7          while(true) {
 8           try {
 9             //只有當到期了才會獲取到此對象
10             DelayItem delayed = (DelayItem) item.take();
11                 //獲取到之后修改狀態
12            } catch (InterruptedException e) {
13                     e.printStackTrace();
14            }
15          }
16     }
17 
18         //添加數據調用的方法
19     public void orderTimer(DelayItem delayItem) {
20         //向隊列匯總添加數據
21         item.add(delayItem);
22     }
23     
24     public static void main(String[] args) {
25           //創建一個線程池
26           ExecutorService executor = Executors.newCachedThreadPool();
27           //多線程執行程序
28           executor.execute(new DelayQueueTest());
29     }
30 }

這種方案的缺點【1】代碼復雜度較高,大量消息堆積,性能不能保證,且很容易觸發OOM。
【2】需要考慮分布式的實現、存在單點故障。

二、環形隊列


58同城架構沈劍提供一種基於時間輪的環形隊列算法,在他的分享中,一個高效延時消息,包含兩個重要的數據結構:
【1】環形隊列,例如可以創建一個包含3600個 slot 的環形隊列(本質是個數組)
【2】任務集合,環上每一個 slot 是一個 Set<Task>
同時,啟動一個 timer ,這個 timer 每隔一秒,在上述環形隊列中移動一格,有一個 Current Index 指針來標識正在檢測的 slot。環形隊列分為 3600 個長度,每秒移動一格,移動 3600 秒正好一個小時。比如一個任務需要在60秒后執行,那這個任務應該放在那個槽位的集合里呢?假設當前指針移動到 slot 的位置為2,那么60秒后的槽位就是62,所以數據應該放在索引為 62 的那個槽位圈數為0。如果這個任務要70分鍾,70*60+2=4202,4202-3600=602,減了一次3600,所以應該放在第二圈的602槽位,既放在隊列索引為602槽位的集合,且圈數為1,代表運行一圈后才執行這個任務。
這種方案效率高,任務觸發時間延遲時間比 delayQueue 低,代碼復雜度 delayQueue 低,但沒有公開源碼,不過通過次思路可以實現次組件,當然缺點和 delayQueue 相同。

三、使用 Redis 實現


通過 Redis ZSet 類型及操作命令實現一個延遲隊列,用時間戳(當前時間+延遲的分鍾數)作為元素的 score 存入ZSet。只需獲取 zset中的第一條記錄,即最早時間下單數據,如果該記錄未超時支付,剩下的訂單必然未超時。

 1 public class DelayQueueComponent {
 2     private final static String delayQueueKey = "delay:queue";
 3 
 4     @Autowired
 5     private RedisService redisService;
 6 
 7     // 將延遲對象推送至隊列中
 8     public void add(Object obj, long seconds) {
 9         this.redisService.zadd(delayQueueKey, obj, getDelayTimeMills(seconds));
10     }
11 
12     public void startMonitor() {
13         Runnable runnable = new Runnable() {
14             @Override
15             public void run() {
16                 monitorQueue();
17             }
18         };
19         System.out.println("start monitor delay queue.");
20         new Thread(runnable).start();
21         System.out.println("finish start monitor delay queue.");
22     }
23 
24     private void monitorQueue() {
25         while(true) {
26             if(lock()) {
27                 //從延遲隊列中拿一個最舊的
28                 TypedTuple<Object> tuple = this.redisService.zrangeFirst(delayQueueKey);
29                 // isCanPush 判斷是否延遲
30                 if(isCanPush(tuple)) {
31                     //刪除掉處理的延遲消息
32                     this.redisService.zremFirst(delayQueueKey);
33                     //釋放鎖
34                     releaseLock();
35                 }else {
36                     releaseLock();
37                 }
38             }
39             sleep();
40         }
41     }
42 
43      // 是否可推送
44     private boolean isCanPush(TypedTuple<Object> tuple) {
45         if(tuple == null) {
46             return false;
47         }
48         long currentTimeMills = System.currentTimeMillis();
49         //當前時間小於延遲時間時,獲取對象進行業務邏輯處理
50         if(currentTimeMills >= tuple.getScore()) {
51             return true;
52         }
53         return false;
54     }
55 }

這種方案的缺點:【1】消息處理失敗,不能恢復處理。
【2】數據量大時,zset 性能有問題,多定義幾個 zset,增加了內存和定時器去讀的復雜度。

四、RabbitMQ 實現


利用 RabbitMQ做延時隊列是比較常見的一種方式,而實際上 RabbitMQ自身並沒有直接支持提供延遲隊列功能,而是通過 RabbitMQ 消息隊列的 TTLDLX這兩個屬性間接實現的。

Time To Live(TTL):消息的存活時間,RabbitMQ可以通過 x-message-tt參數來設置指定Queue(隊列)和 Message(消息)上消息的存活時間,它的值是一個非負整數,單位為微秒

RabbitMQ 可以從兩種維度設置消息過期時間,分別是隊列和消息本身:
【1】設置隊列過期時間,那么隊列中所有消息都具有相同的過期時間。
【2】設置消息過期時間,對隊列中的某一條消息設置過期時間,每條消息TTL都可以不同。
如果同時設置隊列和隊列中消息的TTL,則TTL值以兩者中較小的值為准。而隊列中的消息存在隊列中的時間,一旦超過TTL過期時間則成為Dead Letter(死信)。

隊列出現 Dead Letter的情況有:
【1】消息或者隊列的TTL過期;
【2】隊列達到最大長度;
【3】消息被消費端拒絕(basic.reject or basic.nack);

應用場景:一般應用在當正常業務處理時出現異常時,將消息拒絕則會進入到死信隊列中,有助於統計異常數據並做后續處理;重試隊列在重試16次(默認次數)將消息放入死信隊列。利用 RabbitMQ 的死信隊列(Dead-Letter-Exchage)機制實現,在 queueDeclare 方法中加入 “x-dead-letter-exchage”實現:

RabbitMQ的 Queue可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。
x-dead-letter-exchage:過期消息路由轉發(轉發器類型)
x-dead-letter-routing-key:當消息達到過期時間由該 exchange 安裝配置的 x-dead-letter-routing-key 轉發到指定隊列,最后被消費者消費

下邊結合一張圖看看如何實現超30分鍾未支付關單功能,我們將訂單消息A0001發送到延遲隊列order.delay.queue,並設置x-message-tt消息存活時間為30分鍾,當到達30分鍾后訂單消息A0001成為了Dead Letter(死信),延遲隊列檢測到有死信,通過配置x-dead-letter-exchange,將死信重新轉發到能正常消費的隊列,直接監聽隊列處理關閉訂單邏輯即可。 

我們需要兩個隊列,一個用來做主隊列,真正的投遞消息;另一個用來延遲處理消息。

1 channel.queueDeclare("MAIN_QUEUE",true,false,false,null);
2 channel.queueBind("MAIN_QUEUE","amq.direct","MAIN_QUEUE");
3 
4 HashMap<String,Object> arguments = new HashMap<String,Object>();
5 arguments.put("x-dead-letter-exchange","amq.direct");
6 arguments.put("x-dead-letter-routing-key","MAIN_QUEUE");
7 
8 channel.queueDeclare("DELAY_QUEUE",true,false,false,arguments); 

放入延遲消息(DeliveryMode 等於 2 說明這個消息是 persistent 的):

1 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
2 AMQP.BasicProperties properties = builder.expiration(
3                     String.valueOf(task.getDelayMillis())).deliveryMode(2).build();
4 channel.basicPublish("","DELAY_QUEUE",properties,SerializationUtils.serialize(task));            

這種方案的缺點:【1】筆者之前做 MQ 性能測試時,在公司的服務器上單機 TPS 接近 3W,如果是中小型企業級應用基本滿足。但如果大量的消息積壓得不到投遞,性能仍然是個問題。
【2】依賴於 RabbitMQ 的運維,復雜度和成本提高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM