阻塞隊列(Blocking Queue)
一、隊列的定義
說的阻塞隊列,就先了解下什么是隊列,隊列也是一種特殊的線性表結構,在線性表的基礎上加了一條限制:那就是一端入隊列,一端出隊列,且需要遵循FIFO(先進先出)的原則
隊列的入口一端叫做隊尾(rear),出口一端叫做隊頭(front),最簡單的比如就是排隊買火車票,新加入的排隊者必須在隊尾插入,而下一個排隊結束的永遠的隊伍的第一個人
如下圖示:

二、阻塞隊列的定義
了解了隊列,就很好理解什么是阻塞隊列,既然是阻塞的,那就是需要等待,而隊列的操作只有入隊和出隊,那就是在入隊和出隊的時候等待了,所以總結下阻塞隊列就是:
1、入隊時,如果隊列已經滿了,就阻塞等待直到隊列中有位置可以插入
2、出隊時,如果隊列中為空,就阻塞等待直到隊列中有數據可以出隊列
三、Java隊列的定義
Java集合體系分為Collection和Map兩大陣營,而隊列Queue則屬於Collection系列
3.1、Queue接口
Queue接口繼承之Collection接口,在Collection的繼承上增加了隊列獨有的方法
boolean offer(E e):向隊列中插入元素,插入成功返回true;插入失敗返回false
E peek():獲取隊列頭部的元素,如果隊列為空則返回null
E poll():獲取並移除頭部的元素,隊列為空則返回null,和peek的區別是獲取頭部元素之后將頭部元素刪除,符合隊列的出隊操作
E remove():獲取並移除頭部的元素,如果為空則拋異常
3.2、BlockingQueue接口
BlockingQueue接口是繼承之Queue,在Queue方法的繼承上添加了拋出異常操作,因為BlockingQueue上阻塞隊列,所以就存在阻塞時間過長需要中斷阻塞操作,或者是超時中斷阻塞操作
而BlockingQueue也就是阻塞隊列的頂級接口,BlockingQueue不同的實現類就是實現了不同的阻塞隊列的效果
阻塞隊列的入隊和出隊操作可以分為多種操作方式:
1、拋異常:入隊時隊列滿了直接拋異常;出隊時隊列為空直接拋異常 如:add方法和remove方法
2、返回boolean:入隊或出隊成功返回true;失敗返回false 如:offer方法和poll方法
3、阻塞:入隊或出隊失敗則一直阻塞直到成功或者被其他線程喚醒 如:put方法和take方法
4、超時阻塞:入隊和出隊失敗則阻塞指定的時間,超時了還是失敗則取消阻塞 如:offer(timeout)和poll(timeout)方法
四、Java中阻塞隊列的不同實現
4.1、ArrayBlockingQueue
基於數組實現的阻塞隊列,初始化時需要定義數組的大小,也就是隊列的大小,所以這個隊列是一個有界隊列。
ArrayBlockingQueue的主要屬性有:
1 //存儲元素的數組 2 final Object[] items; 3 4 //下一個出隊的索引 5 int takeIndex; 6 7 //下一個入隊的索引 8 int putIndex; 9 10 //隊列的大小 11 int count; 12 13 //可重入鎖 14 final ReentrantLock lock; 15 16 //出隊操作的等待 17 private final Condition notEmpty; 18 19 //入隊操作的等待 20 private final Condition notFull;
實現原理:通過可重入鎖ReenTrantLock+Condition 來實現多線程之間的同步效果
入隊過程:
add方法:插入成功返回true;插入失敗拋異常
put方法:插入元素到尾部,如果失敗則調用Condition.await()方法進行阻塞等待,直到被喚醒;
offer方法:插入元素到尾部,如果失敗則直接返回false,
offer(timeout):插入元素到尾部,如果失敗則調用Condition.await(timeout)方法進行阻塞等待指定時間,直到被喚醒或阻塞超時,還是失敗就返回false
而一旦插入成功,就會喚醒出隊的等待操作,執行出隊的Condition的signal()方法
出隊過程:
主要方法為:poll()、take()、remove()
基本上和入隊過程類似,出隊結束會喚醒入隊的等待操作,執行入隊的Condition的signal()方法
而不管是入隊操作還是出隊操作,都會通過ReentrantLock來控制同步效果,通過兩個Condition來控制線程之間的通信效果
另外入隊和出隊操作分別通過兩個索引 takeIndex 和putIndex來指定數組的位置,默認從0開始分別遞增,如果達到數組的容量大小,就表示到了數組的邊界了,此時再設置index=0,相當於數組是一個環形數組
環形數組的好處是增刪數據時不需要挪動數組中的其他數據,只需要改變入隊和出隊的指針即可。而如果不是環形數組而是順序數組的話,入隊和出隊就需要大量移動數據,否則數組空間一下就被用完了,性能較差
4.2、LinkedBlockingQueue
基於鏈表實現的阻塞隊列,既然是鏈表,那么就可以看出這種阻塞隊列含有鏈表的特性,那就是無界。但是實際上LinkedBlockingQueue是有界隊列,默認大小是Integer的最大值,而也可以通過構造方法傳入固定的capacity大小設置
LinkedBlockingQueue有一個內部類Node,屬性有:E ite和Node next,所以可以看出LinkedBlockingQueue是一個單向鏈表
基本屬性為:
1 //隊列大小,默認為Integer的最大值 2 private final int capacity; 3 4 //當前隊列元素個數 5 private final AtomicInteger count = new AtomicInteger(); 6 7 //隊列的頭部元素 8 transient LinkedBlockingQueue.Node<E> head; 9 10 //隊列的尾部元素 11 private transient LinkedBlockingQueue.Node<E> last; 12 13 //出隊鎖 14 private final ReentrantLock takeLock = new ReentrantLock(); 15 16 //出隊的condition 17 private final Condition notEmpty = takeLock.newCondition(); 18 19 //入隊鎖 20 private final ReentrantLock putLock = new ReentrantLock(); 21 22 //入隊的condition 23 private final Condition notFull = putLock.newCondition();
可以看出LinkedBlockingQueue的屬性和ArrayBlockingQueue的屬性大致差不多,都是通過ReentrantLock和Condition來實現多線程之間的同步,而LinkedBlockingQueue卻多了一個ReentrantLock,而不是入隊和出隊共用同一個鎖
那么為什么ArrayBlockingQueue只需要一個ReentrantLock而LinkedBlockingQueue需要兩個ReentrantLock呢?
個人想法;
首先,ReentrantLock肯定是越多越好,鎖越多那么相同鎖的競爭就越少;LinkedBlockingQueue分別有入隊鎖和出隊鎖,所以入隊和出隊的時候不會有競爭鎖的關系;而ArrayBlockingQueue只有一個Lock,那么不管是入隊還是出隊,都需要競爭同一個鎖,所以效率會低點。ArrayBlockingQueue是環形數組結構,入隊的地址和出隊的地址可能是同一個,比如數組table大小為1,那么第一次入隊和出隊需要操作的位置都是table[0]這個元素,所以入隊和出隊必須共用同一把鎖;而LinkedBlockingQueue是鏈表形式,內存地址是散列的,入隊的元素地址和出隊的元素地址永遠不可能會是同一個地址。所以可以采用兩個鎖,分別對入隊進行加鎖同步和對出隊進行加鎖同步即可。
4.3、DelayQueue
延遲隊列,顧名思義就是只有當元素達到指定的時間后才可以從隊列中取出。
根據這個思路可以滿足下面幾種需求:
1.定時任務:將任務放入隊列中設置時間,循環阻塞地從隊列中取任務,當從隊列中取出數據就表示時間到了
2.緩存過期:循環從隊列中取數據,一旦取出數據就表示數據過期了,直接刪除即可
DelayQueue主要也是通過ReentrantLock+Condition來保證線程安全,而內部還采用了ProrityQueue來保證隊列的優先級,實際就是按延時的時間來進行排序,延遲時間最短的排在隊列的頭部,
所以每次從頭部獲取的元素都是最先會過期的數據。
4.4、PriorityBlockingQueue
有優先級的阻塞隊列,底層也是通過數組實現,默認初始容量為11,容量不夠會自動擴容,擴容的最大值為Integer的最大值-8(有些虛擬機再實現數組頭部存儲內容所預留的空間),所以基本上可以認為是無界阻塞隊列
擴容時的線程安全通過ReentrantLock+CAS+volatine實現
用法基本上和ArrayBlockingQueue差不多,只不過PriorityBlockingQueue相當於是無界,另外最重要的一點是它是有優先級的,既然有優先級就涉及到排序
PriorityBlockingQueue默認采用Comparator,或者存儲的元素有自定義的比較器。
而存儲數據的數組也不是簡單的數組,而是采用了二叉堆的數據結構,同時滿足完全二叉樹+堆的數據結構(最大堆上層的元素必須大於所有下層的元素;最小堆或者上層的元素必須小於所有下層的元素)
而PriorityBlockingQueue默認是采用的最小堆,即每次取出的元素都是優先級最小的。那么問題來了,如果通過數組來實現一個二叉堆呢?見下面的圖解:
先看下二叉堆的數據結構:

