Java同步數據結構之DelayQueue/DelayedWorkQueue


前言

前面介紹了優先級隊列PriorityBlockingQueue,順帶也說了一下PriorityQueue,兩者的實現方式是一模一樣的,都是采用基於數組的平衡二叉堆實現,不論入隊的順序怎么樣,take、poll出隊的節點都是按優先級排序的。但是PriorityBlockingQueue/PriorityQueue隊列中的所有元素並不是在入隊之后就已經全部按優先級排好序了,而是只保證head節點即隊列的首個元素是當前最小或者說最高優先級的,其它節點的順序並不保證是按優先級排序的,PriorityBlockingQueue/PriorityQueue隊列只會在通過take、poll取走head之后才會再次決出新的最小或者說最高優先級的節點作為新的head,其它節點的順序依然不保證。所以通過peek拿到的head節點就是當前隊列中最高優先級的節點。

明白了優先級隊列的原理要理解DelayQueue就非常簡單,因為DelayQueue就是基於PriorityQueue實現的,DelayQueue隊列實際上就是將隊列元素保存到內部的一個PriorityQueue實例中的(所以也不支持插入null值),DelayQueue只專注於實現隊列元素的延時出隊。

延遲隊列DelayQueue是一個無界阻塞隊列,它的隊列元素只能在該元素的延遲已經結束或者說過期才能被出隊。它怎么判斷一個元素的延遲是否結束呢,原來DelayQueue隊列元素必須是實現了Delayed接口的實例,該接口有一個getDelay方法需要實現,延遲隊列就是通過實時的調用元素的該方法來判斷當前元素是否延遲已經結束。

既然DelayQueue是基於優先級隊列來實現的,那肯定元素也要實現Comparable接口,沒錯因為Delayed接口繼承了Comparable接口,所以實現Delayed的隊列元素也必須要實現Comparable的compareTo方法。延遲隊列就是以時間作為比較基准的優先級隊列,這個時間即延遲時間,這個時間大都在構造元素的時候就已經設置好,隨着程序的運行時間的推移,隊列元素的延遲時間逐步到期,DelayQueue就能夠基於延遲時間運用優先級隊列並配合getDelay方法達到延遲隊列中的元素在延遲結束時精准出隊。

Delayed接口

1 public interface Delayed extends Comparable<Delayed> {
2 
3      //以指定的時間單位unit返回此對象的剩余延遲
4     // 返回值 小於等於0 表示延遲已經結束
5      long getDelay(TimeUnit unit);
6 }

 放入DelayQueue隊列的元素必須實現Delayed接口,getDelay方法用於查看當前對象的延遲剩余時間,返回值是以參數指定的unit為單位的數字(unit可以是秒,分鍾等),返回值小於等於0就表示該元素延遲結束。注意:該接口的實現類必須要實現compareTo方法,且compareTo方法提供與getDelay方法一致的排序,也就是說compareTo要基於getDelay方法的返回值來實現比較。

DelayQueue

