1. 阻塞隊列介紹
顧名思義,阻塞隊列是一個具備先進先出特性的隊列結構,從隊列末尾插入數據,從隊列頭部取出數據。而阻塞隊列與普通隊列的最大不同在於阻塞隊列提供了阻塞式的同步插入、取出數據的功能(阻塞入隊put/阻塞出隊take)。
使用put插入數據時,如果隊列空間已滿並不直接返回,而是令當前操作的線程陷入阻塞態(生產者線程),等待着阻塞隊列中的元素被其它線程(消費者線程)取走,令隊列重新變得不滿時被喚醒再次嘗試插入數據。使用take取出數據時,如果隊列空間為空並不直接返回,而是令當前操作的線程陷入阻塞態(消費者線程),等待其它線程(生產者線程)插入新元素,令隊列非空時被喚醒再次嘗試取出數據。
阻塞隊列主要用於解決並發場景下消費者線程與生產者線程處理速度不一致的問題。例如jdk的線程池實現中,線程池核心線程(消費者線程)處理速度一定的情況下,如果業務方線程提交的任務過多導致核心線程處理不過來時,將任務暫時放進阻塞隊列等待核心線程消費(阻塞隊列未滿);由於核心線程常駐的原因,當業務方線程提交的任務較少,核心線程消費速度高於業務方生產速度時,核心線程作為消費者會阻塞在阻塞隊列的take方法中,避免無謂的浪費cpu資源。
由於阻塞隊列在內部實現了協調生產者/消費者的機制而不需要外部使用者過多的考慮並發同步問題,極大的降低了生產者/消費者場景下程序的復雜度。
2. 自己實現阻塞隊列
下面我們自己動手一步步的實現幾個不同版本、效率由低到高的的阻塞隊列,來加深對阻塞隊列工作原理的理解。
阻塞隊列接口
為了降低復雜度,我們的阻塞隊列只提供最基礎的出隊、入隊和判空接口。
/** * 阻塞隊列 * 1. 首先是一個先進先出的隊列 * 2. 提供特別的api,在入隊時如果隊列已滿令當前操作線程阻塞;在出隊時如果隊列為空令當前操作線程阻塞 * 3. 單個元素的插入、刪除操作是線程安全的 */ public interface MyBlockingQueue<E> {
/** * 插入特定元素e,加入隊尾 * 隊列已滿時阻塞當前線程,直到隊列中元素被其它線程刪除並插入成功 * */ void put(E e) throws InterruptedException; /** * 隊列頭部的元素出隊(返回頭部元素,將其從隊列中刪除) * 隊列為空時阻塞當前線程,直到隊列被其它元素插入新元素並出隊成功 * */ E take() throws InterruptedException; /** * 隊列是否為空 * */ boolean isEmpty(); }
2.1 v1版本(最基本的隊列實現)
博客中所實現的阻塞隊列底層是使用數組承載數據的(ArrayBlockingQueue),內部提供了私有方法enqueue和dequeue來實現原始的內部入隊和出隊操作。
最初始的v1版本中,我們只實現最基本的FIFO隊列功能,其put和take方法只是簡單的調用了enqueue和dequeue,因此v1版本中其入隊、出隊不是阻塞的,也無法保障線程安全,十分簡陋。
后續的版本中,我們會以v1版本為基礎,實現阻塞調用以及線程安全的特性,並且對所實現的阻塞隊列性能進行不斷的優化。
/** * 數組作為底層結構的阻塞隊列 v1版本 */ public class MyArrayBlockingQueueV1<E> implements MyBlockingQueue<E> { /** * 隊列默認的容量大小 * */ private static final int DEFAULT_CAPACITY = 16; /** * 承載隊列元素的底層數組 * */ private final Object[] elements; /** * 當前頭部元素的下標 * */ private int head; /** * 下一個元素插入時的下標 * */ private int tail; /** * 隊列中元素個數 * */ private int count; //=================================================構造方法====================================================== public MyArrayBlockingQueueV1() { // 設置數組大小為默認 this.elements = new Object[DEFAULT_CAPACITY]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; } public MyArrayBlockingQueueV1(int initCapacity) { assert initCapacity > 0;
this.elements = new Object[initCapacity]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; } /** * 下標取模 * */ private int getMod(int logicIndex){ int innerArrayLength = this.elements.length; // 由於隊列下標邏輯上是循環的 if(logicIndex < 0){ // 當邏輯下標小於零時 // 真實下標 = 邏輯下標 + 加上當前數組長度 return logicIndex + innerArrayLength; } else if(logicIndex >= innerArrayLength){ // 當邏輯下標大於數組長度時 // 真實下標 = 邏輯下標 - 減去當前數組長度 return logicIndex - innerArrayLength; } else { // 真實下標 = 邏輯下標 return logicIndex; } } /** * 入隊 * */ private void enqueue(E e){ // 存放新插入的元素 this.elements[this.tail] = e; // 尾部插入新元素后 tail下標后移一位 this.tail = getMod(this.tail + 1); this.count++; } /** * 出隊 * */ private E dequeue(){ // 暫存需要被刪除的數據 E dataNeedRemove = (E)this.elements[this.head]; // 將當前頭部元素引用釋放 this.elements[this.head] = null; // 頭部下標 后移一位 this.head = getMod(this.head + 1); this.count--; return dataNeedRemove; } @Override public void put(E e){ enqueue(e); } @Override public E take() { return dequeue(); } @Override public boolean isEmpty() { return this.count == 0; } }
2.2 v2版本(實現同步阻塞和線程安全的特性)
前面提到阻塞調用的出隊、入隊的功能是阻塞隊列區別於普通隊列的關鍵特性。阻塞調用實現的方式有很多,其中最容易理解的一種方式便是無限循環的輪詢,直到出隊/入隊成功(雖然cpu效率很低)。
v2版本在v1的基礎上,使用無限循環加定時休眠的方式簡單的實現了同步調用時阻塞的特性。並且在put/take內增加了synchronized塊將入隊/出隊代碼包裹起來,阻止多個線程並發的操作隊列而產生線程安全問題。
v2版本入隊方法實現:
@Override public void put(E e) throws InterruptedException { while (true) { synchronized (this) { // 隊列未滿時執行入隊操作 if (count != elements.length) { // 入隊,並返回 enqueue(e); return; } } // 隊列已滿,休眠一段時間后重試 Thread.sleep(100L); } }
v2版本出隊方法實現:
@Override public E take() throws InterruptedException { while (true) { synchronized (this) { // 隊列非空時執行出隊操作 if (count != 0) { // 出隊並立即返回 return dequeue(); } } // 隊列為空的情況下,休眠一段時間后重試 Thread.sleep(100L); } }
v2版本完整代碼:

/** * 數組作為底層結構的阻塞隊列 v2版本 */ public class MyArrayBlockingQueueV2<E> implements MyBlockingQueue<E> { /** * 隊列默認的容量大小 * */ private static final int DEFAULT_CAPACITY = 16; /** * 承載隊列元素的底層數組 * */ private final Object[] elements; /** * 當前頭部元素的下標 * */ private int head; /** * 下一個元素插入時的下標 * */ private int tail; /** * 隊列中元素個數 * */ private int count; //=================================================構造方法====================================================== /** * 默認構造方法 * */ public MyArrayBlockingQueueV2() { // 設置數組大小為默認 this.elements = new Object[DEFAULT_CAPACITY]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; } /** * 默認構造方法 * */ public MyArrayBlockingQueueV2(int initCapacity) { assert initCapacity > 0; // 設置數組大小為默認 this.elements = new Object[initCapacity]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; } /** * 下標取模 * */ private int getMod(int logicIndex){ int innerArrayLength = this.elements.length; // 由於隊列下標邏輯上是循環的 if(logicIndex < 0){ // 當邏輯下標小於零時 // 真實下標 = 邏輯下標 + 加上當前數組長度 return logicIndex + innerArrayLength; } else if(logicIndex >= innerArrayLength){ // 當邏輯下標大於數組長度時 // 真實下標 = 邏輯下標 - 減去當前數組長度 return logicIndex - innerArrayLength; } else { // 真實下標 = 邏輯下標 return logicIndex; } } /** * 入隊 * */ private void enqueue(E e){ // 存放新插入的元素 this.elements[this.tail] = e; // 尾部插入新元素后 tail下標后移一位 this.tail = getMod(this.tail + 1); this.count++; } /** * 出隊 * */ private E dequeue(){ // 暫存需要被刪除的數據 E dataNeedRemove = (E)this.elements[this.head]; // 將當前頭部元素引用釋放 this.elements[this.head] = null; // 頭部下標 后移一位 this.head = getMod(this.head + 1); this.count--; return dataNeedRemove; } @Override public void put(E e) throws InterruptedException { while (true) { synchronized (this) { // 隊列未滿時執行入隊操作 if (count != elements.length) { // 入隊,並返回 enqueue(e); return; } } // 隊列已滿,休眠一段時間后重試 Thread.sleep(100L); } } @Override public E take() throws InterruptedException { while (true) { synchronized (this) { // 隊列非空時執行出隊操作 if (count != 0) { // 出隊並立即返回 return dequeue(); } } // 隊列為空的情況下,休眠一段時間后重試 Thread.sleep(100L); } } @Override public boolean isEmpty() { return this.count == 0; } }
2.3 v3版本(引入條件變量優化無限循環輪詢)
在有大量線程競爭的情況下,v2版本無限循環加休眠的阻塞方式存在兩個嚴重的問題。
無限循環輪詢的缺陷
1. 線程周期性的休眠/喚醒會造成頻繁的發生線程上下文切換,非常浪費cpu資源
2. 線程在嘗試操作失敗被阻塞時(嘗試入隊時隊列已滿、嘗試出隊時隊列為空),如果休眠時間設置的太短,則休眠/喚醒的次數會非常多,cpu性能低下;但如果休眠的時間設置的較長,則會導致被阻塞線程在隊列狀態發生變化時無法及時的響應
舉個例子:某一生產者線程在入隊時發現隊列已滿,當前線程休眠1s,在0.1s之后一個消費者線程取走了一個元素,而此時休眠的生產者線程還需要白白等待0.9s后才被喚醒並感知到隊列未滿而接着執行入隊操作。綜上所述,無限循環加休眠的v2版本阻塞隊列其性能極差,需要進一步的優化。
使用條件變量進行優化
為了解決上述循環休眠浪費cpu和隊列狀態發生變化時(已滿到未滿,已空到未空)被阻塞線程無法及時響應的問題,v3版本引入條件變量對其進行優化。
條件變量由底層的操作系統內核實現的、用於線程間同步的利器。(條件變量的實現原理可以參考我之前的博客:https://www.cnblogs.com/xiaoxiongcanguan/p/14152830.html)
java將不同操作系統內核提供的條件變量機制抽象封裝后,作為可重入鎖ReentrantLock的附屬給程序員使用。且為了避免lost wakeup問題,在條件變量的實現中增加了校驗,要求調用條件變量的signal和await方法時當前線程必須先獲得條件變量所附屬的鎖才行,更具體的解析可以參考這篇文章:https://mp.weixin.qq.com/s/ohcr6T1aB7-lVFJIfyJZjA。
引入條件變量后,可以令未滿足某種條件的線程暫時進入阻塞態,等待在一個條件變量上;當對應條件滿足時由其它的線程將等待在條件變量上的線程喚醒,將其從阻塞態再切換回就緒態。
舉個例子:當某一生產者線程想要插入新元素但阻塞隊列已滿時,可以令當前生產者線程等待並阻塞在對應的條件變量中;當后續某一消費者線程執行出隊操作使得隊列非空后,將等待在條件變量上的生產者線程喚醒,被喚醒的生產者線程便能及時的再次嘗試進行入隊操作。
v3和v2版本相比,等待在條件變量進入阻塞態的線程不再周期性的被喚醒而占用過多的cpu資源,且在特定條件滿足時也能被及時喚醒。
引入條件變量后的v3版本阻塞隊列效率比v2高出許多。
v3版本完整代碼:
/** * 數組作為底層結構的阻塞隊列 v3版本 */ public class MyArrayBlockingQueueV3<E> implements MyBlockingQueue<E> { /** * 隊列默認的容量大小 * */ private static final int DEFAULT_CAPACITY = 16; /** * 承載隊列元素的底層數組 * */ private final Object[] elements; /** * 當前頭部元素的下標 * */ private int head; /** * 下一個元素插入時的下標 * */ private int tail; /** * 隊列中元素個數 * */ private int count; private final ReentrantLock reentrantLock; private final Condition condition; //=================================================構造方法====================================================== /** * 默認構造方法 * */ public MyArrayBlockingQueueV3() { this(DEFAULT_CAPACITY); } /** * 默認構造方法 * */ public MyArrayBlockingQueueV3(int initCapacity) { assert initCapacity > 0; // 設置數組大小為默認 this.elements = new Object[initCapacity]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; this.reentrantLock = new ReentrantLock(); this.condition = this.reentrantLock.newCondition(); } /** * 下標取模 * */ private int getMod(int logicIndex){ int innerArrayLength = this.elements.length; // 由於隊列下標邏輯上是循環的 if(logicIndex < 0){ // 當邏輯下標小於零時 // 真實下標 = 邏輯下標 + 加上當前數組長度 return logicIndex + innerArrayLength; } else if(logicIndex >= innerArrayLength){ // 當邏輯下標大於數組長度時 // 真實下標 = 邏輯下標 - 減去當前數組長度 return logicIndex - innerArrayLength; } else { // 真實下標 = 邏輯下標 return logicIndex; } } /** * 入隊 * */ private void enqueue(E e){ // 存放新插入的元素 this.elements[this.tail] = e; // 尾部插入新元素后 tail下標后移一位 this.tail = getMod(this.tail + 1); this.count++; } /** * 出隊 * */ private E dequeue(){ // 暫存需要被刪除的數據 E dataNeedRemove = (E)this.elements[this.head]; // 將當前頭部元素引用釋放 this.elements[this.head] = null; // 頭部下標 后移一位 this.head = getMod(this.head + 1); this.count--; return dataNeedRemove; } @Override public void put(E e) throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 reentrantLock.lockInterruptibly(); try { // 因為被消費者喚醒后可能會被其它的生產者再度填滿隊列,需要循環的判斷 while (this.count == elements.length) { // put操作時,如果隊列已滿則進入條件變量的等待隊列,並釋放條件變量對應的鎖 condition.await(); } // 走到這里,說明當前隊列不滿,可以執行入隊操作 enqueue(e); // 喚醒可能等待着的消費者線程 // 由於共用了一個condition,所以不能用signal,否則一旦喚醒的也是生產者線程就會陷入上面的while死循環) condition.signalAll(); } finally { // 入隊完畢,釋放鎖 reentrantLock.unlock(); } } @Override public E take() throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 reentrantLock.lockInterruptibly(); try { // 因為被生產者喚醒后可能會被其它的消費者消費而使得隊列再次為空,需要循環的判斷 while(this.count == 0){ condition.await(); } E headElement = dequeue(); // 喚醒可能等待着的生產者線程 // 由於共用了一個condition,所以不能用signal,否則一旦喚醒的也是消費者線程就會陷入上面的while死循環) condition.signalAll(); return headElement; } finally { // 出隊完畢,釋放鎖 reentrantLock.unlock(); } } @Override public boolean isEmpty() { return this.count == 0; } }
2.4 v4版本(引入雙條件變量,優化喚醒效率)
v3版本通過引入條件變量解決了v2版本中循環休眠、喚醒效率低下的問題,但v3版本還是存在一定的性能問題。
v3版本中signalAll的效率問題
jdk的Condition條件變量提供了signal和signalAll這兩個方法用於喚醒等待在條件變量中的線程,其中signalAll會喚醒等待在條件變量上的所有線程,而signal則只會喚醒其中一個。
舉個例子,v3版本中消費者線程在隊列已滿時進行出隊操作后,通過signalAll會喚醒所有等待入隊的多個生產者線程,但最終只會有一個線程成功競爭到互斥鎖並成功執行入隊操作,其它的生產者線程在被喚醒后發現隊列依然是滿的,而繼續等待。v3版本中的signalAll喚醒操作造成了驚群效應,無意義的喚醒了過多的等待中的線程。
但僅僅將v3版本中的signalAll改成signal是不行的,因為生產者和消費者線程是等待在同一個條件變量中的,如果消費者在出隊后通過signal喚醒的不是與之對應的生產者線程,而是另一個消費者線程,則本該被喚醒的生產者線程可能遲遲無法被喚醒,甚至在一些場景下會永遠被阻塞,無法再喚醒。
仔細思索后可以發現,對於生產者線程其在隊列已滿時阻塞等待,等待的是隊列不滿的條件(notFull);而對於消費者線程其在隊列為空時阻塞等待,等待的是隊列不空的條件(notEmpty)。隊列不滿和隊列不空實質上是兩個互不相關的條件。
因此v4版本中將生產者線程和消費者線程關注的條件變量拆分成兩個:生產者線程在隊列已滿時阻塞等待在notFull條件變量上,消費者線程出隊后通過notFull.signal嘗試着喚醒一個等待的生產者線程;與之相對的,消費者線程在隊列為空時阻塞等待在notEmpty條件變量上,生產者線程入隊后通過notEmpty.signal嘗試着喚醒一個等待的消費者線程。
通過拆分出兩個互相獨立的條件變量,v4版本避免了v3版本中signalAll操作帶來的驚群效應,避免了signalAll操作無效喚醒帶來的額外開銷。
v4版本完整代碼:
/** * 數組作為底層結構的阻塞隊列 v4版本 */ public class MyArrayBlockingQueueV4<E> implements MyBlockingQueue<E> { /** * 隊列默認的容量大小 * */ private static final int DEFAULT_CAPACITY = 16; /** * 承載隊列元素的底層數組 * */ private final Object[] elements; /** * 當前頭部元素的下標 * */ private int head; /** * 下一個元素插入時的下標 * */ private int tail; /** * 隊列中元素個數 * */ private int count; private final ReentrantLock reentrantLock; private final Condition notEmpty; private final Condition notFull; //=================================================構造方法====================================================== /** * 默認構造方法 * */ public MyArrayBlockingQueueV4() { this(DEFAULT_CAPACITY); } /** * 默認構造方法 * */ public MyArrayBlockingQueueV4(int initCapacity) { assert initCapacity > 0; // 設置數組大小為默認 this.elements = new Object[initCapacity]; // 初始化隊列 頭部,尾部下標 this.head = 0; this.tail = 0; this.reentrantLock = new ReentrantLock(); this.notEmpty = this.reentrantLock.newCondition(); this.notFull = this.reentrantLock.newCondition(); } /** * 下標取模 * */ private int getMod(int logicIndex){ int innerArrayLength = this.elements.length; // 由於隊列下標邏輯上是循環的 if(logicIndex < 0){ // 當邏輯下標小於零時 // 真實下標 = 邏輯下標 + 加上當前數組長度 return logicIndex + innerArrayLength; } else if(logicIndex >= innerArrayLength){ // 當邏輯下標大於數組長度時 // 真實下標 = 邏輯下標 - 減去當前數組長度 return logicIndex - innerArrayLength; } else { // 真實下標 = 邏輯下標 return logicIndex; } } /** * 入隊 * */ private void enqueue(E e){ // 存放新插入的元素 this.elements[this.tail] = e; // 尾部插入新元素后 tail下標后移一位 this.tail = getMod(this.tail + 1); this.count++; } /** * 出隊 * */ private E dequeue(){ // 暫存需要被刪除的數據 E dataNeedRemove = (E)this.elements[this.head]; // 將當前頭部元素引用釋放 this.elements[this.head] = null; // 頭部下標 后移一位 this.head = getMod(this.head + 1); this.count--; return dataNeedRemove; } @Override public void put(E e) throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 reentrantLock.lockInterruptibly(); try { // 因為被消費者喚醒后可能會被其它的生產者再度填滿隊列,需要循環的判斷 while (this.count == elements.length) { // put操作時,如果隊列已滿則進入notFull條件變量的等待隊列,並釋放條件變量對應的互斥鎖 notFull.await(); // 消費者進行出隊操作時 } // 走到這里,說明當前隊列不滿,可以執行入隊操作 enqueue(e); // 喚醒可能等待在notEmpty中的一個消費者線程 notEmpty.signal(); } finally { // 入隊完畢,釋放鎖 reentrantLock.unlock(); } } @Override public E take() throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 reentrantLock.lockInterruptibly(); try { // 因為被生產者喚醒后可能會被其它的消費者消費而使得隊列再次為空,需要循環的判斷 while(this.count == 0){ notEmpty.await(); } E headElement = dequeue(); // 喚醒可能等待在notFull中的一個生產者線程 notFull.signal(); return headElement; } finally { // 出隊完畢,釋放鎖 reentrantLock.unlock(); } } @Override public boolean isEmpty() { return this.count == 0; } }
2.5 v5版本(引入雙鎖令生產者和消費者能並發操作阻塞隊列)
v4版本的阻塞隊列采用雙條件變量之后,其性能已經不錯了,但仍存在進一步優化的空間。
v4版本單鎖的性能問題
v4版本中阻塞隊列的出隊、入隊操作是使用同一個互斥鎖進行並發同步的,這意味着生產者線程和消費者線程無法並發工作,消費者線程必須等待生產者線程操作完成退出臨界區之后才能繼續執行,反之亦然。單鎖的設計在生產者和消費者都很活躍的高並發場景下會一定程度限制阻塞隊列的吞吐量。
因此v5版本在v4版本的基礎上,將出隊和入隊操作使用兩把鎖分別管理,使得生產者線程和消費者線程可以並發的操作阻塞隊列,達到進一步提高吞吐量的目的。
使用兩把鎖分別控制出隊、入隊后,還需要一些調整來解決生產者/消費者並發操作隊列所帶來的問題。
存在並發問題的雙鎖版本出隊、入隊實現第一版(v4基礎上的微調):
/** this.takeLock = new ReentrantLock(); this.notEmpty = this.takeLock.newCondition(); this.putLock = new ReentrantLock(); this.notFull = this.putLock.newCondition(); */ @Override public void put(E e) throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 putLock.lockInterruptibly(); try { // 因為被消費者喚醒后可能會被其它的生產者再度填滿隊列,需要循環的判斷 while (this.count == elements.length) { // put操作時,如果隊列已滿則進入notFull條件變量的等待隊列,並釋放條件變量對應的互斥鎖 notFull.await(); } // 走到這里,說明當前隊列不滿,可以執行入隊操作 enqueue(e); // 喚醒可能等待在notEmpty中的一個消費者線程 notEmpty.signal(); } finally { // 入隊完畢,釋放鎖 putLock.unlock(); } } @Override public E take() throws InterruptedException { // 先嘗試獲得互斥鎖,以進入臨界區 takeLock.lockInterruptibly(); try { // 因為被生產者喚醒后可能會被其它的消費者消費而使得隊列再次為空,需要循環的判斷 while(this.count == 0){ notEmpty.await(); } E headElement = dequeue(); // 喚醒可能等待在notFull中的一個生產者線程 notFull.signal(); return headElement; } finally { // 出隊完畢,釋放鎖 takeLock.unlock(); } }
上面基於v4版本微調的雙鎖實現雖然容易理解,但由於允許消費者和生產者線程並發的訪問隊列而存在幾個嚴重問題。
1. count屬性線程不安全
隊列長度count字段是一個用於判斷隊列是否已滿,隊列是否為空的重要屬性。在v5之前的版本count屬性一直被唯一的同步鎖保護着,任意時刻至多只有一個線程可以進入臨界區修改count的值。而引入雙鎖令消費者線程/生產者線程能並發訪問后,count變量的自增/自減操作會出現線程不安全的問題。
解決方案:將int類型的count修改為AtomicInteger來解決生產者/消費者同時訪問、修改count時導致的並發問題。
2. 生產者/消費者線程死鎖問題
在上述的代碼示例中,生產者線程首先獲得生產者鎖去執行入隊操作,然后喚醒可能阻塞在notEmpty上的消費者線程。由於使用條件變量前首先需要獲得其所屬的互斥鎖,如果生產者線程不先釋放生產者鎖就去獲取消費者的互斥鎖,那么就存在出現死鎖的風險。消費者線程和生產者線程可以並發的先分別獲得消費者鎖和生產者鎖,並且也同時嘗試着獲取另一把鎖,這樣雙方都在等待着對方釋放鎖,互相阻塞出現死鎖現象。
解決方案:先釋放已獲得的鎖之后再去獲得另一個鎖執行喚醒操作
存在並發問題的雙鎖版本出隊、入隊實現第二版(在上述第一版基礎上進行微調):
/** private final AtomicInteger count = new AtomicInteger(); this.takeLock = new ReentrantLock(); this.notEmpty = this.takeLock.newCondition(); this.putLock = new ReentrantLock(); this.notFull = this.putLock.newCondition(); */ @Override public void put(E e) throws InterruptedException { int currentCount; // 先嘗試獲得互斥鎖,以進入臨界區 putLock.lockInterruptibly(); try { // 因為被消費者喚醒后可能會被其它的生產者再度填滿隊列,需要循環的判斷 while (count.get() == elements.length) { // put操作時,如果隊列已滿則進入notFull條件變量的等待隊列,並釋放條件變量對應的互斥鎖 notFull.await(); // 消費者進行出隊操作時 } // 走到這里,說明當前隊列不滿,可以執行入隊操作 enqueue(e); currentCount = count.getAndIncrement(); } finally { // 入隊完畢,釋放鎖 putLock.unlock(); } // 如果插入之前隊列為空,才喚醒等待彈出元素的線程 if (currentCount == 0) { signalNotEmpty(); } } @Override public E take() throws InterruptedException { E headElement; int currentCount; // 先嘗試獲得互斥鎖,以進入臨界區 takeLock.lockInterruptibly(); try { // 因為被生產者喚醒后可能會被其它的消費者消費而使得隊列再次為空,需要循環的判斷 while(this.count.get() == 0){ notEmpty.await(); } headElement = dequeue(); currentCount = this.count.getAndDecrement(); } finally { // 出隊完畢,釋放鎖 takeLock.unlock(); } // 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程 if (currentCount == elements.length) { signalNotFull(); } return headElement; } /** * 喚醒等待隊列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock takeLock.lock(); try { // 喚醒一個等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 喚醒等待隊列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock putLock.lock(); try { // 喚醒一個等待隊列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } }
3. lost wakeup問題
在上述待改進的雙鎖實現第二版中,阻塞在notFull中的生產者線程完全依賴相對應的消費者線程來將其喚醒(阻塞在notEmpty中的消費者線程也同樣依賴對應的生產者線程將其喚醒),這在生產者線程和消費者線程並發時會出現lost wakeup的問題。
下面構造一個簡單而不失一般性的例子來說明,為什么上述第二版的實現中會出現問題。
時序圖(假設阻塞隊列的長度為5(element.length=5),且一開始時隊列已滿)
生產者線程P1 | 生產者線程P2 | 消費者線程C | |
1 | 執行put操作,此時隊列已滿。 執行while循環中的notfull.await陷入阻塞狀態 (await會釋放putLock) |
||
2 | 執行take操作,隊列未滿,成功執行完dequeue。 此時currentCount=5,this.count=4, 執行takeLock.unLock釋放takeLock鎖 |
||
3 | 執行put操作,拿到putLock鎖,由於消費者C已經執行完出隊操作, 成功執行enqueue。 此時currentCount=4,this.count=5, 執行putLock.unLock釋放putLock鎖 |
||
4 | 判斷currentCount == elements.length為真, 執行signalNotFull,並成功拿到putLock。 notFull.signal喚醒等待在其上的生產者線程P1。 take方法執行完畢,return返回 |
||
5 | 被消費者C喚醒,但此時count=5,無法跳出while循環, 繼續await阻塞在notFull條件變量中 |
||
6 | 判斷currentCount == 0為假,進行處理。 put方法執行完畢 ,return返回 |
可以看到,雖然生產者線程P1由於隊列已滿而先被阻塞,而消費者線程C在出隊后也確實通知喚醒了生產者線程P1。但是由於生產者線程P2和消費者線程C的並發執行,導致了生產者線程P1在被喚醒后依然無法成功執行入隊操作,只能繼續的阻塞下去。在一些情況下,P1生產者線程可能再也不會被喚醒而永久的阻塞在條件變量notFull上。
為了解決這一問題,雙鎖版本的阻塞隊列其生產者線程不能僅僅依靠消費者線程來將其喚醒,而是需要在其它生產者線程在入隊操作完成后,發現隊列未滿時也嘗試着喚醒由於上述並發場景發生lost wakeup問題的生產者線程(消費者線程在出隊時的優化亦是如此)。
最終優化的V5版本的出隊、入隊實現:
/** private final AtomicInteger count = new AtomicInteger(); this.takeLock = new ReentrantLock(); this.notEmpty = this.takeLock.newCondition(); this.putLock = new ReentrantLock(); this.notFull = this.putLock.newCondition(); */ @Override public void put(E e) throws InterruptedException { int currentCount; // 先嘗試獲得互斥鎖,以進入臨界區 putLock.lockInterruptibly(); try { // 因為被消費者喚醒后可能會被其它的生產者再度填滿隊列,需要循環的判斷 while (count.get() == elements.length) { // put操作時,如果隊列已滿則進入notFull條件變量的等待隊列,並釋放條件變量對應的互斥鎖 notFull.await(); // 消費者進行出隊操作時 } // 走到這里,說明當前隊列不滿,可以執行入隊操作 enqueue(e); currentCount = count.getAndIncrement(); // 如果在插入后隊列仍然沒滿,則喚醒其他等待插入的線程 if (currentCount + 1 < elements.length) { notFull.signal(); } } finally { // 入隊完畢,釋放鎖 putLock.unlock(); } // 如果插入之前隊列為空,才喚醒等待彈出元素的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock if (currentCount == 0) { signalNotEmpty(); } } @Override public E take() throws InterruptedException { E headElement; int currentCount; // 先嘗試獲得互斥鎖,以進入臨界區 takeLock.lockInterruptibly(); try { // 因為被生產者喚醒后可能會被其它的消費者消費而使得隊列再次為空,需要循環的判斷 while(this.count.get() == 0){ notEmpty.await(); } headElement = dequeue(); currentCount = this.count.getAndDecrement(); // 如果隊列在彈出一個元素后仍然非空,則喚醒其他等待隊列非空的線程 if (currentCount - 1 > 0) { notEmpty.signal(); } } finally { // 出隊完畢,釋放鎖 takeLock.unlock(); } // 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock if (currentCount == elements.length) { signalNotFull(); } return headElement; } /** * 喚醒等待隊列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock takeLock.lock(); try { // 喚醒一個等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 喚醒等待隊列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock putLock.lock(); try { // 喚醒一個等待隊列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } }
3. 不同版本阻塞隊列的性能測試
前面從v2版本開始,對所實現的阻塞隊列進行了一系列的優化,一直到最終的V5版本實現了一個基於雙鎖,雙條件變量的高性能版本。
下面對v3-v5版本進行一輪基礎的性能測試(v2無限輪詢性能太差),看看其實際性能是否真的如博客第二章中所說的那般,高版本的性能是更優秀的。同時令jdk中的ArrayBlockingQueue和LinkedBlockingQueue也實現MyBlockingQueue,也加入測試。
測試工具類BlockingQueueTestUtil:

1 public class BlockingQueueTestUtil { 2 public static long statisticBlockingQueueRuntime( 3 MyBlockingQueue<Integer> blockingQueue, int workerNum, int perWorkerProcessNum, int repeatTime) throws InterruptedException { 4 ExecutorService executorService = Executors.newFixedThreadPool(workerNum * 2); 5 // 第一次執行時存在一定的初始化開銷,不進行統計 6 oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum); 7 8 long totalTime = 0; 9 for(int i=0; i<repeatTime; i++){ 10 long oneTurnTime = oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum); 11 totalTime += oneTurnTime; 12 } 13 14 executorService.shutdown(); 15 16 assert blockingQueue.isEmpty(); 17 18 return totalTime/repeatTime; 19 } 20 21 private static long oneTurnExecute(ExecutorService executorService, MyBlockingQueue<Integer> blockingQueue, 22 int workerNum, int perWorkerProcessNum) throws InterruptedException { 23 long startTime = System.currentTimeMillis(); 24 CountDownLatch countDownLatch = new CountDownLatch(workerNum * 2); 25 26 // 創建workerNum個生產者/消費者 27 for(int i=0; i<workerNum; i++){ 28 executorService.execute(()->{ 29 produce(blockingQueue,perWorkerProcessNum); 30 countDownLatch.countDown(); 31 }); 32 33 executorService.execute(()->{ 34 consume(blockingQueue,perWorkerProcessNum); 35 countDownLatch.countDown(); 36 }); 37 } 38 countDownLatch.await(); 39 long endTime = System.currentTimeMillis(); 40 41 return endTime - startTime; 42 } 43 44 private static void produce(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){ 45 try { 46 // 每個生產者生產perWorkerProcessNum個元素 47 for(int j=0; j<perWorkerProcessNum; j++){ 48 blockingQueue.put(j); 49 } 50 } catch (InterruptedException e) { 51 throw new RuntimeException(e); 52 } 53 } 54 55 private static void consume(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){ 56 try { 57 // 每個消費者消費perWorkerProcessNum個元素 58 for(int j=0; j<perWorkerProcessNum; j++){ 59 blockingQueue.take(); 60 } 61 } catch (InterruptedException e) { 62 throw new RuntimeException(e); 63 } 64 } 65 }
jdk的ArrayBlockingQueue簡單包裝(JDKArrayBlockingQueue):

public class JDKArrayBlockingQueue<E> implements MyBlockingQueue<E> { private final BlockingQueue<E> jdkBlockingQueue; /** * 指定隊列大小的構造器 * * @param capacity 隊列大小 */ public JDKArrayBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); jdkBlockingQueue = new ArrayBlockingQueue<>(capacity); } @Override public void put(E e) throws InterruptedException { jdkBlockingQueue.put(e); } @Override public E take() throws InterruptedException { return jdkBlockingQueue.take(); } @Override public boolean isEmpty() { return jdkBlockingQueue.isEmpty(); } @Override public String toString() { return "JDKArrayBlockingQueue{" + "jdkBlockingQueue=" + jdkBlockingQueue + '}'; } }
jdk的LinkedBlockingQueue簡單包裝(JDKLinkedBlockingQueue):

public class JDKLinkedBlockingQueue<E> implements MyBlockingQueue<E> { private final BlockingQueue<E> jdkBlockingQueue; /** * 指定隊列大小的構造器 * * @param capacity 隊列大小 */ public JDKLinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); jdkBlockingQueue = new LinkedBlockingQueue<>(capacity); } @Override public void put(E e) throws InterruptedException { jdkBlockingQueue.put(e); } @Override public E take() throws InterruptedException { return jdkBlockingQueue.take(); } @Override public boolean isEmpty() { return jdkBlockingQueue.isEmpty(); } @Override public String toString() { return "JDKLinkedBlockingQueue{" + "jdkBlockingQueue=" + jdkBlockingQueue + '}'; } }
測試主體代碼:
public class BlockingQueuePerformanceTest { /** * 隊列容量 * */ private static final int QUEUE_CAPACITY = 3; /** * 並發線程數(消費者 + 生產者 = 2 * WORKER_NUM) * */ private static final int WORKER_NUM = 30; /** * 單次測試中每個線程訪問隊列的次數 * */ private static final int PER_WORKER_PROCESS_NUM = 3000; /** * 重復執行的次數 * */ private static final int REPEAT_TIME = 5; public static void main(String[] args) throws InterruptedException { { MyBlockingQueue<Integer> myArrayBlockingQueueV3 = new MyArrayBlockingQueueV3<>(QUEUE_CAPACITY); long avgCostTimeV3 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV3, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME); System.out.println(costTimeLog(MyArrayBlockingQueueV3.class, avgCostTimeV3)); } { MyBlockingQueue<Integer> myArrayBlockingQueueV4 = new MyArrayBlockingQueueV4<>(QUEUE_CAPACITY); long avgCostTimeV4 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV4, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME); System.out.println(costTimeLog(MyArrayBlockingQueueV4.class, avgCostTimeV4)); } { MyBlockingQueue<Integer> myArrayBlockingQueueV5 = new MyArrayBlockingQueueV5<>(QUEUE_CAPACITY); long avgCostTimeV5 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV5, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME); System.out.println(costTimeLog(MyArrayBlockingQueueV5.class, avgCostTimeV5)); } { MyBlockingQueue<Integer> jdkArrayBlockingQueue = new JDKArrayBlockingQueue<>(QUEUE_CAPACITY); long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkArrayBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME); System.out.println(costTimeLog(JDKArrayBlockingQueue.class, avgCostTimeJDK)); } { MyBlockingQueue<Integer> jdkLinkedBlockingQueue = new JDKLinkedBlockingQueue<>(QUEUE_CAPACITY); long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkLinkedBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME); System.out.println(costTimeLog(JDKLinkedBlockingQueue.class, avgCostTimeJDK)); } } private static String costTimeLog(Class blockQueueCLass,long costTime){ return blockQueueCLass.getSimpleName() + " avgCostTime=" + costTime + "ms"; } }
上述代碼指定的參數為基於最大容量為3的阻塞隊列,生產者、消費者線程各30個,每個線程執行3000次出隊或入隊操作,重復執行5次用於統計平均時間。
我的機器上的運行結果如下:
MyArrayBlockingQueueV3 avgCostTime=843ms MyArrayBlockingQueueV4 avgCostTime=530ms MyArrayBlockingQueueV5 avgCostTime=165ms JDKArrayBlockingQueue avgCostTime=506ms JDKLinkedBlockingQueue avgCostTime=163ms
執行時長v3 > v4 > JDKArrayBlockingQueue > MyArrayBlockingQueueV5 > JDKLinkedBlockingQueue,且v4耗時大致等於JDKArrayBlockingQueue、v5耗時大致等於JDKLinkedBlockingQueue。
究其原因是因為jdk的ArrayBlockingQueue實現和V4版本一樣,是基於單鎖,雙條件變量的;而jdk的LinkedBlockingQueue實現和V5版本一樣,是基於雙鎖,雙條件變量的(V4、V5版本的實現就是參考的jdk源碼)。
雖然測試的用例不是很全面,但測試結果和理論大致是吻合的,希望大家通過測試結果來加深對不同版本間性能差異的背后原理的理解。
4. 為什么jdk中的ArrayBlockingQueue不基於性能更好的雙鎖實現 ?
看到這里,不知你是否和我一樣對為什么jdk的ArrayBlockingQueue使用單鎖而不使用性能更好的雙鎖實現而感到疑惑。所幸網上也有不少小伙伴有類似的疑問,這里將相關內容簡單梳理一下。
1. 基於數組實現的阻塞隊列(ABQ)是可以采用雙鎖實現更加高效率的出隊、入隊的。但由於jdk中阻塞隊列是屬於集合Collection的一個子類,雙鎖版本的ABQ其迭代器會比單鎖的復雜很多很多,但在性能上的改善並不那么的可觀。ABQ的實現在復雜度和性能上做了一個折中,選擇了容易實現但性能稍低的單鎖實現。
http://jsr166-concurrency.10961.n7.nabble.com/ArrayBlockingQueue-concurrent-put-and-take-tc1306.html
2. 如果對性能有更加苛刻要求的話,可以考慮使用jdk中基於雙鎖實現的LinkedBlockingQueue(LBQ)。需要注意的是,在高吞吐量的出隊、入隊的場景下,LBQ鏈式的結構在垃圾回收時性能會略低於基於數組的,緊湊結構的ABQ。
3. jdk提供了一個龐大而全面的集合框架,每個具體的數據結構都需要盡可能多的實現高層的接口和抽象方法。這樣的設計對於使用者來說確實很友好,但也令實現者背上了沉重的負擔,必須為實現一些可能極少使用的接口而花費巨大的精力,甚至反過來影響到特定數據結構的本身的實現。ABQ受制於雙鎖版本迭代器實現的復雜度,而被迫改為效率更低的單鎖實現就是一個典型的例子。
5. 總結
前段時間迷上了MIT6.830的數據庫課程,在理解了課程所提供的實驗后(共6個lab)收獲很大,因此想着自己再動手實現一個更加全面的版本(並發的B+樹,MVCC多版本控制、行級鎖以及sql解釋器、網絡協議等等)。但一段時間后發現上述的功能難度很大且實現起來細節很多,這將耗費我過多的時間而被迫放棄了(膨脹了Orz)。在被打擊后,清醒的意識到對於現階段的我來說還是應該穩扎穩打,着眼於更小的知識點,通過自己動手造輪子的方式加深對知識點的理解,至於擼一個完善的關系型數據庫這種宏大的目標受制於我目前的水平還是暫時先放放吧。
本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(blocking queue模塊)。后續應該會陸續更新關於自己動手實現線程池、抽象同步隊列AQS等的博客。
還存在很多不足之處,請多多指教。
主要參考文章:
https://zhuanlan.zhihu.com/p/64156753 從0到1實現自己的阻塞隊列(上)
https://zhuanlan.zhihu.com/p/64156910 從0到1實現自己的阻塞隊列(下)
https://blog.csdn.net/liubenlong007/article/details/102823081 為什么ArrayBlockingQueue單鎖實現,LinkedBlockingQueue雙鎖實現?