已經說了四個並發隊列了,DelayQueue這是最后一個,這是一個無界阻塞延遲隊列,底層基於前面說過的PriorityBlockingQueue實現的 ,隊列中每個元素都有過期時間,當從隊列獲取元素時,只有過期元素才會出隊列,而隊列頭部的元素是過期最快的元素;
一.簡單使用
可以看到我們可以自己設置超時時間和優先級隊列中的比較規則,這樣我們在隊列中取的時候,按照最快超時的先出隊;
package com.example.demo.study; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import lombok.Data; public class Study0210 { @Data static class MyDelayed implements Delayed { private long delayTime;//該任務需要再隊列中的延遲的時候 private long expire;//這個時間表示當前時間和延遲時間相加,這里就叫做到期時間 private String taskName;//任務的名稱 public MyDelayed(long delayTime, String taskName) { this.delayTime = delayTime; this.taskName = taskName; this.expire = System.currentTimeMillis()+delayTime; } //指定優先級隊列里面的比較規則,就跟上篇博客中說的優先級隊列中說的比較器一樣 @Override public int compareTo(Delayed o) { return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS)); } //這個方法表示該任務在隊列中還有多少剩余時間,也就是expire-當前時間 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire-System.currentTimeMillis(), TimeUnit.MILLISECONDS); } } public static void main(String[] args) throws InterruptedException { //創建延遲隊列 DelayQueue<MyDelayed> queue = new DelayQueue<MyDelayed>(); //創建任務丟到隊列中 Random random = new Random(); for (int i = 1; i < 11; i++) { MyDelayed myDelayed = new MyDelayed(random.nextInt(500),"task"+i); queue.add(myDelayed); } //獲取隊列中的任務,這里只會跟超時時間最小的有關,和入隊順序無關 MyDelayed myDelayed = queue.take(); while(myDelayed!=null) { System.out.println(myDelayed.toString()); myDelayed = queue.take(); } } }
二.基本組成
//由此可是這個隊列中存放的任務必須是Delayed類型的 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //獨占鎖 private final transient ReentrantLock lock = new ReentrantLock(); //優先級隊列 private final PriorityQueue<E> q = new PriorityQueue<E>(); //leader線程,實際上每次進行入隊和出隊操作的只能是leader線程,其余的都叫做fallower線程,這里會用到一個leader-follower模式 private Thread leader = null; //條件變量 private final Condition available = lock.newCondition(); //省略很多代碼 }
具體的繼承關系可以看看下面這個圖,實際操作的都是內部的PriorityQueue;
三.offer方法
上面代碼中我們雖然說調用的是add方法,其實就是調用的是offer方法;
public boolean offer(E e) { final ReentrantLock lock = this.lock; //獲取鎖 lock.lock(); try { //往優先級隊列中添加一個元素 q.offer(e); //注意,peek方法只是獲取優先級隊列中第一個元素,並不會刪除 //如果優先級隊列中取的元素就是和當前添加的元素一樣,說明當前元素就是達到過期要求的,於是設置leader線程為null //然后通知條件隊列中的線程優先級隊列中已經有元素了,可以過來取了 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { //釋放鎖 lock.unlock(); } }
四.take方法
獲取並移除隊列中達到超時時間要求的元素,如果隊列中沒有元素,就把當前線程丟到條件隊列中阻塞;
從下面的代碼邏輯中我們可以知道:線程分為兩種,一種是leader線程,一種是follower線程,其中leader線程只會阻塞一定的時間,follower線程會在條件隊列阻塞無限長的時間;當leader線程執行完take操作之后,就會重置leader線程為null,然后從條件隊列中拿一個出來設置為leader線程
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //獲取鎖,可中斷 lock.lockInterruptibly(); try { for (;;) { //這里先是嘗試從優先級隊列中獲取一下節點,獲取不到的話,說明當前優先級隊列為空,就阻塞當前線程 E first = q.peek(); if (first == null) available.await(); else { //如果優先級隊列中有元素,那么肯定能走到這里來,然后取到該元素的超時時間,如果小於0,說明已經達到要求了,可以獲取並刪除隊列中的元素 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting //如果leader隊列不為空,說明有其他線程正在執行take,於是就把當前線程放到條件隊列中 if (leader != null) available.await(); //到這里,說明優先級隊列中沒有元素到超時時間,而且此時沒有其他線程調用take方法,於是就把leader線程設置為當前線程, //然后當前leader線程就會等待一定的時間,等優先級隊列中最快超時的元素; //在等待的時候,leader線程會釋放鎖,這時其他線程B可以調用offer方法添加元素,線程C也可以調用take方法,然后線程C就會在 //上面這里阻塞無限長的時間,直到被喚醒 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { //當前線程阻塞一定時間之后,不管成功了沒有,都會把leader線程重置為null,然后重新循環 if (leader == thisThread) leader = null; } } } } //這里的意思就是當前線程移除元素成功之后,喚醒條件隊列中的線程去繼續從隊列中獲取元素 } finally { if (leader == null && q.peek() != null) available.signal(); //釋放鎖 lock.unlock(); } }
五.poll操作
獲取並移除隊頭過期元素,如果隊列為空,或者對頭元素沒有過超時時間就返回null
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //嘗試獲取隊頭元素,如果隊頭元素為空或者該延遲過期時間還沒到,就返回null E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else //否則就獲取並移除隊頭元素 return q.poll(); } finally { lock.unlock(); } }
六.總結
這個隊列其實很容易,主要的是有一個延遲時間,我們從優先級隊列中獲取的根節點首先會判斷有沒有過超時時間,有的話就移除並返回就好了,沒有的話,就看看還剩下多少時間才會超時(由於是優先級隊列,所以根節點一般就是最快超時時間的,當然,也可以修改優先級隊列的比較規則),於是當前線程就會等這個節點超時,此時leader等於當前線程,在等待的過程中,會釋放鎖,所以其他線程可以往隊列中添加元素,也可以獲取元素(但是由於此時leader!=null,這些線程會阻塞無限長時間直到被喚醒);
在leader線程超時時間到了之后自動喚醒,再進行一次循環,就會獲取並移除根節點了,最后再重置leader節點為null,順便喚醒條件隊列中的節點;