現來看看它的成員變量:

 1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
 2     implements BlockingQueue<E> {
 3 
 4     //非公平鎖
 5     private final transient ReentrantLock lock = new ReentrantLock();
 6     private final PriorityQueue<E> q = new PriorityQueue<E>(); //優先級隊列
 7 
 8 /* Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves 
 9  * to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. 
10  * The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. 
11  * Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, 
12  * and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting.
13 
14  * 指定用於等待隊列頭部的元素的線程。這種Leader-Follower模式的變體(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)可以減少不必要的定時等待。
15  * 當一個線程成為leader時,它只等待下一個延遲過期,而其他線程則無限期地等待。leader線程必須在從take()或poll(…)返回之前向其他線程發出信號,除非其他線程在此期間成為leader。
16  * 每當隊列的頭部被具有更早過期時間的元素替換時,leader字段就會通過重置為null而無效,並且會通知等待的線程(不一定是當前的leader)的信號。
17  * 因此,等待線程必須准備好在等待時獲得和失去領導權。
18  */
19 private Thread leader = null;
20 
21 /* Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.
22  * 當隊列頭部的新元素可用或新線程可能需要成為leader時發出的條件。
23  */
24 private final Condition available = lock.newCondition();
View Code

  DelayQueue的成員很簡單,一個非公平鎖以及Condition、一個優先級隊列,以及一個leader線程。這個leader線程很關鍵,它上面的Java Doc描述很長,其實就已經把DelayQueue的內部實現說的很清楚了。簡單來說,DelayQueue使用了所謂的Leader-Follower模式的變體來減少消費線程不必要的定時等待,由於優先級隊列中的head就是整個隊列中最先要延遲結束的元素,其它元素的延遲結束時間都比它長,所以就讓獲取head元素的線程只等待它延時剩余的時間,而讓其它消費線程無限期等待,當獲取head元素的線程結束等待取走head之后再喚醒其它消費線程,那些消費線程又會因為拿到新的head而產生一個新的leader,這就是Leader-Follower模式,只等待指定時間的獲取head元素的線程就叫leader,而其它所有調用take的消費線程都是Follower。當然如果在leader等待的過程中有延遲時間更短的元素入隊,根據優先級隊列的平衡二叉堆實現,它必然會被排到原來的head之前,成為新的head,這時候原來的leader線程可能就會失去leader的領導權,誰被非公平鎖喚醒拿到新的head誰就是新的leader。

DelayQueue只有兩個構造方法,一個無參,一個用指定的集合元素初始化延遲隊列,源碼很簡單就不貼了。直接看DelayQueue的入隊邏輯。

入隊offer

DelayQueue的入隊方法都是調用的offer實現,直接看offer就可以了:

 1 public boolean offer(E e) {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         q.offer(e); //直接PriorityQueue入隊
 6         if (q.peek() == e) { //如果當前是第一個入隊的元素或者以前的head元素被當前元素替代了
 7             leader = null; //剝奪leader的領導權,由於是非公平鎖,喚醒的不一定是leader線程,所以需要重置為null
 8             available.signal();//喚醒leader,更換head
 9         }
10         return true;
11     } finally {
12         lock.unlock();
13     }
14 }

 入隊操作很簡單,直接調用的優先級隊列的offer入隊,如果是①第一個入隊的元素或者是②當前時刻當前元素比原來的head具有更短的剩余延遲時間,那么是①的話需要喚醒因為隊列空而阻塞的消費線程,是②的話需要剝奪當前leader的領導權,並隨機喚醒(非公平鎖)一個消費線程(如果有的話)成為新的leader。這里有一個問題,怎么就能判斷當前入隊的元素比head具有更短的延遲時間呢,因為head的延遲時間已經過去了一部分,其實這就需要在實現元素的compareTo方法時要根據getDelay方法返回的實時延遲剩余時間來作比較,這樣優先級隊列在通過siftUp冒泡確定新入隊元素的位置的時候就能精確的把握實時的延遲剩余時間從而找到自己正確的位置。

出隊take

DelayQueue的精髓就在出隊方法的實現了,因為要保證只讓延遲結束的元素才能出隊:

 1 /**
 2  * Retrieves and removes the head of this queue, waiting if necessary
 3  * until an element with an expired delay is available on this queue.
 4  * 檢索並刪除此隊列的頭部,如果需要,將一直等待,直到該隊列上具有延遲結束的元素為止。 
 5  * @return the head of this queue
 6  * @throws InterruptedException {@inheritDoc}
 7  */
 8 public E take() throws InterruptedException {
 9     final ReentrantLock lock = this.lock;
10     lock.lockInterruptibly();
11     try {
12         for (;;) { //注意是循環
13             E first = q.peek(); //獲取但不移除head
14             if (first == null) //隊列為空,等待生產者offer完喚醒
15                 available.await(); 
16             else {
17                 long delay = first.getDelay(NANOSECONDS);
18                 if (delay <= 0) //head元素已經過期
19                     return q.poll();
20                 first = null; // don't retain ref while waiting 等待時不要持有引用
21                 if (leader != null) //已經有leader在等待了,無限期等待leader醒來通知
22                     available.await();
23                 else {
24                     // 當前線程成為leader
25                     Thread thisThread = Thread.currentThread();
26                     leader = thisThread;
27                     try {
28                         available.awaitNanos(delay); //等待heade元素剩余的延遲時間結束
29                     } finally {
30                         if (leader == thisThread)
31                             leader = null;    //交出leader權限
32                     }
33                 }
34             }
35         }
36     } finally {
37         if (leader == null && q.peek() != null) //隊列不為空,並且沒有leader
38             available.signal(); //喚醒其它可能等待的消費線程
39         lock.unlock();
40     }
41 }

出隊的邏輯就是Leader-Follower模式的實現,Leader只等待head剩余的延遲時間(28行),而Follower無限期等待(22行)Leader成功取走head之后來喚醒(finally的部分),如果由於offer入隊更短剩余延遲時間的元素導致leader失去領導權,非公平鎖喚醒的將可能是無限期等待的Follower,也可能是原來的Leader,由於offer重置了leader為null,所以被喚醒的線程能夠立即拿走head返回(如果head已經延遲結束)或者重新成為leader(如果head還沒有延遲結束)。

