阻塞隊列與普通的隊列(LinkedList/ArrayList)相比,支持在向隊列中添加元素時,隊列的長度已滿阻塞當前添加線程,直到隊列未滿或者等待超時;從隊列中獲取元素時,隊列中元素為空 ,會將獲取元素的線程阻塞,直到隊列中存在元素 或者等待超時。
在JUC包中常用的阻塞隊列包含ArrayBlockingQueue/LinkedBlockingQueue/LinkedBlockingDeque等,從結構來看都繼承了AbstractQueue實現了BlockingQueue接口(LinkedBlockingDeque是雙向阻塞隊列,實現的是BlockingDeque接口),在BlockingQueue接口中定義了幾個供子類實現的接口,可以分為3部分,puts操作、takes操作、其他操作。
puts操作 add(E e) : 添加成功返回true,失敗拋IllegalStateException異常 offer(E e) : 成功返回 true,如果此隊列已滿,則返回 false(如果添加了時間參數,且隊列已滿也會阻塞) put(E e) :將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞 takes操作 remove(Object o) :移除指定元素,成功返回true,失敗返回false poll() : 獲取並移除此隊列的頭元素,若隊列為空,則返回 null(如果添加了時間參數,且隊列中沒有數據也會阻塞) take():獲取並移除此隊列頭元素,若沒有元素則一直阻塞。 peek() :獲取但不移除此隊列的頭;若隊列為空,則返回 null。 other操作 contains(Object o):隊列中是否包含指定元素 drainTo(Collection<? super E> c):隊列轉化為集合
關於阻塞隊列,我們主要看LinkedBlockingQueue與ArrayBlockingQueue.
ArrayBlockingQueue
ArrayBlockingQueue是基於數組的、有界的、遵循FIFO原則的阻塞隊列,隊列初始化時必須指定隊列的長度。
這是一個經典的“有界緩沖區”,其中固定大小的數組包含由生產者插入並由消費者提取的元素。創建后,無法更改容量。此類支持用於排序等待生產者和消費者線程的可選公平策略。默認情況下,不保證此順序。但是,將fairness設置為true構造的隊列以FIFO順序授予線程訪問權限。公平性通常會降低吞吐量,但會降低可變性並避免飢餓。
結構

相關變量
final Object[] items; //一個數組,用來存放隊列中的變量(隊列的基礎) int count; //隊列中元素的數量 int takeIndex; //下一次take、poll、remove、peek操作的下標值 int putIndex; //下次add、offer、put操作的下標值
構造函數
ArrayBlockingQueue提供了三個構造函數,在只傳遞初始化大小值時,默認使用的鎖是非公平鎖,對比三個不同的構造函數而言,真正初始化隊列的構造方法是ArrayBlockingQueue(int capacity, boolean fair)方法,傳入集合的構造方法會在調用該方法后將集合遍歷存入隊列中
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //使用同一個鎖對象,此處是與LinkedBlockingQueue(使用兩個不同的鎖來控制添加,取出操作)不同的地方 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); //向數組中添加數據 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //設置下次添加操作對應的數組下標值 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
offer/add操作
add本質上調用的是offer操作,通過返回值true/false可以判斷隊列中添加元素是否成功,隊列已滿返回false
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //隊列已滿,直接返回false if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } //入隊列操作 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //喚醒take/poll(有時間參數)方法獲取數據的線程 notEmpty.signal(); }
put操作
沒有返回值,隊列已滿則等待,知道被喚醒
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) //隊列已滿線程掛起等待 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
poll操作
隊列為空返回null,對於有時間參數的poll操作,在隊列為空時,會被掛起等待
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; //歸零操作 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //喚醒puts線程 notFull.signal(); return x; }
take操作
隊列為空等待,直到隊列中存在元素被喚醒
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
peek操作
獲取隊列中第一個不為空的元素(每次takes操作,或者puts操作都會設置下次takes/puts操作的下標)
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //通過下標值獲取元素 return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; }
remove操作
刪除內部元素操作是一種本質上緩慢且具有破壞性的操作,需要將刪除元素后的元素統一遷移一個單位,並且在操作過程中會獲得鎖,對性能有影響,因此不應輕易執行remove操作
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; //遍歷隊列 do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; //如果需要移除的元素下標值為下一次取數的下標值,執行類似取數的操作 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; //如果takes下標到達隊列最大長度,歸零 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; //如果不相等,將需刪除元素的后續元素統一遷移一位 for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { //移動完成,設置puts操作下標 items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } //喚醒put操作等待的線程 notFull.signal(); }
綜上,ArrayBlockingQueue隊列的邏輯並不復雜,但需要注意一下幾點
1.ArrayBlockingQueue是以數組來實現隊列功能的,在執行puts或者takes操作時一旦下一個操作的下標值大於隊列的長度,類中用來記錄存取下標值會歸零,已達到循環使用隊列的目的 2.ArrayBlockingQueue是通過一個鎖控制takes以及puts操作,說明在同一時間內只能執行takes操作或者puts操作中的一種,對於阻塞隊列來說,保證了線程安全,
但是會影響隊列的消費和生產效率,並發性會下降 3.ArrayBlockingQueue在執行remove操作時,會將整個數組進行移動(最壞情況下),同時還會獲得鎖,對性能的影響比較大
LinkedBlockingQueue
LinkedBlockingQueue是基於鏈表的、有界的、遵循FIFO原則的阻塞隊列,隊列默認的最大長度為Integer.MAX_VALUE
結構