可以看出二叉堆的上層永遠比下層的優先級要高,而且可以發現上層節點和子節點的關系:父節點序號=(子節點序號-1)/2;左字節點序號=父節點序號*2+1;右字節點序號=父節點序號*2+2
那么上面的數據結構就可以通過數組來存儲,如下圖示:

插入操作:將元素直接插入到最底層的節點,如上圖插入節點88
1. 節點88成為節點40到左子節點,新加入的節點遍歷和自己的父節點進行比較,
2.節點88比節點40大,兩者互換位置;繼續88比80大,互換位置,88比100小,位置不動,
3.結果是新插入的節點88成為節點100的左子節點,節點80成為節點88點右子節點,節點40成功節點80點右子節點
刪除操作:阻塞隊列的刪除是只刪除位置為0的元素,也就是節點100
1.直接取出數組[0]位置的元素
2.將隊列尾部節點放到頭部來,如上圖就是把節點5放到最頂層,
3.然后將頭節點依次和子節點進行比較,然后進行位置互換操作
所以每次插入和刪除元素最多操作次數就是 二叉堆的高度
代碼不再分析,主要就是實現來二叉堆的邏輯,並且通過ReentrantLock+Condition來保證線程間的同步效果
4.5、SynchronousQueue
SynchonousQueue是比較特殊的阻塞隊列,特殊之處就是這個叫隊列的隊列沒有容量,又或者說容量為0,所以一旦有元素插入此隊列,由於沒有容量,就必須被阻塞直到元素被取出
所以SynchronousQueue更像是一個通道,一端發數據,一端消費數據,數據不可以被堆積,發送方或消費方處理不過來或者是不處理都會導致阻塞
五、阻塞隊列的應用
5.1、線程池
線程池的構造函數就包含了阻塞隊列