DelayQueue的其余方法例如立即返回的poll和指定超時時間的poll方法邏輯簡單或者與take的實現原理一致就不作分析了,有個比較特殊的方法就是drainTo,它只會轉移當前所有延遲已經結束的元素。peek方法返回的head不判斷是否延遲結束只表示當前剩余延遲時間最少的元素,size方法返回隊列中所有元素的個數包括延遲沒有結束的。

DelayQueue的迭代器與PriorityBlockingQueue的迭代器實現一模一樣,都是通過toArray將原隊列數組元素拷貝到新的數組進行遍歷,不會與原隊列同步更新,但是remove可以刪除原隊列中的指定元素,而且迭代時不區分元素是否延遲結束。

DelayQueue的可拆分迭代器繼承自Collection接口的默認迭代器實現IteratorSpliterator,和ArrayBlockingQueue一樣,IteratorSpliterator的特性就是頂層迭代器實際上調用的是隊列本身的Iterator迭代器實現,拆分后的迭代器按是數組下標方式的迭代,拆分按一半的原則進行,因此DelayQueue的可拆分迭代器也是脫離隊列源數據的,不會隨着隊列變化同步更新。更多關於IteratorSpliterator的其它特性請到ArrayBlockingQueue章節中查看。

應用實例

 1 package com.Queue;
 2 
 3 import java.util.concurrent.DelayQueue;
 4 import java.util.concurrent.Delayed;
 5 import java.util.concurrent.TimeUnit;
 6 
 7 public class DelayQueueTest {
 8 
 9 
10     public static void main(String[] args) throws Exception {
11         DelayQueue<DelayTask> dq = new DelayQueue();
12 
13         //入隊四個元素,注意它們的延遲時間單位不一樣。
14         dq.offer(new DelayTask(5, TimeUnit.SECONDS));
15         dq.offer(new DelayTask(2, TimeUnit.MINUTES));
16         dq.offer(new DelayTask(700, TimeUnit.MILLISECONDS));
17         dq.offer(new DelayTask(1000, TimeUnit.NANOSECONDS));
18 
19         while(dq.size() > 0){
20             System.out.println(dq.take());
21         }
22 
23         /*
24         打印順序:
25         DelayTask{delay=1000, unit=NANOSECONDS}
26         DelayTask{delay=700000000, unit=MILLISECONDS}
27         DelayTask{delay=5000000000, unit=SECONDS}
28         DelayTask{delay=120000000000, unit=MINUTES}
29         */
30     }
31 }
32 
33 class DelayTask implements Delayed {
34 
35     private long delay; //延遲多少納秒開始執行
36     private TimeUnit unit;
37 
38     public DelayTask(long delay, TimeUnit unit){
39         this.unit = unit;
40         this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);//統一轉換成納秒計數
41     }
42 
43     @Override
44     public long getDelay(TimeUnit unit) {//延遲剩余時間,單位unit指定
45         return unit.convert(delay - System.currentTimeMillis(), TimeUnit.NANOSECONDS);
46     }
47 
48     @Override
49     public int compareTo(Delayed o) {//基於getDelay實時延遲剩余時間進行比較
50         if(this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS)) //都換算成納秒計算
51             return -1;
52         else if(this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS)) //都換算成納秒計算
53             return 1;
54         else
55             return 0;
56     }
57 
58     @Override
59     public String toString() {
60         return "DelayTask{" +
61                 "delay=" + delay +
62                 ", unit=" + unit +
63                 '}';
64     }
65 }
View Code

 假設有四個延遲任務,分別需要延遲不同的時間開始執行,上例中最終打印出的結果是按延遲剩余時間從小到大排列的。

DelayedWorkQueue

順便把ScheduledThreadPoolExecutor的內部類DelayedWorkQueue也說一下把,它也是一種無界延遲阻塞隊列,它主要用於線程池定時或周期任務的使用,關於線程池和定時任務在后面的章節介紹,本文僅限於分析DelayedWorkQueue。從DelayQueue的特性很容易想到它適合定時任務的實現,所以Java並發包中調度定時任務的線程池隊列是基於這種實現的,它就是DelayedWorkQueue,為什么不直接使用DelayQueue而要重新實現一個DelayedWorkQueue呢,可能是了方便在實現過程中加入一些擴展。放入該延遲隊列中的元素是特殊的,例如DelayedWorkQueue中放的元素是線程運行時代碼RunnableScheduledFuture。

 1 //A ScheduledFuture that is Runnable. Successful execution of the run method causes completion of the Future and allows access to its results.
 2 //run方法的成功執行將導致Future的完成並允許訪問其結果。
 3 public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
 4 
 5     //如果此任務是周期性的,則返回true。定期任務可以根據某些計划重新運行。非周期任務只能運行一次。
 6     boolean isPeriodic();
 7 }
 8 
 9 //A delayed result-bearing action that can be cancelled.  一種可以取消的延遲產生結果的動作。