重要屬性
private final int capacity;//隊列的大小,可以自定義,默認為Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger(); //當前隊列中元素的數量
//take、poll操作需要持有的鎖,LinkedBlockingQueue支持並發操作,對於從隊列中獲取數據需要加鎖(會阻塞,ConcurrentLinkedQueue/ConcurrentLinkedDeque
是使用CAS操作來控制的,不會出現阻塞的問題) private final ReentrantLock takeLock = new ReentrantLock();
//put、offer操作需要持有的鎖,同上 private final ReentrantLock putLock = new ReentrantLock();
//notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 private final Condition notEmpty = takeLock.newCondition();
//notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 private final Condition notFull = putLock.newCondition();
Node類
相對於其他(ConcurrentLinkedQueue/ConcurrentLinkedDeque)類來說,LinkedBlockingQueue的Node類要簡單好多,由於是基於單鏈表實現的,只有一個next屬性(保存后繼節點),一個item屬性(存放值),一個構造函數
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
構造函數
LinkedBlockingQueue提供了三個構造函數,在不傳參數的情況下,默認隊列的大小為Integer.MAX_VALUE,對比三個不同的構造函數而言,真正初始化 隊列的構造方法是LinkedBlockingQueue(int capacity)方法,傳入集合的構造方法會在調用該方法后將集合遍歷存入隊列中
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); //設置隊列大小,new一個null節點,head、tail節點指向改節點 this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); //獲取put、offer操作需要的鎖,可重入 final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); //將隊列的last節點指向該節點 enqueue(new Node<E>(e)); ++n; } //隊列元素計數 count.set(n); } finally { putLock.unlock(); } }
offer操作
通過返回值true/false判斷是否成功,offer操作當隊列滿后並不會阻塞(有時間參數的offer操作也會阻塞),而是直接返回false,put操作是沒有返回值的,並且會一直阻塞,等待被喚醒(或者超過時間拋出異常)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //是否超過最大值 if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //獲取鎖 putLock.lock(); try { //再次判斷隊列是否存放滿(可能存在多線程的情況) if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); //如果隊列沒有放滿,喚醒下一個添加線程 if (c + 1 < capacity) //其實這個地方,喚醒的添加線程是執行put方法(或者offer有時間參數的操作)時被阻塞的線程,如果僅僅只是執行offer操作應該不會執行任何操作, 沒有對應的添加線程添加到條件隊列中(個人理解,也是不太理解的地方) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) //當c=0時,表明當前隊列中存在一個元素,通知消費線程去消費 signalNotEmpty(); return c >= 0; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //喚醒條件等待隊列中的消費者去消費數據 notEmpty.signal(); } finally { takeLock.unlock(); } }
put操作
put方法不會像offer方法那樣去檢查隊列大小是否超過設定值,put操作一個元素入隊列時,如果隊列已滿,當前線程會進入nofull的條件等待隊列中等待,直到隊列中元素個數小於隊列大小時被喚醒,才繼續put操作
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //隊列已滿存放線程阻塞,等待被喚醒 while (count.get() == capacity) { notFull.await(); } //入隊列 enqueue(node); c = count.getAndIncrement(); //隊列沒滿,喚醒notfull等待隊列中的添加線程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) //當c=0時,表明當前隊列中存在一個元素,通知消費線程去消費 signalNotEmpty(); }
poll操作
必定會有返回值(異常除外),但包含null(隊列中沒有數據),與take操作比較可以發現,take操作在隊列中沒有數據時執行take操作的線程會被掛起,直到隊列中有數據(有時間參數的poll操作也會被掛起,等待喚醒或者超時)
public E poll() { final AtomicInteger count = this.count; //如果隊列沒有數據,返回null if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) //喚醒其他消費線程 notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //出隊列操作,因為隊列的head節點為null節點,在出隊列時,會始終保持head節點為空,next節點為真正意義上的首節點 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; //自指向,該節點已經無用,便於GC h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
take操作
take操作不會向poll操作去檢查隊列中有沒有數據,隊列中沒有數據時會被掛起,等待被喚醒
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //隊列中是否有數據,沒有等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) //如果隊列中有數據存在,喚醒notempty等待隊列中的消費線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //喚醒添加線程 signalNotFull(); return x; }
peek操作
獲取隊列中頭部元素,可能存在其他線程執行的刪除、take(poll)操作,所以要加鎖,獲取數據不一定准確
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //head節點不存放數據,所以取的是next節點 Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
remove操作
在執行刪除操作時,會將puts以及takes操作都上鎖,保證線程安全,然后執行遍歷刪除操作,在刪除后,會去喚醒等待中的添加線程執行 添加操作
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { //遍歷單項鏈表 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { //移除數據 unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; //喚醒添加線程 if (count.getAndDecrement() == capacity) notFull.signal(); }
綜上,LinkedBlockingQueue的api與ArrayBlockingQueue並無太大差別,在實現思想上,LinkedBlockingQueue使用了鎖分離以及鏈表,其他與ArraBlockingQueue(一個鎖統一管理、數組)沒太大區別
LinkedBlockingQueue與ArrayBlockingQueue異同
1.LinkedBlockingQueue是基於鏈表實現的初始化是可以不用指定隊列大小(默認是Integer.MAX_VALUE);ArrayBlockingQueue是基於數組實現的初始化時必須指定隊列大小 2.LinkedBlockingQueue在puts操作都會生成新的Node對象,takes操作Node對象在某一時間會被GC,可能會影響GC性能;ArrayBlockingQueue是固定的數組長度循環使用,
不會出現對象的產生與回收 3.LinkedBlockingQueue是基於鏈表的形式,在執行remove操作時,不用移動其他數據;ArrayBlockingQueue是基於鏈表,在remove時需要移動數據,影響性能 4.LinkedBlockingQueue使用兩個鎖將puts操作與takes操作分開;ArrayBlockingQueue使用一個鎖來控制,在高並發高吞吐的情況下,LinkedBlockingQueue的性能較好
