Java 延遲隊列使用


延時隊列,第一他是個隊列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務否則獲取不到……

應用場景比較多,比如延時1分鍾發短信,延時1分鍾再次執行等,下面先看看延時隊列demo之后再看延時隊列在項目中的使用:

簡單的延時隊列要有三部分:第一實現了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列,那下面就來看看延時隊列demo。

一、消息體

 

[java]  view plain  copy
 
  1. package com.delqueue;  
  2.   
  3. import java.util.concurrent.Delayed;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. /** 
  7.  * 消息體定義 實現Delayed接口就是實現兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… 
  8.  *  
  9.  * @author whd 
  10.  * @date 2017年9月24日 下午8:57:14 
  11.  */  
  12. public class Message implements Delayed {  
  13.     private int id;  
  14.     private String body; // 消息內容  
  15.     private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。  
  16.   
  17.     public int getId() {  
  18.         return id;  
  19.     }  
  20.   
  21.     public String getBody() {  
  22.         return body;  
  23.     }  
  24.   
  25.     public long getExcuteTime() {  
  26.         return excuteTime;  
  27.     }  
  28.   
  29.     public Message(int id, String body, long delayTime) {  
  30.         this.id = id;  
  31.         this.body = body;  
  32.         this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();  
  33.     }  
  34.   
  35.     // 自定義實現比較方法返回 1 0 -1三個參數  
  36.     @Override  
  37.     public int compareTo(Delayed delayed) {  
  38.         Message msg = (Message) delayed;  
  39.         return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1  
  40.                 : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);  
  41.     }  
  42.   
  43.     // 延遲任務是否到時就是按照這個方法判斷如果返回的是負數則說明到期否則還沒到期  
  44.     @Override  
  45.     public long getDelay(TimeUnit unit) {  
  46.         return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);  
  47.     }  
  48. }  


二、消息消費者

 

[java]  view plain  copy
 
  1. package com.delqueue;  
  2.   
  3. import java.util.concurrent.DelayQueue;  
  4.   
  5. public class Consumer implements Runnable {  
  6.     // 延時隊列 ,消費者從其中獲取消息進行消費  
  7.     private DelayQueue<Message> queue;  
  8.   
  9.     public Consumer(DelayQueue<Message> queue) {  
  10.         this.queue = queue;  
  11.     }  
  12.   
  13.     @Override  
  14.     public void run() {  
  15.         while (true) {  
  16.             try {  
  17.                 Message take = queue.take();  
  18.                 System.out.println("消費消息id:" + take.getId() + " 消息體:" + take.getBody());  
  19.             } catch (InterruptedException e) {  
  20.                 e.printStackTrace();  
  21.             }  
  22.         }  
  23.     }  
  24. }  


三、延時隊列

 

[java]  view plain  copy
 
  1. package com.delqueue;  
  2.   
  3. import java.util.concurrent.DelayQueue;  
  4. import java.util.concurrent.ExecutorService;  
  5. import java.util.concurrent.Executors;  
  6.   
  7. public class DelayQueueTest {  
  8.      public static void main(String[] args) {    
  9.             // 創建延時隊列    
  10.             DelayQueue<Message> queue = new DelayQueue<Message>();    
  11.             // 添加延時消息,m1 延時3s    
  12.             Message m1 = new Message(1, "world", 3000);    
  13.             // 添加延時消息,m2 延時10s    
  14.             Message m2 = new Message(2, "hello", 10000);    
  15.             //將延時消息放到延時隊列中  
  16.             queue.offer(m2);    
  17.             queue.offer(m1);    
  18.             // 啟動消費線程 消費添加到延時隊列中的消息,前提是任務到了延期時間   
  19.             ExecutorService exec = Executors.newFixedThreadPool(1);  
  20.             exec.execute(new Consumer(queue));  
  21.             exec.shutdown();  
  22.         }    
  23. }  


將消息體放入延遲隊列中,在啟動消費者線程去消費延遲隊列中的消息,如果延遲隊列中的消息到了延遲時間則可以從中取出消息否則無法取出消息也就無法消費。

這就是延遲隊列demo,下面我們來說說在真實環境下的使用。

使用場景描述:

在打車軟件中對訂單進行派單的流程,當有訂單的時候給該訂單篩選司機,然后給當訂單綁定司機,但是有時運氣沒那么好,訂單進來后第一次沒有篩選到合適的司機,但我們也不能就此結束派單,而是將該訂單的信息放到延時隊列中過個2秒鍾在進行一次,其實這個2秒鍾就是一個延遲,所以這里我們就可以使用延時隊列來實現……

下面看看簡單的流程圖:

 

下面來看看具體代碼實現:

在項目中有如下幾個類:第一 、任務類   第二、按照任務類組裝的消息體類  第三、延遲隊列管理類

任務類即執行篩選司機、綁單、push消息的任務類

 

[java]  view plain  copy
 
  1. package com.test.delayqueue;  
  2. /** 
  3.  * 具體執行相關業務的業務類 
  4.  * @author whd 
  5.  * @date 2017年9月25日 上午12:49:32 
  6.  */  
  7. public class DelayOrderWorker  implements Runnable {  
  8.   
  9.     @Override  
  10.     public void run() {  
  11.         // TODO Auto-generated method stub  
  12.         //相關業務邏輯處理  
  13.         System.out.println(Thread.currentThread().getName()+" do something ……");  
  14.     }  
  15. }  

 

消息體類,在延時隊列中這個實現了Delayed接口的消息類是比不可少的,實現接口時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的

這里定義的是一個泛型類,所以可以將我們上面的任務類作為其中的task,這樣就將任務類分裝成了一個消息體

 

[java]  view plain  copy
 
  1. package com.test.delayqueue;  
  2.   
  3. import java.util.concurrent.Delayed;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. /** 
  7.  * 延時隊列中的消息體將任務封裝為消息體 
  8.  *  
  9.  * @author whd 
  10.  * @date 2017年9月25日 上午12:48:30 
  11.  * @param <T> 
  12.  */  
  13. public class DelayOrderTask<T extends Runnable> implements Delayed {  
  14.     private final long time;  
  15.     private final T task; // 任務類,也就是之前定義的任務類  
  16.   
  17.     /** 
  18.      * @param timeout 
  19.      *            超時時間(秒) 
  20.      * @param task 
  21.      *            任務 
  22.      */  
  23.     public DelayOrderTask(long timeout, T task) {  
  24.         this.time = System.nanoTime() + timeout;  
  25.         this.task = task;  
  26.     }  
  27.   
  28.     @Override  
  29.     public int compareTo(Delayed o) {  
  30.         // TODO Auto-generated method stub  
  31.         DelayOrderTask other = (DelayOrderTask) o;  
  32.         long diff = time - other.time;  
  33.         if (diff > 0) {  
  34.             return 1;  
  35.         } else if (diff < 0) {  
  36.             return -1;  
  37.         } else {  
  38.             return 0;  
  39.         }  
  40.     }  
  41.   
  42.     @Override  
  43.     public long getDelay(TimeUnit unit) {  
  44.         // TODO Auto-generated method stub  
  45.         return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);  
  46.     }  
  47.   
  48.     @Override  
  49.     public int hashCode() {  
  50.         return task.hashCode();  
  51.     }  
  52.   
  53.     public T getTask() {  
  54.         return task;  
  55.     }  
  56. }  

 

延時隊列管理類,這個類主要就是將任務類封裝成消息並並添加到延時隊列中,以及輪詢延時隊列從中取出到時的消息體,在獲取任務類放到線程池中執行任務

 

