一、前言
在完成Map下的並發集合后,現在來分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一個阻塞型隊列,支持多任務並發操作,有了之前看源碼的積累,再看ArrayBlockingQueue源碼會很容易,下面開始正文。
二、ArrayBlockingQueue數據結構
通過源碼分析,並且可以對比ArrayList可知,ArrayBlockingQueue的底層數據結構是數組,數據結構如下
說明:ArrayBlockingQueue底層采用數據才存放數據,對數組的訪問添加了鎖的機制,使其能夠支持多線程並發。
三、ArrayBlockingQueue源碼分析
3.1 類的繼承關系
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
說明:可以看到ArrayBlockingQueue繼承了AbstractQueue抽象類,AbstractQueue定義了對隊列的基本操作;同時實現了BlockingQueue接口,BlockingQueue表示阻塞型的隊列,其對隊列的操作可能會拋出異常;同時也實現了Searializable接口,表示可以被序列化。
3.2 類的屬性

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 版本序列號 private static final long serialVersionUID = -817911632652898426L; // 存放實際元素的數組 final Object[] items; // 取元素索引 int takeIndex; // 獲取元素索引 int putIndex; // 隊列中的項 int count; // 可重入鎖 final ReentrantLock lock; // 等待獲取條件 private final Condition notEmpty; // 等待存放條件 private final Condition notFull; // 迭代器 transient Itrs itrs = null; }
說明:從類的屬性中可以清楚的看到其底層的結構是Object類型的數組,取元素和存元素有不同的索引,有一個可重入鎖ReentrantLock,兩個條件Condition。對ReentrantLock和Condition不太熟悉的讀者可以參考筆者的這篇博客,【JUC】JDK1.8源碼分析之ReentrantLock(三)。
3.3 類的構造函數
1. ArrayBlockingQueue(int)型構造函數

