如果讀者還有一點印象,我們在實現線程池時,用了隊列這種數據結構來存儲接收到的任務,在多線程環境中阻塞隊列是一種非常有用的隊列,在介紹BlockingQueue之前,我們先解釋一下Queue接口。
Queue接口
boolean offer(E e); 將指定的元素插入此隊列,當使用有容量限制的隊列時,此方法通常要優於add(E),果該元素已添加到此隊列,則返回true;否則返回false
E peek(); 獲取但不移除此隊列的頭元素;如果此隊列為空,則返回 null。
E poll(); 獲取並移除此隊列的頭元素,如果此隊列為空,則返回 null。
boolean add(E e); 將指定的元素插入此隊列,在成功時返回 true,如果當前沒有可用的空間,則拋出 IllegalStateException
E element(); 獲取,但是不移除此隊列的第一個元素。此方法與 peek 唯一的不同在於:此隊列為空時將拋出NoSuchElementException異常
E remove(); 獲取,並移除此隊列的第一個元素。此方法與 poll唯一的不同在於:此隊列為空時將拋出NoSuchElementException異常
AbstractQueue是Queue實現的抽象類,實現了Queue的大部分函數,只有三個函數offer、peek和poll沒有實現。
BlockingQueue接口
BlockingQueue接口繼承了Queue接口,它還定義了下面一些操作:
void put(E e) throws InterruptedException; 將指定元素插入此隊列中,將等待可用的空間,當等待時可以被中斷。
E take() throws InterruptedException; 獲取並移除此隊列的頭元素,在元素變得可用之前一直等待,當等待時,也可以被中斷。
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 將指定元素插入此隊列中,在到達指定的等待時間前等待可用的空間,timeout為等待時間,如果成功,則返回 true;如果在空間可用前超過了指定的等待時間,則返回 false,當等待時可以被中斷。
E poll(long timeout, TimeUnit unit) throws InterruptedException; 獲取並移除此隊列的頭元素,可以在指定的等待時間前等待可用的元素,timeout表明放棄之前要等待的時間長度,用 unit 的時間單位表示,如果在元素可用前超過了指定的等待時間,則返回null,當等待時可以被中斷。
int remainingCapacity(); BlockingQueue是限定容量的,它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。
int drainTo(Collection<? super E> c); 移除此隊列中所有可用的元素,並將它們添加到給定collection 中。此操作可能比反復輪詢此隊列更有效。在試圖向 collection c 中添加元素沒有成功時,可能導致在拋出相關異常時。如果試圖將一個隊列放入自身隊列中,則會導致 IllegalArgumentException異常。此外,如果正在進行此操作時修改指定的 collection,則此操作行為是不確定的
int drainTo(Collection<? super E> c, int maxElements); 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定collection中,別的行為與上面int drainTo(Collection<? super E> c)一樣。
BlockingQueue方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:
|
拋出異常 |
特殊值 |
阻塞 |
超時 |
插入 |
Add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
刪除 |
Remove() |
poll() |
take() |
poll(time, unit) |
檢查 |
Element() |
peek() |
不可用 |
不可用 |
下面面依次介紹幾種典型的阻塞隊列。
ArrayBlockingQueue
ArrayBlockingQueue是一個以數組為存儲空間的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素,隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致阻塞。
此類支持對等待的生產者線程和消費者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
內部實現
private final E[] items;//存儲數組,一旦設定,不能改變
private int takeIndex;//檢查或輸出元素的位置
private int putIndex;//添加元素的位置
private int count;//元素的個數
private final ReentrantLock lock;//鎖
private final Condition notEmpty;//隊列空,采用的信號量
private final Condition notFull;//隊列滿,采用的信號量
ArrayBlockingQueue構造函數
ArrayBlockingQueue也提供了幾個構造函數,我們看其中幾個:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
Capacity參數表明隊列最多能存儲的元素
Fair參數是否創建可重入公平鎖
ArrayBlockingQueue的構造函數,首先創建一個存儲緩沖區,然后創建一把鎖好兩個條件變量。
ArrayBlockingQueue的offer函數
Offer函數通常都是先加鎖,然后判斷隊列是否為滿,如果不滿,直接插入到隊尾,否則可能等待,或者返回false,或者拋異常,最后釋放鎖。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
if (count == items.length)
return false;//滿時,直接返回false
else {
insert(e);//添加到隊尾
return true;
}
} finally {
lock.unlock();//解鎖
}
}
帶有等待時間的offer函數通常會調用條件變量的awaitNanos函數
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中斷加鎖
try {
for (;;) {
if (count != items.length) {
insert(e);//可以在隊尾插入元素
return true;
}
if (nanos <= 0)
return false; //反正時間到,直接返回false
try {
nanos = notFull.awaitNanos(nanos);//等待,知道返回為<=0
} catch (InterruptedException ie) {
notFull.signal(); // 中斷是,喚醒別他線程
throw ie;
}
}
} finally {
lock.unlock();
}
}
ArrayBlockingQueue的peek函數
Peek函數比較簡單,先加鎖,再判斷是否為空,空時,放回null,否則,返回頭元素。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();
}
}
ArrayBlockingQueue的poll函數
Poll有兩個函數,一個不帶參數,另外一個帶等待時間參數,不帶參數的poll比較簡單,如下面的源代碼
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;//空時,返回null
E x = extract();//操作隊列的頭部,返回頭部
return x;
} finally {
lock.unlock();
}
}
帶等待時間參數poll函數,跟帶有時間參數的offer函數,復雜度差不多,都是調用條件變量的等待函數,如下面的源代碼:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); //
throw ie;
}
}
} finally {
lock.unlock();
}
}
ArrayBlockingQueue需要用戶事先知道隊列的大小,而且容量需要設定,因此在使用時,需要特別注意這一點,下面介紹的LinkedBlockingQueue是個沒有界限的阻塞隊列,應用的場景比界限的ArrayBlockingQueue多。
LinkedBlockingQueue
LinkedBlockingQueue是一個基於已鏈接節點的、范圍任意的blocking queue的實現。 此隊列按 FIFO(先進先出)排序元素,隊列的頭部是在隊列中時間最長的元素,隊列的尾部 是在隊列中時間最短的元素。 新元素插入到隊列的尾部,並且隊列檢索操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列。
內部實現
LinkedBlockingQueue定義了Node節點存儲數據元素,Node就是一個普通的鏈表節點
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
LinkedBlockingQueue使用下面的數據變量來實現阻塞隊列
private final int capacity;//隊列的最大容量
private final AtomicInteger count = new AtomicInteger(0);//隊列大小
private transient Node<E> head;//隊列頭
private transient Node<E> last;//隊列尾
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue中讀只操作隊頭,而寫只操作隊尾,因此巧妙地采用了兩把鎖,對put和offer采用putLock,對take和poll采用takeLock,避免了讀寫時相互競爭鎖的現象,因此LinkedBlockingQueue在高並發讀寫操作都多的情況下,性能會比ArrayBlockingQueue好很多,在遍歷以及刪除元素時則要鎖住兩把鎖。
LinkedBlockingQueue構造函數
LinkedBlockingQueue提供了幾個構造函數,我們這里列舉其中兩個:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//默認容量上限為 Integer.MAX_VALUE
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
LinkedBlockingQueue的offer函數
先看不帶時間參數的offer,它先判斷隊列是否滿,空時返回false,再添加元素,判斷隊列未添加之前,是否為空,如果是,喚醒需要取得線程。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//e==null
final AtomicInteger count = this.count;
if (count.get() == capacity)//隊里滿
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {//添加到隊列
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)//沒有滿時
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)//隊列空時,喚醒取線程
signalNotEmpty();
return c >= 0;
}
帶時間參數的offer,邏輯上與上面的offer函數相似,多采用了一個while循環,在循環里,等待指定的時間。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
LinkedBlockingQueue的peek函數
Peek函數在加鎖后,直接取頭元素,請看下面的代碼
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
LinkedBlockingQueue的poll函數
poll函數與offer函數恰好相反,代碼邏輯相反
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
帶有等待時間的poll代碼采用等待指定的時間的await函數,代碼很容易懂。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
LinkedBlockingDeque(雙向並發阻塞隊列)
LinkedBlockingDeque是雙向並發阻塞隊列,所謂雙向是指可以從隊列的頭和尾同時操作,並發只是線程安全的實現,阻塞允許在入隊出隊不滿足條件時掛起線程,這里說的隊列是指支持FIFO/FILO實現的鏈表。
LinkedBlockingDeque實現了BlockingDeque接口,我們看了BlockingDeque中的幾個方法,就能明白LinkedBlockingDeque的功能
boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException;
E pollFirst(long timeout, TimeUnit unit) throws InterruptedException;
E pollLast(long timeout, TimeUnit unit) throws InterruptedException;
我們再來看一下它的成員變量:
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x, Node<E> p, Node<E> n) {
item = x;
prev = p;
next = n;
}
}
transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
查看這些數據結構,可以得到以下結論:
(1)要想支持阻塞功能,隊列的容量一定是固定的,否則無法在入隊的時候掛起線程。也就是capacity是final類型的。
(2)既然是雙向鏈表,每一個結點就需要前后兩個引用,這樣才能將所有元素串聯起來,支持雙向遍歷,也即prev/next兩個引用。
(3)雙向鏈表需要頭尾同時操作,需要first/last兩個節點。
(4)既然要支持阻塞功能,就需要鎖和條件變量來掛起線程。這里使用一個鎖兩個條件變量來完成此功能。
前面分析linkedBlockingQueue實現后,LinkedBlockingDeque的原理和實現就不值得一提了,無非是在獨占鎖下對一個鏈表的普通操作,我們這里就不分析源代碼了。
LinkedBlockingDeque優點當然是功能足夠強大,同時由於采用一個獨占鎖,因此實現起來也比較簡單。所有對隊列的操作都加鎖就可以完成。同時獨占鎖也能夠很好的支持雙向阻塞的特性。缺點就是由於獨占鎖,所以不能同時進行兩個操作,這樣性能上就大打折扣。從性能的角度講LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,這在高並發情況下就比較明顯了。
PriorityBlockingQueue(優先阻塞隊列)
PriorityBlockingQueue是個一個無界的阻塞隊列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞檢索的操作。雖然此隊列邏輯上是無界的,但是由於資源被耗盡,所以試圖執行添加操作可能會失敗(導致 OutOfMemoryError),此類不允許使用 null 元素。
PriorityBlockingQueue的實現比較簡單,我們稍微看看,就能了解它是怎么實現的,因為它是無價的,不需要notFull這個條件變量
private final PriorityQueue<E> q;
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition notEmpty = lock.newCondition();
offer、peek和poll函數也相當簡單,這里也貼出它們的代碼,
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean ok = q.offer(e);
assert ok;
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.poll();
} finally {
lock.unlock();
}
}
SynchronousQueue
SynchronousQueue也是一種阻塞隊列,其中每個 put 必須等待一個 take,同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。不能在同步隊列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;也不能迭代隊列,因為其中沒有元素可用於迭代。隊列的頭 是嘗試添加到隊列中的首個已排隊線程元素;如果沒有已排隊線程,則不添加元素並且頭為 null。
這里就不詳細介紹它的原理
DelayQueue
DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿后保存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,並且poll 將返回 null。當一個元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿。此隊列不允許使用 null 元素。
DelayQueue中的元素必須實現Delayed接口
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
DelayQueue定義了下面幾個變量實現阻塞隊列:
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
我們稍微解釋了DelayQueue中的offer、peek和poll函數
帶等待時間參數和不帶等待時間參數的offer實現是一樣的,在隊列為空的時候,喚醒線程。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}
Peek函數返回第一個元素,返回的元素不一定符合延時要求。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
Poll函數在拿出元素后,會判斷該元素是否符合延時要求,當getDelay<=0時,符合要求,否則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
} finally {
lock.unlock();
}
}
帶等待時間參數的poll也差不多,只是要多考慮timeout參數,如果元素的延時在timeout之前,則返回該元素,否則返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
if (nanos <= 0)
return null;
if (delay > nanos)
delay = nanos;
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
}
}
} finally {
lock.unlock();
}
}
總結一下阻塞隊列
(1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.
(2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的
(3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序.
(4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.