10 //Usually a scheduled future is the result of scheduling a task with a ScheduledExecutorService.
11 //通常,ScheduledFuture是ScheduledExecutorService調度任務的結果。
12 public interface ScheduledFuture<V> extends Delayed, Future<V> {
13         //繼承了Delayed接口
14 }
View Code

RunnableScheduledFuture接口繼承了ScheduledFuture接口,ScheduledFuture接口繼承了Delayed接口。

DelayedWorkQueue的實現沒有像DelayQueue那樣直接借助優先級隊列來實現,而是重寫了相關的邏輯但是實現的算法還是基於數組的平衡二叉堆實現,並且糅合了DelayQueue中實現延遲時間結束元素才能出隊的Leader-Follower模式。可以說,DelayedWorkQueue = 優先級隊列實現 + 延遲隊列實現。理解DelayedWorkQueue之前請先理解PriorityBlockingQueue和DelayQueue,然后理解DelayedWorkQueue不費吹灰之力。

 1 /**
 2      * Specialized delay queue. To mesh with TPE declarations, this
 3      * class must be declared as a BlockingQueue<Runnable> even though
 4      * it can only hold RunnableScheduledFutures.
 5      */
 6     static class DelayedWorkQueue extends AbstractQueue<Runnable>
 7         implements BlockingQueue<Runnable> {
 8 
 9         /*
10          * A DelayedWorkQueue is based on a heap-based data structure
11          * like those in DelayQueue and PriorityQueue, except that
12          * every ScheduledFutureTask also records its index into the
13          * heap array. This eliminates the need to find a task upon
14          * cancellation, greatly speeding up removal (down from O(n)
15          * to O(log n)), and reducing garbage retention that would
16          * otherwise occur by waiting for the element to rise to top
17          * before clearing. But because the queue may also hold
18          * RunnableScheduledFutures that are not ScheduledFutureTasks,
19          * we are not guaranteed to have such indices available, in
20          * which case we fall back to linear search. (We expect that
21          * most tasks will not be decorated, and that the faster cases
22          * will be much more common.)
23          *
24          * All heap operations must record index changes -- mainly
25          * within siftUp and siftDown. Upon removal, a task's
26          * heapIndex is set to -1. Note that ScheduledFutureTasks can
27          * appear at most once in the queue (this need not be true for
28          * other kinds of tasks or work queues), so are uniquely
29          * identified by heapIndex.
30          */
31 
32         private static final int INITIAL_CAPACITY = 16;
33         private RunnableScheduledFuture<?>[] queue =
34             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
35         private final ReentrantLock lock = new ReentrantLock();
36         private int size = 0;
37 
38         /**
39          * Thread designated to wait for the task at the head of the
40          * queue.  This variant of the Leader-Follower pattern
41          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
42          * minimize unnecessary timed waiting.  When a thread becomes
43          * the leader, it waits only for the next delay to elapse, but
44          * other threads await indefinitely.  The leader thread must
45          * signal some other thread before returning from take() or
46          * poll(...), unless some other thread becomes leader in the
47          * interim.  Whenever the head of the queue is replaced with a
48          * task with an earlier expiration time, the leader field is
49          * invalidated by being reset to null, and some waiting
50          * thread, but not necessarily the current leader, is
51          * signalled.  So waiting threads must be prepared to acquire
52          * and lose leadership while waiting.
53          */
54         private Thread leader = null;
55 
56         /**
57          * Condition signalled when a newer task becomes available at the
58          * head of the queue or a new thread may need to become leader.
59          */
60         private final Condition available = lock.newCondition();
View Code

看到沒有,類似PriorityBlockingQueue的初始化容量為16的類型為RunnableScheduledFuture的數組queue,和DelayQueue一樣的非公平鎖ReentrantLock和Condition,以及特有的leader線程。不同的是DelayedWorkQueue的數組元素是繼承了Delayed接口的RunnableScheduledFuture接口的實現類, 

入隊offer

 1 public boolean offer(Runnable x) {
 2     if (x == null)
 3         throw new NullPointerException(); //不允許插入null值
 4     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 5     final ReentrantLock lock = this.lock;
 6     lock.lock();
 7     try {
 8         int i = size;
 9         if (i >= queue.length)
10             grow();  //類似優先級隊列PriorityBlockingQueue的擴容
11         size = i + 1;
12         if (i == 0) { //隊列為空直接放在第一個位置
13             queue[0] = e;
14             setIndex(e, 0);//這是線程池定時任務特有的邏輯
15         } else {
16             siftUp(i, e); //類似優先級隊列PriorityBlockingQueue的冒泡方式插入元素
17         }
18         if (queue[0] == e) { //類似延遲隊列DelayQueue的消費線程喚醒與leader剝奪
19             leader = null;
20             available.signal();
21         }
22     } finally {
23         lock.unlock();
24     }
25     return true;
26 }
View Code

入隊的邏輯綜合了PriorityBlockingQueue的平衡二叉堆冒泡插入以及DelayQueue的消費線程喚醒與leader領導權剝奪。只有setIndex方法是特有的,該方法記錄了元素在數組中的索引下標(在其他出隊方法中,會將出隊的元素的索引下標置為-1,表示已經不在隊列中了),為了方便實現快速查找。它的擴容方法grow比優先級隊列的實現簡單粗暴多了,在持有鎖的情況下每次擴容50%。siftUp與PriorityBlockingQueue的siftUpXXX方法一模一樣,也只是多了一個特有的setIndex方法的調用。

出隊take

 1 public RunnableScheduledFuture<?> take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         for (;;) {
 6             RunnableScheduledFuture<?> first = queue[0];//獲取不移除head
 7             if (first == null) //同DelayQueue一樣,隊列為空,等待生產者offer完喚醒
 8                 available.await();
 9             else {
10                 long delay = first.getDelay(NANOSECONDS);
11                 if (delay <= 0)  //head元素已經過期
12                     return finishPoll(first);
13                 first = null; // don't retain ref while waiting
14                 if (leader != null) //已經有leader在等待了,無限期等待leader醒來通知
15                     available.await();
16                 else { // 當前線程成為leader
17                     Thread thisThread = Thread.currentThread();
18                     leader = thisThread;
19                     try {
20                         available.awaitNanos(delay); //等待heade元素剩余的延遲時間結束
21                     } finally {
22                         if (leader == thisThread)
23                             leader = null;  //交出leader權限
24                     }
25                 }
26             }
27         }
28     } finally {
29         if (leader == null && queue[0] != null) //隊列不為空,並且沒有leader
30             available.signal(); //喚醒其它可能等待的消費線程
31         lock.unlock();
32     }
33 }
34 
35 //類似優先級隊列PriorityBlockingQueue的出隊邏輯
36 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
37     int s = --size;
38     RunnableScheduledFuture<?> x = queue[s];
39     queue[s] = null;
40     if (s != 0)
41         siftDown(0, x); //類似PriorityBlockingQueue的拿最后一個元素從head向下降級來確定位置
42     setIndex(f, -1);
43     return f;
44 }
View Code

