一、摘要
BlockingQueue通常用於一個線程在生產對象,而另外一個線程在消費這些對象的場景,例如在線程池中,當運行的線程數目大於核心的線程數目時候,經常就會把新來的線程對象放到BlockingQueue中去。
二、阻塞隊列原理
原理簡單的來講:就是一個線程往隊列里面放,而另外的一個線程從里面取
當線程持續的產生新對象並放入到隊列中,直到隊列達到它所能容納的臨界點。注意,隊列的容量是有限的,不可能一直往里面插入對象。如果隊列到達了臨界點時,這個時候再想往隊列中插入元素則會產生阻塞,直到另一端從隊列中進行消費了,這個時候阻塞現象才不會發生。另外當去消費一個空的隊列時候,這個線程也會產生阻塞現象,直到一個線程把對象插入到隊列中
三、BlockingQueue常用方法總結
| 拋出異常 | 特殊值 | 阻塞 | 超時 | |
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除 | remove(e) | poll | take() | poll(time,unit) |
| 檢查 | element(e) | peek | 不可用 | 不可用 |
四組不同的行為方式解釋:
1. 拋異常:如果試圖的操作無法立即執行,拋一個異常。
2. 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
3. 阻塞:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
4. 超時:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等
待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會拋出
一個 NullPointerException。
四、BlockingQueue源碼分析
1、通過IDE可以明顯的看到BlockingQueue是一個接口,我們在寫代碼的時候需要實現這個接口
java.util.concurrent 具有以下 BlockingQueue 接口的實現(Java 8):

