在Java多線程應用中,隊列的使用率很高,多數生產消費模型的首選數據結構就是隊列。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞隊列或者非阻塞隊列。
注:什么叫線程安全?這個首先要明確。線程安全的類 ,指的是類內共享的全局變量的訪問必須保證是不受多線程形式影響的。如果由於多線程的訪問(比如修改、遍歷、查看)而使這些變量結構被破壞或者針對這些變量操作的原子性被破壞,則這個類就不是線程安全的。
今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開:
BlockingQueue 阻塞算法
ConcurrentLinkedQueue,非阻塞算法
首先來看看BlockingQueue:
Queue是什么就不需要多說了吧,一句話:隊列是先進先出。相對的,棧是后進先出。如果不熟悉的話先找本基礎的數據結構的書看看吧。
BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。
首先,看看BlockingQueue提供的常用方法:
可能報異常 | 返回布爾值 | 可能阻塞 | 設定等待時間 | |
入隊 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
出隊 | remove() | poll() | take() | poll(timeout, unit) |
查看 | element() | peek() | 無 | 無 |
從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:
- add(e) remove() element() 方法不會阻塞線程。當不滿足約束條件時,會拋出IllegalStateException 異常。例如:當隊列被元素填滿后,再調用add(e),則會拋出異常。
- ffer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當隊列被元素填滿后,再調用offer(e),則不會插入元素,函數返回false。
- 要想要實現阻塞功能,需要調用put(e) take() 方法。當不滿足約束條件時,會阻塞線程。
好了,上點源碼你就更明白了。以ArrayBlockingQueue類為例,對於第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實調用的是第二類的方法,為什么?因為第二類方法返回boolean啊。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");//隊列已滿,拋異常
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();//隊列為空,拋異常
}
對於第二類方法,很標准的ReentrantLock使用方式,另外對於insert和extract的實現沒啥好說的。
注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是線程安全類。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//隊列已滿,返回false
return false;
else {
insert(e);//insert方法中發出了notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)//隊列為空,返回false
return null;
E x = extract();//extract方法中發出了notFull.signal();
return x;
} finally {
lock.unlock();
}
}
對於第三類方法,這里面涉及到Condition類,簡要提一下,await方法指:造成當前線程在接到信號或被中斷之前一直處於等待狀態。signal方法指:喚醒一個等待線程。
1 public void put(E e)throws InterruptedException { 2 if (e == null)throw new NullPointerException(); 3 final E[] items = this.items; 4 final ReentrantLock lock = this.lock; 5 lock.lockInterruptibly(); 6 try { 7 try { 8 while (count == items.length)//如果隊列已滿,等待notFull這個條件,這時當前線程被阻塞 9 notFull.await(); 10 } catch (InterruptedException ie) { 11 notFull.signal(); //喚醒受notFull阻塞的當前線程 12 throw ie; 13 } 14 insert(e); 15 } finally { 16 lock.unlock(); 17 } 18 } 19 public E take() throws InterruptedException { 20 final ReentrantLock lock = this.lock; 21 lock.lockInterruptibly(); 22 try { 23 try { 24 while (count == 0)//如果隊列為空,等待notEmpty這個條件,這時當前線程被阻塞 25 notEmpty.await(); 26 } catch (InterruptedException ie) { 27 notEmpty.signal();//喚醒受notEmpty阻塞的當前線程 28 throw ie; 29 } 30 E x = extract(); 31 return x; 32 } finally { 33 lock.unlock(); 34 } 35 } 36
第四類方法就是指在有必要時等待指定時間,就不詳細說了。再來看看BlockingQueue接口的具體實現類吧:
- ArrayBlockingQueue,其構造函數必須帶一個int參數來指明其大小
- LinkedBlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
- PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序
上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue:
首先,既然是鏈表,就應該有Node節點,它是一個內部靜態類:
1 static class Node<E> { 2 /** The item, volatile to ensure barrier separating write and read */ 3 volatile E item; 4 Node<E> next; 5 Node(E x) { item = x; } 6 }
1 /** 頭指針 */ 2 private transient Node<E> head;//head.next是隊列的頭元素 3 /** 尾指針 */ 4 private transient Node<E> last;//last.next是null
那么,對於入隊和出隊就很自然能理解了:
1 private void enqueue(E x) { 2 last = last.next = new Node<E>(x);//入隊是為last再找個下家 3 } 4 private E dequeue() { 5 Node<E> first = head.next; //出隊是把head.next取出來,然后將head向后移一位 6 head = first; 7 E x = first.item; 8 first.item = null; 9 return x; 10 }
另外,LinkedBlockingQueue相對於ArrayBlockingQueue還有不同是,有兩個ReentrantLock,且隊列現有元素的大小由一個AtomicInteger對象標示。注:AtomicInteger類是以原子的方式操作整型變量。
1 private final AtomicInteger count =new AtomicInteger(0); 2 /** 用於讀取的獨占鎖*/ 3 private final ReentrantLock takeLock =new ReentrantLock(); 4 /** 隊列是否為空的條件 */ 5 private final Condition notEmpty = takeLock.newCondition(); 6 /** 用於寫入的獨占鎖 */ 7 private final ReentrantLock putLock =new ReentrantLock(); 8 /** 隊列是否已滿的條件 */ 9 private final Condition notFull = putLock.newCondition();
有兩個Condition很好理解,在ArrayBlockingQueue也是這樣做的。但是為什么需要兩個ReentrantLock呢?下面會慢慢道來。
讓我們來看看offer和poll方法的代碼:
1 public boolean offer(E e) { 2 if (e == null)throw new NullPointerException(); 3 final AtomicInteger count = this.count; 4 if (count.get() == capacity) 5 return false; 6 int c = -1; 7 final ReentrantLock putLock =this.putLock;//入隊當然用putLock 8 putLock.lock(); 9 try { 10 if (count.get() < capacity) { 11 enqueue(e); //入隊 12 c = count.getAndIncrement(); //隊長度+1 13 if (c + 1 < capacity) 14 notFull.signal(); //隊列沒滿,當然可以解鎖了 15 } 16 } finally { 17 putLock.unlock(); 18 } 19 if (c == 0) 20 signalNotEmpty();//這個方法里發出了notEmpty.signal(); 21 return c >= 0; 22 } 23 public E poll() { 24 final AtomicInteger count = this.count; 25 if (count.get() == 0) 26 return null; 27 E x = null; 28 int c = -1; 29 final ReentrantLock takeLock =this.takeLock;出隊當然用takeLock 30 takeLock.lock(); 31 try { 32 if (count.get() > 0) { 33 x = dequeue();//出隊 34 c = count.getAndDecrement();//隊長度-1 35 if (c > 1) 36 notEmpty.signal();//隊列沒空,解鎖 37 } 38 } finally { 39 takeLock.unlock(); 40 } 41 if (c == capacity) 42 signalNotFull();//這個方法里發出了notFull.signal(); 43 return x; 44 }
看看源代碼發現和上面ArrayBlockingQueue的很類似,關鍵的問題在於:為什么要用兩個ReentrantLockputLock和takeLock?
我們仔細想一下,入隊操作其實操作的只有隊尾引用last,並且沒有牽涉到head。而出隊操作其實只針對head,和last沒有關系。那么就是說入隊和出隊的操作完全不需要公用一把鎖,所以就設計了兩個鎖,這樣就實現了多個不同任務的線程入隊的同時可以進行出隊的操作,另一方面由於兩個操作所共同使用的count是AtomicInteger類型的,所以完全不用考慮計數器遞增遞減的問題。
另外,還有一點需要說明一下:await()和singal()這兩個方法執行時都會檢查當前線程是否是獨占鎖的當前線程,如果不是則拋出java.lang.IllegalMonitorStateException異常。所以可以看到在源碼中這兩個方法都出現在Lock的保護塊中。
-------------------------------我是分割線--------------------------------------
下面再來說說ConcurrentLinkedQueue,它是一個無鎖的並發線程安全的隊列。
以下部分的內容參照了這個帖子http://yanxuxin.iteye.com/blog/586943
對比鎖機制的實現,使用無鎖機制的難點在於要充分考慮線程間的協調。簡單的說就是多個線程對內部數據結構進行訪問時,如果其中一個線程執行的中途因為一些原因出現故障,其他的線程能夠檢測並幫助完成剩下的操作。這就需要把對數據結構的操作過程精細的划分成多個狀態或階段,考慮每個階段或狀態多線程訪問會出現的情況。
ConcurrentLinkedQueue有兩個volatile的線程共享變量:head,tail。要保證這個隊列的線程安全就是保證對這兩個Node的引用的訪問(更新,查看)的原子性和可見性,由於volatile本身能夠保證可見性,所以就是對其修改的原子性要被保證。
下面通過offer方法的實現來看看在無鎖情況下如何保證原子性:
1 public boolean offer(E e) { 2 if (e == null)throw new NullPointerException(); 3 Node<E> n = new Node<E>(e, null); 4 for (;;) { 5 Node<E> t = tail; 6 Node<E> s = t.getNext(); 7 if (t == tail) { //------------------------------a 8 if (s == null) {//---------------------------b 9 if (t.casNext(s, n)) { //-------------------c 10 casTail(t, n); //------------------------d 11 return true; 12 } 13 } else { 14 casTail(t, s); //----------------------------e 15 } 16 } 17 } 18 }
此方法的循環內首先獲得尾指針和其next指向的對象,由於tail和Node的next均是volatile的,所以保證了獲得的分別都是最新的值。
代碼a:t==tail是最上層的協調,如果其他線程改變了tail的引用,則說明現在獲得不是最新的尾指針需要重新循環獲得最新的值。
代碼b:s==null的判斷。靜止狀態下tail的next一定是指向null的,但是多線程下的另一個狀態就是中間態:tail的指向沒有改變,但是其next已經指向新的結點,即完成tail引用改變前的狀態,這時候s!=null。這里就是協調的典型應用,直接進入代碼e去協調參與中間態的線程去完成最后的更新,然后重新循環獲得新的tail開始自己的新一次的入隊嘗試。另外值得注意的是a,b之間,其他的線程可能會改變tail的指向,使得協調的操作失敗。從這個步驟可以看到無鎖實現的復雜性。
代碼c:t.casNext(s, n)是入隊的第一步,因為入隊需要兩步:更新Node的next,改變tail的指向。代碼c之前可能發生tail引用指向的改變或者進入更新的中間態,這兩種情況均會使得t指向的元素的next屬性被原子的改變,不再指向null。這時代碼c操作失敗,重新進入循環。
代碼d:這是完成更新的最后一步了,就是更新tail的指向,最有意思的協調在這兒又有了體現。從代碼看casTail(t, n)不管是否成功都會接着返回true標志着更新的成功。首先如果成功則表明本線程完成了兩步的更新,返回true是理所當然的;如果 casTail(t, n)不成功呢?要清楚的是完成代碼c則代表着更新進入了中間態,代碼d不成功則是tail的指向被其他線程改變。意味着對於其他的線程而言:它們得到的是中間態的更新,s!=null,進入代碼e幫助本線程執行最后一步並且先於本線程成功。這樣本線程雖然代碼d失敗了,但是是由於別的線程的協助先完成了,所以返回true也就理所當然了。
通過分析這個入隊的操作,可以清晰的看到無鎖實現的每個步驟和狀態下多線程之間的協調和工作。
注:上面這大段文字看起來很累,先能看懂多少看懂多少,現在看不懂先不急,下面還會提到這個算法,並且用示意圖說明,就易懂很多了。
在使用ConcurrentLinkedQueue時要注意,如果直接使用它提供的函數,比如add或者poll方法,這樣我們自己不需要做任何同步。
但如果是非原子操作,比如:
1 if(!queue.isEmpty()) { 2 queue.poll(obj); 3 }
我們很難保證,在調用了isEmpty()之后,poll()之前,這個queue沒有被其他線程修改。所以對於這種情況,我們還是需要自己同步:
1 synchronized(queue) { 2 if(!queue.isEmpty()) { 3 queue.poll(obj); 4 } 5 }
注:這種需要進行自己同步的情況要視情況而定,不是任何情況下都需要這樣做。
另外還說一下,ConcurrentLinkedQueue的size()是要遍歷一遍集合的,所以盡量要避免用size而改用isEmpty(),以免性能過慢。
最后想說點什么呢,阻塞算法其實很好理解,簡單點理解就是加鎖,比如在BlockingQueue中看到的那樣,再往前推點,那就是synchronized。相比而言,非阻塞算法的設計和實現都很困難,要通過低級的原子性來支持並發。下面就簡要的介紹一下非阻塞算法,以下部分的內容參照了一篇很經典的文章http://www.ibm.com/developerworks/cn/java/j-jtp04186/
注:我覺得可以這樣理解,阻塞對應同步,非阻塞對應並發。也可以說:同步是阻塞模式,異步是非阻塞模式
舉個例子來說明什么是非阻塞算法:非阻塞的計數器
首先,使用同步的線程安全的計數器代碼如下

1 public finalclass Counter { 2 private long value =0; 3 public synchronizedlong getValue() { 4 return value; 5 } 6 public synchronizedlong increment() { 7 return ++value; 8 } 9 }
下面的代碼顯示了一種最簡單的非阻塞算法:使用 AtomicInteger的compareAndSet()(CAS方法)的計數器。compareAndSet()方法規定“將這個變量更新為新值,但是如果從我上次看到這個變量之后其他線程修改了它的值,那么更新就失敗”

1 public class NonblockingCounter { 2 private AtomicInteger value;//前面提到過,AtomicInteger類是以原子的方式操作整型變量。 3 public int getValue() { 4 return value.get(); 5 } 6 public int increment() { 7 int v; 8 do { 9 v = value.get(); 10 while (!value.compareAndSet(v, v +1)); 11 return v + 1; 12 } 13 }
非阻塞版本相對於基於鎖的版本有幾個性能優勢。首先,它用硬件的原生形態代替 JVM 的鎖定代碼路徑,從而在更細的粒度層次上(獨立的內存位置)進行同步,失敗的線程也可以立即重試,而不會被掛起后重新調度。更細的粒度降低了爭用的機會,不用重新調度就能重試的能力也降低了爭用的成本。即使有少量失敗的 CAS 操作,這種方法仍然會比由於鎖爭用造成的重新調度快得多。
NonblockingCounter 這個示例可能簡單了些,但是它演示了所有非阻塞算法的一個基本特征——有些算法步驟的執行是要冒險的,因為知道如果 CAS 不成功可能不得不重做。非阻塞算法通常叫作樂觀算法,因為它們繼續操作的假設是不會有干擾。如果發現干擾,就會回退並重試。在計數器的示例中,冒險的步驟是遞增——它檢索舊值並在舊值上加一,希望在計算更新期間值不會變化。如果它的希望落空,就會再次檢索值,並重做遞增計算。
再來一個例子,Michael-Scott 非阻塞隊列算法的插入操作,ConcurrentLinkedQueue 就是用這個算法實現的,現在來結合示意圖分析一下,很明朗:

1 public class LinkedQueue <E> { 2 private staticclass Node <E> { 3 final E item; 4 final AtomicReference<Node<E>> next; 5 Node(E item, Node<E> next) { 6 this.item = item; 7 this.next = new AtomicReference<Node<E>>(next); 8 } 9 } 10 private AtomicReference<Node<E>> head 11 = new AtomicReference<Node<E>>(new Node<E>(null,null)); 12 private AtomicReference<Node<E>> tail = head; 13 public boolean put(E item) { 14 Node<E> newNode = new Node<E>(item,null); 15 while (true) { 16 Node<E> curTail = tail.get(); 17 Node<E> residue = curTail.next.get(); 18 if (curTail == tail.get()) { 19 if (residue == null)/* A */ { 20 if (curTail.next.compareAndSet(null, newNode))/* C */ { 21 tail.compareAndSet(curTail, newNode) /* D */ ; 22 return true; 23 } 24 } else { 25 tail.compareAndSet(curTail, residue) /* B */; 26 } 27 } 28 } 29 } 30 }
看看這代碼完全就是ConcurrentLinkedQueue 源碼啊。
插入一個元素涉及頭指針和尾指針兩個指針更新,這兩個更新都是通過 CAS 進行的:從隊列當前的最后節點(C)鏈接到新節點,並把尾指針移動到新的最后一個節點(D)。如果第一步失敗,那么隊列的狀態不變,插入線程會繼續重試,直到成功。一旦操作成功,插入被當成生效,其他線程就可以看到修改。還需要把尾指針移動到新節點的位置上,但是這項工作可以看成是 “清理工作”,因為任何處在這種情況下的線程都可以判斷出是否需要這種清理,也知道如何進行清理。
隊列總是處於兩種狀態之一:正常狀態(或稱靜止狀態,圖 1 和 圖 3)或中間狀態(圖 2)。在插入操作之前和第二個 CAS(D)成功之后,隊列處在靜止狀態;在第一個 CAS(C)成功之后,隊列處在中間狀態。在靜止狀態時,尾指針指向的鏈接節點的 next 字段總為 null,而在中間狀態時,這個字段為非 null。任何線程通過比較 tail.next 是否為 null,就可以判斷出隊列的狀態,這是讓線程可以幫助其他線程 “完成” 操作的關鍵。 上圖顯示的是:有兩個元素,處在靜止狀態的隊列