public ArrayBlockingQueue(int capacity) { // 調用兩個參數的構造函數 this(capacity, false); }
說明:該構造函數用於創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。
2. ArrayBlockingQueue(int, boolean)型構造函數

public ArrayBlockingQueue(int capacity, boolean fair) { // 初始容量必須大於0 if (capacity <= 0) throw new IllegalArgumentException(); // 初始化數組 this.items = new Object[capacity]; // 初始化可重入鎖 lock = new ReentrantLock(fair); // 初始化等待條件 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
說明:該構造函數用於創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。
3. ArrayBlockingQueue(int, boolean, Collection<? extends E>)型構造函數

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { // 調用兩個參數的構造函數 this(capacity, fair); // 可重入鎖 final ReentrantLock lock = this.lock; // 上鎖 lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { // 遍歷集合 // 檢查元素是否為空 checkNotNull(e); // 存入ArrayBlockingQueue中 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { // 當初始化容量小於傳入集合的大小時,會拋出異常 throw new IllegalArgumentException(); } // 元素數量 count = i; // 初始化存元素的索引 putIndex = (i == capacity) ? 0 : i; } finally { // 釋放鎖 lock.unlock(); } }
說明:該構造函數用於創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。
3.4 核心函數分析
1. put函數

public void put(E e) throws InterruptedException { checkNotNull(e); // 獲取可重入鎖 final ReentrantLock lock = this.lock; // 如果當前線程未被中斷,則獲取鎖 lock.lockInterruptibly(); try { while (count == items.length) // 判斷元素是否已滿 // 若滿,則等待 notFull.await(); // 入隊列 enqueue(e); } finally { // 釋放鎖 lock.unlock(); } }
說明:put函數用於存放元素,在當前線程被中斷時會拋出異常,並且當隊列已經滿時,會阻塞一直等待。其中,put會調用enqueue函數,enqueue函數源碼如下

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 獲取數組 final Object[] items = this.items; // 將元素放入 items[putIndex] = x; if (++putIndex == items.length) // 放入后存元素的索引等於數組長度(表示已滿) // 重置存索引為0 putIndex = 0; // 元素數量加1 count++; // 喚醒在notEmpty條件上等待的線程 notEmpty.signal(); }
說明:enqueue函數用於將元素存入底層Object數組中,並且會喚醒等待notEmpty條件的線程。
2. offer函數

public boolean offer(E e) { // 檢查元素不能為空 checkNotNull(e); // 可重入鎖 final ReentrantLock lock = this.lock; // 獲取鎖 lock.lock(); try { if (count == items.length) // 元素個數等於數組長度,則返回 return false; else { // 添加進數組 enqueue(e); return true; } } finally { // 釋放數組 lock.unlock(); } }
說明:offer函數也用於存放元素,在調用ArrayBlockingQueue的add方法時,會間接的調用到offer函數,offer函數添加元素不會拋出異常,當底層Object數組已滿時,則返回false,否則,會調用enqueue函數,將元素存入底層Object數組。並喚醒等待notEmpty條件的線程。
3. take函數

public E take() throws InterruptedException { // 可重入鎖 final ReentrantLock lock = this.lock; // 如果當前線程未被中斷,則獲取鎖,中斷會拋出異常 lock.lockInterruptibly(); try { while (count == 0) // 元素數量為0,即Object數組為空 // 則等待notEmpty條件 notEmpty.await(); // 出隊列 return dequeue(); } finally { // 釋放鎖 lock.unlock(); } }
說明:take函數用於從ArrayBlockingQueue中獲取一個元素,其與put函數相對應,在當前線程被中斷時會拋出異常,並且當隊列為空時,會阻塞一直等待。其中,take會調用dequeue函數,dequeue函數源碼如下

private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取元素 E x = (E) items[takeIndex]; // 該索引的值賦值為null items[takeIndex] = null; // 取值索引等於數組長度 if (++takeIndex == items.length) // 重新賦值取值索引 takeIndex = 0; // 元素個數減1 count--; if (itrs != null) itrs.elementDequeued(); // 喚醒在notFull條件上等待的線程 notFull.signal(); return x; }
說明:dequeue函數用於將取元素,並且會喚醒等待notFull條件的線程。
4. poll函數

public E poll() { // 重入鎖 final ReentrantLock lock = this.lock; // 獲取鎖 lock.lock(); try { // 若元素個數為0則返回null,否則,調用dequeue,出隊列 return (count == 0) ? null : dequeue(); } finally { // 釋放鎖 lock.unlock(); } }
說明:poll函數用於獲取元素,其與offer函數相對應,不會拋出異常,當元素個數為0是,返回null,否則,調用dequeue函數,並喚醒等待notFull條件的線程。並返回。
5. clear函數

public void clear() { // 數組 final Object[] items = this.items; // 可重入鎖 final ReentrantLock lock = this.lock; // 獲取鎖 lock.lock(); try { // 保存元素個數 int k = count; if (k > 0) { // 元素個數大於0 // 存數元素索引 final int putIndex = this.putIndex; // 取元素索引 int i = takeIndex; do { // 賦值為null items[i] = null; if (++i == items.length) // 重新賦值i i = 0; } while (i != putIndex); // 重新賦值取元素索引 takeIndex = putIndex; // 元素個數為0 count = 0; if (itrs != null) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull條件的線程,則逐一喚醒 notFull.signal(); } } finally { // 釋放鎖 lock.unlock(); } }
說明:clear函數用於清空ArrayBlockingQueue,並且會釋放所有等待notFull條件的線程(存放元素的線程)。
四、示例
下面給出一個具體的示例來演示ArrayBlockingQueue的使用

package com.hust.grid.leesf.collections; import java.util.concurrent.ArrayBlockingQueue; class PutThread extends Thread { private ArrayBlockingQueue<Integer> abq; public PutThread(ArrayBlockingQueue<Integer> abq) { this.abq = abq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("put " + i); abq.put(i); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } class GetThread extends Thread { private ArrayBlockingQueue<Integer> abq; public GetThread(ArrayBlockingQueue<Integer> abq) { this.abq = abq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("take " + abq.take()); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class ArrayBlockingQueueDemo { public static void main(String[] args) { ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10); PutThread p1 = new PutThread(abq); GetThread g1 = new GetThread(abq); p1.start(); g1.start(); } }
運行結果:

put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9
說明:示例中使用了兩個線程,一個用於存元素,一個用於讀元素,存和讀各10次,每個線程存一個元素或者讀一個元素后都會休眠100ms,可以看到結果是交替打印,並且首先打印的肯定是put線程語句(因為若取線程先取元素,此時隊列並沒有元素,其會阻塞,等待存線程存入元素),並且最終程序可以正常結束。
① 若修改取元素線程,將存的元素的次數修改為15次(for循環的結束條件改為15即可),運行結果如下:

put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9
說明:運行結果與上面的運行結果相同,但是,此時程序無法正常結束,因為take方法被阻塞了,等待被喚醒。
五、總結
總的來說,有了前面分析的基礎,分析ArrayBlockingQueue就會非常的簡單,ArrayBlockingQueue是通過ReentrantLock和Condition條件來保證多線程的正確訪問的。ArrayBockingQueue的分析就到這里,歡迎交流,謝謝各位園友的觀看~