五、數組阻塞隊列ArrayBlockingQueue分析
1、原理分析
首先ArrayBlockingQueue 類實現了 BlockingQueue 接口。其次ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組里,所以一旦創建了該隊列,就不能再增加其容量了。最后ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。
2、ArrayBlockingQueue的方法(下面着重分析put()和take()二者方法)
此構造方法中,我們能看到傳入了兩個參數,capacity代表着隊列的容量大小,而boolean類型的參數則是判斷是否為公平鎖,如果為true,則先到的線程會先得到鎖對象, 反之則有操作系統去決定哪個線程獲得鎖,大多數情況下都是設置為false,這樣性能會高點
在put方法中,我們能看到在執行put方法時,我們必須要對其進行加鎖操作,從而保證線程的安全性。其次會去判斷其隊列是否飽滿了,飽滿時則會發生阻塞現象,直到被其他線程喚醒時插入元素,接着會去調用notEmpty.signal()方法,間接的利用take方法將隊列中的元素取走,最后將鎖釋放。
同理可以看出take()方法是相反的,不再做詳細介紹,代碼注釋已給出
put()和take()精簡源代碼如下:
1 /** The queued items */ 2 final Object[] items; //利用數組來存儲元素 3 /** Main lock guarding all access */ 4 final ReentrantLock lock; 5 6 /** Condition for waiting takes */ 7 private final Condition notEmpty; //定義一個Condition對象,用來對take進行操作 8 9 /** Condition for waiting puts */ 10 private final Condition notFull; //定義一個Condition對象,用來對put進行操作 11 12 /** 13 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 14 * capacity and the specified access policy. 15 * 16 * @param capacity the capacity of this queue 17 * @param fair if {@code true} then queue accesses for threads blocked 18 * on insertion or removal, are processed in FIFO order; 19 * if {@code false} the access order is unspecified. 20 * @throws IllegalArgumentException if {@code capacity < 1} 21 */ 22 public ArrayBlockingQueue(int capacity, boolean fair) { 23 if (capacity <= 0) //判斷初始化的容量大小 24 throw new IllegalArgumentException(); 25 this.items = new Object[capacity]; 26 lock = new ReentrantLock(fair); 27 notEmpty = lock.newCondition(); 28 notFull = lock.newCondition(); 29 } 30 ====================================put()方法============================================== 31 /** 32 * Inserts the specified element at the tail of this queue, waiting 33 * for space to become available if the queue is full. 34 * 35 * @throws InterruptedException {@inheritDoc} 36 * @throws NullPointerException {@inheritDoc} 37 */ 38 public void put(E e) throws InterruptedException { 39 checkNotNull(e); 40 final ReentrantLock lock = this.lock; 41 lock.lockInterruptibly(); 42 try { 43 while (count == items.length) 44 notFull.await(); //隊列飽滿時,將使這個線程進入阻塞狀態,直到被其他線程喚醒時插入元素 45 enqueue(e); //將元素插入到隊列中 46 } finally { 47 lock.unlock(); 48 } 49 } 50 51 52 /** 53 * Inserts element at current put position, advances, and signals. 54 * Call only when holding lock. 55 */ 56 private void enqueue(E x) { 57 // assert lock.getHoldCount() == 1; 58 // assert items[putIndex] == null; 59 final Object[] items = this.items; 60 items[putIndex] = x; 61 if (++putIndex == items.length) 62 putIndex = 0; 63 count++; 64 notEmpty.signal(); //通知take那邊消費其元素 65 } 66 67 ===================================================take()方法======================================================== 68 69 public E take() throws InterruptedException { 70 final ReentrantLock lock = this.lock; //加鎖 71 lock.lockInterruptibly(); 72 try { 73 while (count == 0) 74 notEmpty.await(); //隊列為空時,將使這個線程進入阻塞狀態,直到被其他線程喚醒時取出元素 75 return dequeue(); //消費對頭中的元素 76 } finally { 77 lock.unlock(); 78 } 79 } 80 81 /** 82 * Extracts element at current take position, advances, and signals. 83 * Call only when holding lock. 84 */ 85 private E dequeue() { 86 // assert lock.getHoldCount() == 1; 87 // assert items[takeIndex] != null; 88 final Object[] items = this.items; 89 @SuppressWarnings("unchecked") 90 E x = (E) items[takeIndex]; 91 items[takeIndex] = null; 92 if (++takeIndex == items.length) 93 takeIndex = 0; 94 count--; 95 if (itrs != null) 96 itrs.elementDequeued(); 97 notFull.signal(); //通知put那邊消費其元素 98 return x; 99 }
六、鏈式阻塞隊列LinkedBlockingQueue分析
1、原理分析
LinkedBlockingQueue 類實現了 BlockingQueue 接口。同時LinkedBlockingQueue 內部以一個鏈式結構(鏈接節點)對其元素進行存儲。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。
2、LinkedBlockingQueue方法分析
針對LinkedBlockingQueue的構造方法中,我們能看到沒有定義上限時,會使用Integer.MAX_VALUE 作為上限
其次針對put等方法時,原理與ArrayBlockingQueue大致相同,只不過是基於鏈表去實現的
源碼精簡如下:
1 /** The capacity bound, or Integer.MAX_VALUE if none */ 2 //鏈表的容量 3 private final int capacity; 4 5 /** Current number of elements */ 6 //當前元素個數 7 private final AtomicInteger count = new AtomicInteger(); 8 9 /** 10 * Head of linked list. 11 * Invariant: head.item == null 12 */ 13 //鏈表頭節點 14 transient Node<E> head; 15 16 /** 17 * Tail of linked list. 18 * Invariant: last.next == null 19 */ 20 //鏈表尾節點 21 private transient Node<E> last; 22 23 /** Lock held by take, poll, etc */ 24 //出隊列鎖 25 private final ReentrantLock takeLock = new ReentrantLock(); 26 27 /** Wait queue for waiting takes */ 28 private final Condition notEmpty = takeLock.newCondition(); 29 30 /** Lock held by put, offer, etc */ 31 //入隊列鎖 32 private final ReentrantLock putLock = new ReentrantLock(); 33 34 /** Wait queue for waiting puts */ 35 private final Condition notFull = putLock.newCondition(); 36 37 38 //默認構造方法,默認執行容量上限 39 public LinkedBlockingQueue() { 40 this(Integer.MAX_VALUE); 41 } 42 43 //指定隊列的容量 44 public LinkedBlockingQueue(int capacity) { 45 if (capacity <= 0) throw new IllegalArgumentException(); 46 this.capacity = capacity; 47 //初始化頭尾節點的值,設置均為null 48 last = head = new Node<E>(null); 49 } 50 51 //往對尾中插入元素,隊列滿時,則會發生阻塞,直到有元素消費了或者線程中斷了 52 public void put(E e) throws InterruptedException { 53 if (e == null) throw new NullPointerException(); 54 int c = -1; 55 Node<E> node = new Node<E>(e); 56 final ReentrantLock putLock = this.putLock;//入隊列鎖 57 final AtomicInteger count = this.count;//獲取當前隊列中的元素個數 58 putLock.lockInterruptibly(); 59 try { 60 while (count.get() == capacity) { //條件:如果隊列滿了 61 notFull.await(); //則加入到出隊列等待中,直到隊列不滿了,這時就會被其他線程notFull.signal()喚醒 62 } 63 enqueue(node);//將元素入隊列 64 c = count.getAndIncrement(); //對當前隊列元素個數加1 65 if (c + 1 < capacity) 66 notFull.signal(); 67 } finally { 68 putLock.unlock(); 69 } 70 if (c == 0) 71 signalNotEmpty(); 72 } 73 74 75 //出隊列,大致原理與入隊列相反,當隊列為空時,則會阻塞,直到隊列不為空或者線程中斷 76 public E take() throws InterruptedException { 77 E x; 78 int c = -1; 79 final AtomicInteger count = this.count; 80 final ReentrantLock takeLock = this.takeLock; 81 takeLock.lockInterruptibly(); 82 try { 83 while (count.get() == 0) { 84 notEmpty.await(); 85 } 86 x = dequeue(); 87 c = count.getAndDecrement(); 88 if (c > 1) 89 notEmpty.signal(); 90 } finally { 91 takeLock.unlock(); 92 } 93 if (c == capacity) 94 signalNotFull(); 95 return x; 96 } 97 98 99
七、ArrayBlockingQueue和LinkedBlockingQueue源碼比較
在上述源碼過程我們能發現:
1、入隊列時,當隊列滿了,則會發生阻塞,直到隊列消費了數據或者線程被中斷了才會喚醒
2、出隊列時,當隊列為空時,則會發生阻塞,直到隊列中有數據了或者線程被中斷了才會喚醒
源碼注意:
ArrayBlockingQueue源碼中,共用的是同一把鎖
LinkedBlockingQueue源碼中,則是用到了兩把鎖,一把是入隊列鎖,另一把是出隊列鎖
八、參考資料
https://www.cnblogs.com/java-zhao/p/5135958.html
