DelayQueue是一個支持延時獲取元素的無界阻塞隊列。里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列里面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從隊列中取元素。
DelayQueue主要用於兩個方面:
- 緩存:清掉緩存中超時的緩存數據
- 任務超時處理
class DelayedEle implements Delayed { private final long delayTime; //延遲時間 private final long expire; //到期時間 private String data; //數據 public DelayedEle(long delay, String data) { delayTime = delay; this.data = data; expire = System.currentTimeMillis() + delay; } /** * 剩余時間=到期時間-當前時間 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 優先隊列里面優先級規則 */ @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delayTime); sb.append(", expire=").append(expire); sb.append(", data='").append(data).append('\''); sb.append('}'); return sb.toString(); } }
看了DelayQueue的內部結構就對上面幾個關鍵點一目了然了,但是這里有一點需要注意,DelayQueue的元素都必須繼承Delayed接口。同時也可以從這里初步理清楚DelayQueue內部實現的機制了:以支持優先級無界隊列的PriorityQueue作為一個容器,容器里面的元素都應該實現Delayed接口,在每次往優先級隊列中添加元素時以元素的過期時間作為排序條件,最先過期的元素放在優先級最高。
基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。
DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據為空的操作(消費者)才會被阻塞。
放的時候不用因為滿了而阻塞,取的時候空了會阻塞等待有,時間沒到也會阻塞等到,自己也可以指定超時時間。
只有一個getDelay(TimeUnit)方法,該方法返回與此對象相關的的剩余時間。同時我們看到Delayed接口繼承自Comperable接口,所以實現Delayed接口的類還必須要定義一個compareTo方法,該方法提供與此接口的getDelay方法一致的排序。
DelayQueue隊列中每個元素都有個過期時間,並且隊列是個優先級隊列,當從隊列獲取元素時候,只有過期元素才會出隊列。
q:優先隊列,用於存儲元素,並按優先級排序
leader:用於優化內部阻塞通知的線程,第一個阻塞等待的線程。
以支持優先級的PriorityQueue無界隊列作為一個容器,因為元素都必須實現Delayed接口,可以根據元素的過期時間來對元素進行排列,因此,先過期的元素會在隊首,每次從隊列里取出來都是最先要過期的元素。
leader是等待獲取隊列元素的線程,應用主從式設計減少不必要的等待。如果leader不等於空,表示已經有線程在等待獲取隊列的元素。所以,通過await()方法讓出當前線程等待信號。如果leader等於空,則把當前線程設置為leader,當一個線程為leader,它會使用awaitNanos()方法讓當前線程等待接收信號或等待delay時間。
private final PriorityQueue<E> q = new PriorityQueue<E>(); private final transient ReentrantLock lock = new ReentrantLock(); private Thread leader = null; private final Condition available = lock.newCondition(); public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //放一個元素 if (q.peek() == e) { //隊列的第0個元素是這個元素,放進去的元素成為第0個元素。最大堆第0和元素是最大的,依次是第23大,第4567大, leader = null; // leader 線程為空 available.signal(); //其他線程喚醒,如果隊首元素是剛插入的元素,則設置leader為null,並喚醒阻塞在available上的線程 } return true; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) //時間不到不走 return null; else return q.poll(); // 取出優先級隊列里面第0個元素,優先級隊列是根據時間排序了的 } finally { lock.unlock(); } }
線程阻塞地方,與代碼對應關系。
推論1:有leader線程等待的時候,新的線程要取,必然加入await隊列排隊。所有已經await的線程,喚醒了非leader線程就會繼續await。每次喚醒一個處於await的線程。同時之能一個線程取就是leader去取。
await釋放鎖別的線程可以執行,singal不會釋放鎖別的線程不執行。
推論2:把take(),put()方法全部鎖住,只能一個線程調用take或put。里面有await就不行了,await后別的線程就可以進來take,put方法,只要不是執行take里面的await方法,那么就只能一個線程執行take,put方法。
推論3:如果都不設置等待時間,那么就沒有優先級,只有等到放的時候 或者 成功取到的線程喚醒,都設置等待時間,時間到了一起搶。
推論4:取的時候為空等待,時間到了直接返回,有元素但是時間沒到,就設置leader。
推論5:等待取的線程,有一個是leader,就是由第一優先權的線程也是第一個嘗試獲取的線程。所有的等待取的線程中,下一次不一定是ledaer去取,如果喚醒了非leader線程,那也不行,要繼續等到,一定要leader線程喚醒后去取,其他線程才能去取,然后成為新的leader。
方法調用是根據線程隨機來的,一旦走進去方法就根據代碼來了。
public E take() throws InterruptedException { // 獲取全局獨占鎖,只能同時一個線程取或者放,喚醒只是喚醒一個線程。 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 獲取隊首元素 E first = q.peek(); // 隊首為空,則阻塞當前線程 if (first == null) available.await();//等待offer方法喚醒,再次循環去peek, else { // 獲取隊首元素的超時時間 long delay = first.getDelay(NANOSECONDS);//返回還需等待3秒 // 已超時,直接出隊 if (delay <= 0) return q.poll(); // 釋放first的引用,避免內存泄漏 first = null; //有元素,但是元素沒到時間 // leader != null表明有其他線程在等待,阻塞當前線程,自己不是第一個等待線程 if (leader != null) available.await(); //等待獲取了頭元素的線程喚醒,或者offer方法喚醒,再次循環去peek, else { //沒有其他線程在等待, leader指向當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 超時阻塞,只需要等3秒,醒了之后也要重新獲取。 available.awaitNanos(delay); } finally { // 釋放leader if (leader == thisThread) leader = null; } } } } } finally { // leader為null並且隊列不為空,說明沒有其他線程在等待,那就通知條件隊列 if (leader == null && q.peek() != null) available.signal(); // 釋放全局獨占鎖 lock.unlock(); } } 這里為什么如果不設置first = null,則會引起內存泄漏呢?線程A到達,列首元素沒有到期,設置leader = 線程A,這是線程B來了因為leader != null,則會阻塞,線程C一樣。假如線程阻塞完畢了,獲取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被線程B、線程C持有着,所以不會回收,這里只有兩個線程,如果有線程D、線程E…呢?這樣會無限期的不能回收,就會造成內存泄漏。
try里面有return,也要先執行finally在執行return。Condition.await(16,TimeUnit.SECONDS):睡醒之后,獲得了鎖就執行,沒有獲得鎖阻塞。睡的中間被人singnal了,可以醒來,有鎖執行,無鎖阻塞。