出隊的邏輯一樣綜合了PriorityBlockingQueue的平衡二叉堆向下降級以及DelayQueue的Leader-Follower線程等待喚醒模式,就不細說了。只有在finishPoll方法中,會將已經出隊的RunnableScheduledFuture元素的索引下標通過setIndex設置成-1.

其它種種方法:remove、toArray、size、contains、put、drainTo(只轉移延遲結束的)、peek、add、poll、clear都和DelayQueue的實現大同小異。

DelayedWorkQueue的迭代器也是同DelayQueue一樣,迭代的是脫離源隊列的拷貝數組,但是可以通過remove刪除源隊列中的對象。而且迭代器不區分元素是否延遲結束。

總結

DelayQueue延遲隊列只允許放入實現了Delayed接口的實例,它是優先級隊列針對計時的一種運用,所以它是基於優先級隊列 + Leader-Follower的線程等待模式,只允許取出延遲結束的隊列元素。獲取head的線程(往往是第一個消費線程)由於head是整個隊列中最先延遲結束的元素,所以線程只等待特定的剩余的延遲時間它即是leader,而其他后來的消費線程則無限期等待即follower,直到leader取走head時隨機喚醒一個follower使其拿到新的head變成新的leader或者新入隊了一個剩余延遲時間更短的元素導致leader失去領導權也會隨機喚醒一個線程成為新的leader,如此往復等待喚醒。

 

至於DelayedWorkQueue也是一種設計為定時任務的延遲隊列,它的實現和DelayQueue一樣,不過是將優先級隊列和DelayQueue的實現過程遷移到本身方法體中,從而可以在該過程當中靈活的加入定時任務特有的方法調用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM