我們知道線程池運行時,會不斷從任務隊列中獲取任務,然后執行任務。如果我們想實現延時或者定時執行任務,重要一點就是任務隊列會根據任務延時時間的不同進行排序,延時時間越短地就排在隊列的前面,先被獲取執行。
隊列是先進先出的數據結構,就是先進入隊列的數據,先被獲取。但是有一種特殊的隊列叫做優先級隊列,它會對插入的數據進行優先級排序,保證優先級越高的數據首先被獲取,與數據的插入順序無關。
實現優先級隊列高效常用的一種方式就是使用堆。
什么是堆?
堆通常是一個可以被看做一棵樹的數組對象。
堆(heap)又被為優先隊列(priority queue)。盡管名為優先隊列,但堆並不是隊列。
因為隊列中允許的操作是先進先出(FIFO),在隊尾插入元素,在隊頭取出元素。
而堆雖然在堆底插入元素,在堆頂取出元素,但是堆中元素的排列不是按照到來的先后順序,而是按照一定的優先順序排列的。
這里來說明一下滿二叉樹的概念與完全二叉樹的概念。
滿二叉樹
除了葉子節點,所有的節點的左右孩子都不為空,就是一棵滿二叉樹,如下圖。
可以看出:滿二叉樹所有的節點都擁有左孩子,又擁有右孩子。
完全二叉樹
不一定是一個滿二叉樹,但它不滿的那部分一定在右下側,如下圖
堆總是滿足下列性質:
-
堆中某個節點的值總是不大於或不小於其父節點的值;
-
堆總是一棵完全二叉樹。
- 最大值時,稱為“最大堆”,也稱大頂堆;
- 最小值時,稱為“最小堆”,也稱小頂堆。
堆的實現
堆是一個二叉樹,但是它最簡單的方式是通過數組去實現二叉樹,而且因為堆是一個完全二叉樹,就不存在數組空間的浪費。怎么使用數組來存儲二叉樹呢?
就是用數組的下標來模擬二叉樹的各個節點,比如說根節點就是0,第一層的左節點是1,右節點是2。由此我們可以得出下列公式:
1 // 對於n位置的節點來說: 2 int left = 2 * n + 1; // 左子節點 3 int right = 2 * n + 2; // 右子節點 4 int parent = (n - 1) / 2; // 父節點,當然n要大於0,根節點是沒有父節點的
對於堆來說,只有兩個操作,插入insert和刪除remove,不管插入還是刪除保證堆的成立條件,1.是完全二叉樹,2.父節點的值不能小於子節點的值。
最大堆的插入(ADD)
1 public void insert(int value) { 2 // 第一步將插入的值,直接放在最后一個位置。並將長度加一 3 store[size++] = value; 4 // 得到新插入值所在位置。 5 int index = size - 1; 6 while(index > 0) { 7 // 它的父節點位置坐標 8 int parentIndex = (index - 1) / 2; 9 // 如果父節點的值小於子節點的值,你不滿足堆的條件,那么就交換值 10 if (store[index] > store[parentIndex]) { 11 swap(store, index, parentIndex); 12 index = parentIndex; 13 } else { 14 // 否則表示這條路徑上的值已經滿足降序,跳出循環 15 break; 16 } 17 } 18 }
主要步驟:
-
直接將value插入到size位置,並將size自增,這樣store數組中插入一個值了。
-
要保證從這個葉節點到根節點這條路徑上的節點,滿足父節點的值不能小於子節點。
-
通過int parentIndex = (index - 1) / 2得到父節點,如果比父節點值大,那么兩者位置的值交換,然后再拿這個父節點和它的父父節點比較。
直到這個節點值比父節點值小,或者這個節點已經是根節點就退出循環。
因為每次循環index都是除以2這種倍數遞減的方式,所以它最多循環次數是(log N)次。
最大堆的刪除(DELETE)
1 public int remove() { 2 // 將根的值記錄,最后返回 3 int result = store[0]; 4 // 將最后位置的值放到根節點位置 5 store[0] = store[--size]; 6 int index = 0; 7 // 通過循環,保證父節點的值不能小於子節點。 8 while(true) { 9 int leftIndex = 2 * index + 1; // 左子節點 10 int rightIndex = 2 * index + 2; // 右子節點 11 // leftIndex >= size 表示這個子節點還沒有值。 12 if (leftIndex >= size) break; 13 int maxIndex = leftIndex; 14 //找到左右節點中較大的一個節點 15 if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex; 16 //與子節點中較大的子節點比較,如果子節點更大,則交換位置 17 //為什么要與較大的子節點比較呢?如果和較小的節點比較,沒有交換位置,但有可能比較大的節點小 18 if (store[index] < store[maxIndex]) { 19 swap(store, index, maxIndex); 20 index = maxIndex; 21 } else { 22 //滿足子節點比當前節點小,退出循環 23 break; 24 } 25 } 26 //返回最開始的第一個值 27 return result; 28 }
在堆中最大值就在根節點,所以操作步驟:
-
將根節點的值保存到result中。
-
將最后節點的值移動到根節點,再將長度減一,這樣滿足堆成立第一個條件,堆是一個完全二叉樹。
-
使用循環,來滿足堆成立的第二個條件,父節點的值不能小於子節點的值。
-
最后返回result。
每次循環我們都是以2的倍數遞增,所以它也是最多循環次數是(log N)次。
所以通過堆這種方式可以快速實現優先級隊列,它的插入和刪除操作的效率都是O(log N)。
那么怎么實現堆排序?這個很簡單,利用優先隊列的特性:
- 先遍歷數組。將數組中的值依次插入到堆中。
- 然后再用一個循環將值從堆中取出來。
1 private static void headSort(int[] arr) { 2 int size = arr.length; 3 Head head = new Head(size); 4 for (int i = 0; i < size; i++) { 5 head.insert(arr[i]); 6 } 7 for (int i = 0; i < size; i++) { 8 // 實現從大到小的排序 9 arr[size - 1 - i] = head.remove(); 10 } 11 }
堆排序的效率:因為每次插入數據效率是O(log N),而我們需要進行n次循環,將數組中每個值插入到堆中,所以它的執行時間是O(N * log N)級。
DelayedWorkQueue類
1 static class DelayedWorkQueue extends AbstractQueue<Runnable> 2 implements BlockingQueue<Runnable> {
從定義中看出DelayedWorkQueue是一個阻塞隊列。並且DelayedWorkQueue是一個最小堆,最頂點的值最小,即堆中某個節點的值總是不小於其父節點的值。
屬性
1 // 初始時,數組長度大小。 2 private static final int INITIAL_CAPACITY = 16; 3 // 使用數組來儲存隊列中的元素。 4 private RunnableScheduledFuture<?>[] queue = 5 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 6 // 使用lock來保證多線程並發安全問題。 7 private final ReentrantLock lock = new ReentrantLock(); 8 // 隊列中儲存元素的大小 9 private int size = 0; 10 11 //特指隊列頭任務所在線程 12 private Thread leader = null; 13 14 // 當隊列頭的任務延時時間到了,或者有新的任務變成隊列頭時,用來喚醒等待線程 15 private final Condition available = lock.newCondition();
DelayedWorkQueue是用數組來儲存隊列中的元素,那么我們看看它是怎么實現優先級隊列的。
插入元素方法
1 public void put(Runnable e) { 2 offer(e); 3 } 4 5 public boolean add(Runnable e) { 6 return offer(e); 7 } 8 9 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 10 return offer(e); 11 }
我們發現與普通阻塞隊列相比,這三個添加方法都是調用offer方法。那是因為它沒有隊列已滿的條件,也就是說可以不斷地向DelayedWorkQueue添加元素,當元素個數超過數組長度時,會進行數組擴容。
1 public boolean offer(Runnable x) { 2 if (x == null) 3 throw new NullPointerException(); 4 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 5 // 使用lock保證並發操作安全 6 final ReentrantLock lock = this.lock; 7 lock.lock(); 8 try { 9 int i = size; 10 // 如果要超過數組長度,就要進行數組擴容 11 if (i >= queue.length) 12 // 數組擴容 13 grow(); 14 // 將隊列中元素個數加一 15 size = i + 1; 16 // 如果是第一個元素,那么就不需要排序,直接賦值就行了 17 if (i == 0) { 18 queue[0] = e; 19 setIndex(e, 0); 20 } else { 21 // 調用siftUp方法,使插入的元素變得有序。 22 siftUp(i, e); 23 } 24 // 表示新插入的元素是隊列頭,更換了隊列頭, 25 // 那么就要喚醒正在等待獲取任務的線程。 26 if (queue[0] == e) { 27 leader = null; 28 // 喚醒正在等待等待獲取任務的線程 29 available.signal(); 30 } 31 } finally { 32 lock.unlock(); 33 } 34 return true; 35 }
數組擴容方法:
1 private void grow() { 2 int oldCapacity = queue.length; 3 // 每次擴容增加原來數組的一半數量。 4 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 5 if (newCapacity < 0) // overflow 6 newCapacity = Integer.MAX_VALUE; 7 // 使用Arrays.copyOf來復制一個新數組 8 queue = Arrays.copyOf(queue, newCapacity); 9 }
插入元素排序siftUp方法:
1 private void siftUp(int k, RunnableScheduledFuture<?> key) { 2 // 當k==0時,就到了堆二叉樹的根節點了,跳出循環 3 while (k > 0) { 4 // 父節點位置坐標, 相當於(k - 1) / 2 5 int parent = (k - 1) >>> 1; 6 // 獲取父節點位置元素 7 RunnableScheduledFuture<?> e = queue[parent]; 8 // 如果key元素大於父節點位置元素,滿足條件,那么跳出循環 9 // 因為是從小到大排序的。 10 if (key.compareTo(e) >= 0) 11 break; 12 // 否則就將父節點元素存放到k位置 13 queue[k] = e; 14 // 這個只有當元素是ScheduledFutureTask對象實例才有用,用來快速取消任務。 15 setIndex(e, k); 16 // 重新賦值k,尋找元素key應該插入到堆二叉樹的那個節點 17 k = parent; 18 } 19 // 循環結束,k就是元素key應該插入的節點位置 20 queue[k] = key; 21 setIndex(key, k); 22 }
主要是三步:
- 元素個數超過數組長度,就會調用grow()方法,進行數組擴容。
- 將新元素e添加到優先級隊列中對應的位置,通過siftUp方法,保證按照元素的優先級排序。
- 如果新插入的元素是隊列頭,即更換了隊列頭,那么就要喚醒正在等待獲取任務的線程。這些線程可能是因為原隊列頭元素的延時時間沒到,而等待的。

假設現有元素 5 需要插入,為了維持完全二叉樹的特性,新插入的元素一定是放在結點 6 的右子樹;同時為了滿足任一結點的值要小於左右子樹的值這一特性,新插入的元素要和其父結點作比較,如果比父結點小,就要把父結點拉下來頂替當前結點的位置,自己則依次不斷向上尋找,找到比自己大的父結點就拉下來,直到沒有符合條件的值為止。
動畫講解:
在這里先將元素 5 插入到末尾,即放在結點 6 的右子樹。
然后與父類比較, 6 > 5 ,父類數字大於子類數字,子類與父類交換。
重復此操作,直到不發生替換。
立即獲取隊列頭元素
1 public RunnableScheduledFuture<?> poll() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 RunnableScheduledFuture<?> first = queue[0]; 6 // 隊列頭任務是null,或者任務延時時間沒有到,都返回null 7 if (first == null || first.getDelay(NANOSECONDS) > 0) 8 return null; 9 else 10 // 移除隊列頭元素 11 return finishPoll(first); 12 } finally { 13 lock.unlock(); 14 } 15 }
1 public long getDelay(TimeUnit unit) { 2 return unit.convert(time - now(), NANOSECONDS); 3 }
當隊列頭任務是null,或者任務延時時間沒有到,表示這個任務還不能返回,因此直接返回null。否則調用finishPoll方法,移除隊列頭元素並返回。
1 // 移除隊列頭元素 2 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 3 // 將隊列中元素個數減一 4 int s = --size; 5 // 獲取隊列末尾元素x 6 RunnableScheduledFuture<?> x = queue[s]; 7 // 原隊列末尾元素設置為null 8 queue[s] = null; 9 if (s != 0) 10 // 將隊列最后一個元素移動到對列頭元素位置,然后向下排序 11 // 因為移除了隊列頭元素,所以進行重新排序。 12 siftDown(0, x); 13 setIndex(f, -1); 14 return f; 15 }
這個方法與我們在第一節中,介紹堆的刪除方法一樣。
- 先將隊列中元素個數減一。
- 將原隊列末尾元素設置成隊列頭元素,再將隊列末尾元素設置為null。
- 調用siftDown(0, x)方法,保證按照元素的優先級排序。
移除元素排序siftDown方法:
1 private void siftDown(int k, RunnableScheduledFuture<?> key) { 2 int half = size >>> 1; 3 // 通過循環,保證父節點的值不能大於子節點。 4 while (k < half) { 5 // 左子節點, 相當於 (k * 2) + 1 6 int child = (k << 1) + 1; 7 // 左子節點位置元素 8 RunnableScheduledFuture<?> c = queue[child]; 9 // 右子節點, 相當於 (k * 2) + 2 10 int right = child + 1; 11 // 如果左子節點元素值大於右子節點元素值,那么右子節點才是較小值的子節點。 12 // 就要將c與child值重新賦值 13 if (right < size && c.compareTo(queue[right]) > 0) 14 c = queue[child = right]; 15 // 如果父節點元素值小於較小的子節點元素值,那么就跳出循環 16 if (key.compareTo(c) <= 0) 17 break; 18 // 否則,父節點元素就要和子節點進行交換 19 queue[k] = c; 20 setIndex(c, k); 21 k = child; 22 } 23 // 循環結束,k就是元素key應該插入的節點位置 24 queue[k] = key; 25 setIndex(key, k); 26 }
我們來看看動畫
核心點:將最后一個元素填充到堆頂,然后不斷的下沉這個元素。
假設要從節點 1 ,也可以稱為取出節點 1 ,為了維持完全二叉樹的特性 ,我們將最后一個元素 6 去替代這個 1 ;然后比較 1 和其子樹的大小關系,如果比左右子樹大(如果存在的話),就要從左右子樹中找一個較小的值替換它,而它能自己就要跑到對應子樹的位置,再次循環這種操作,直到沒有子樹比它小。
通過這樣的操作,堆依然是堆,總結一下:
- 找到要刪除的節點(取出的節點)在數組中的位置
- 用數組中最后一個元素替代這個位置的元素
- 當前位置和其左右子樹比較,保證符合最小堆的節點間規則
- 刪除最后一個元素
等待獲取隊列頭元素
1 public RunnableScheduledFuture<?> take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 for (;;) { 6 RunnableScheduledFuture<?> first = queue[0]; 7 // 如果沒有任務,就讓線程在available條件下等待。 8 if (first == null) 9 available.await(); 10 else { 11 // 獲取任務的剩余延時時間 12 long delay = first.getDelay(NANOSECONDS); 13 // 如果延時時間到了,就返回這個任務,用來執行。 14 if (delay <= 0) 15 return finishPoll(first); 16 // 將first設置為null,當線程等待時,不持有first的引用 17 first = null; // don't retain ref while waiting 18 19 // 如果還是原來那個等待隊列頭任務的線程, 20 // 說明隊列頭任務的延時時間還沒有到,繼續等待。 21 if (leader != null) 22 available.await(); 23 else { 24 // 記錄一下當前等待隊列頭任務的線程 25 Thread thisThread = Thread.currentThread(); 26 leader = thisThread; 27 try { 28 // 當任務的延時時間到了時,能夠自動超時喚醒。 29 available.awaitNanos(delay); 30 } finally { 31 if (leader == thisThread) 32 leader = null; 33 } 34 } 35 } 36 } 37 } finally { 38 if (leader == null && queue[0] != null) 39 // 喚醒等待任務的線程 40 available.signal(); 41 lock.unlock(); 42 } 43 }
如果隊列中沒有任務,那么就讓當前線程在available條件下等待。如果隊列頭任務的剩余延時時間delay大於0,那么就讓當前線程在available條件下等待delay時間。
超時等待獲取隊列頭元素
1 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 long nanos = unit.toNanos(timeout); 4 final ReentrantLock lock = this.lock; 5 lock.lockInterruptibly(); 6 try { 7 for (;;) { 8 RunnableScheduledFuture<?> first = queue[0]; 9 // 如果沒有任務。 10 if (first == null) { 11 // 超時時間已到,那么就直接返回null 12 if (nanos <= 0) 13 return null; 14 else 15 // 否則就讓線程在available條件下等待nanos時間 16 nanos = available.awaitNanos(nanos); 17 } else { 18 // 獲取任務的剩余延時時間 19 long delay = first.getDelay(NANOSECONDS); 20 // 如果延時時間到了,就返回這個任務,用來執行。 21 if (delay <= 0) 22 return finishPoll(first); 23 // 如果超時時間已到,那么就直接返回null 24 if (nanos <= 0) 25 return null; 26 // 將first設置為null,當線程等待時,不持有first的引用 27 first = null; // don't retain ref while waiting 28 // 如果超時時間小於任務的剩余延時時間,那么就有可能獲取不到任務。 29 // 在這里讓線程等待超時時間nanos 30 if (nanos < delay || leader != null) 31 nanos = available.awaitNanos(nanos); 32 else { 33 Thread thisThread = Thread.currentThread(); 34 leader = thisThread; 35 try { 36 // 當任務的延時時間到了時,能夠自動超時喚醒。 37 long timeLeft = available.awaitNanos(delay); 38 // 計算剩余的超時時間 39 nanos -= delay - timeLeft; 40 } finally { 41 if (leader == thisThread) 42 leader = null; 43 } 44 } 45 } 46 } 47 } finally { 48 if (leader == null && queue[0] != null) 49 // 喚醒等待任務的線程 50 available.signal(); 51 lock.unlock(); 52 } 53 }
與take方法相比較,就要考慮設置的超時時間,如果超時時間到了,還沒有獲取到有用任務,那么就返回null。其他的與take方法中邏輯一樣。
總結
使用優先級隊列DelayedWorkQueue,保證添加到隊列中的任務,會按照任務的延時時間進行排序,延時時間少的任務首先被獲取。