[java]  view plain  copy
 
  1. package com.test.delayqueue;  
  2.   
  3. import java.util.Map;  
  4. import java.util.concurrent.DelayQueue;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.TimeUnit;  
  8. import java.util.concurrent.atomic.AtomicLong;  
  9.   
  10. /** 
  11.  * 延時隊列管理類,用來添加任務、執行任務 
  12.  *  
  13.  * @author whd 
  14.  * @date 2017年9月25日 上午12:44:59 
  15.  */  
  16. public class DelayOrderQueueManager {  
  17.     private final static int DEFAULT_THREAD_NUM = 5;  
  18.     private static int thread_num = DEFAULT_THREAD_NUM;  
  19.     // 固定大小線程池  
  20.     private ExecutorService executor;  
  21.     // 守護線程  
  22.     private Thread daemonThread;  
  23.     // 延時隊列  
  24.     private DelayQueue<DelayOrderTask<?>> delayQueue;  
  25.     private static final AtomicLong atomic = new AtomicLong(0);  
  26.     private final long n = 1;  
  27.     private static DelayOrderQueueManager instance = new DelayOrderQueueManager();  
  28.   
  29.     private DelayOrderQueueManager() {  
  30.         executor = Executors.newFixedThreadPool(thread_num);  
  31.         delayQueue = new DelayQueue<>();  
  32.         init();  
  33.     }  
  34.   
  35.     public static DelayOrderQueueManager getInstance() {  
  36.         return instance;  
  37.     }  
  38.   
  39.     /** 
  40.      * 初始化 
  41.      */  
  42.     public void init() {  
  43.         daemonThread = new Thread(() -> {  
  44.             execute();  
  45.         });  
  46.         daemonThread.setName("DelayQueueMonitor");  
  47.         daemonThread.start();  
  48.     }  
  49.   
  50.     private void execute() {  
  51.         while (true) {  
  52.             Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();  
  53.             System.out.println("當前存活線程數量:" + map.size());  
  54.             int taskNum = delayQueue.size();  
  55.             System.out.println("當前延時任務數量:" + taskNum);  
  56.             try {  
  57.                 // 從延時隊列中獲取任務  
  58.                 DelayOrderTask<?> delayOrderTask = delayQueue.take();  
  59.                 if (delayOrderTask != null) {  
  60.                     Runnable task = delayOrderTask.getTask();  
  61.                     if (null == task) {  
  62.                         continue;  
  63.                     }  
  64.                     // 提交到線程池執行task  
  65.                     executor.execute(task);  
  66.                 }  
  67.             } catch (Exception e) {  
  68.                 e.printStackTrace();  
  69.             }  
  70.         }  
  71.     }  
  72.   
  73.     /** 
  74.      * 添加任務 
  75.      *  
  76.      * @param task 
  77.      * @param time 
  78.      *            延時時間 
  79.      * @param unit 
  80.      *            時間單位 
  81.      */  
  82.     public void put(Runnable task, long time, TimeUnit unit) {  
  83.         // 獲取延時時間  
  84.         long timeout = TimeUnit.NANOSECONDS.convert(time, unit);  
  85.         // 將任務封裝成實現Delayed接口的消息體  
  86.         DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);  
  87.         // 將消息體放到延時隊列中  
  88.         delayQueue.put(delayOrder);  
  89.     }  
  90.   
  91.     /** 
  92.      * 刪除任務 
  93.      *  
  94.      * @param task 
  95.      * @return 
  96.      */  
  97.     public boolean removeTask(DelayOrderTask task) {  
  98.   
  99.         return delayQueue.remove(task);  
  100.     }  
  101. }  

 

測試類

 

[java]  view plain  copy
 
  1. package com.delqueue;  
  2.   
  3. import java.util.concurrent.TimeUnit;  
  4.   
  5. import com.test.delayqueue.DelayOrderQueueManager;  
  6. import com.test.delayqueue.DelayOrderWorker;  
  7.   
  8. public class Test {  
  9.     public static void main(String[] args) {  
  10.         DelayOrderWorker work1 = new DelayOrderWorker();// 任務1  
  11.         DelayOrderWorker work2 = new DelayOrderWorker();// 任務2  
  12.         DelayOrderWorker work3 = new DelayOrderWorker();// 任務3  
  13.         // 延遲隊列管理類,將任務轉化消息體並將消息體放入延遲對列中等待執行  
  14.         DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();  
  15.         manager.put(work1, 3000, TimeUnit.MILLISECONDS);  
  16.         manager.put(work2, 6000, TimeUnit.MILLISECONDS);  
  17.         manager.put(work3, 9000, TimeUnit.MILLISECONDS);  
  18.     }  
  19.   
  20. }  


OK 這就是項目中的具體使用情況,當然具體內容被忽略,整體框架就是這樣,還有這里使用java的延時隊列但是這種方式是有問題的如果如果down機則會出現任務丟失,所以也可以考慮使用mq、redis來實現

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM