Java集合--阻塞隊列及各種實現的解析


阻塞隊列(Blocking Queue)

一、隊列的定義

說的阻塞隊列,就先了解下什么是隊列,隊列也是一種特殊的線性表結構,在線性表的基礎上加了一條限制:那就是一端入隊列,一端出隊列,且需要遵循FIFO(先進先出)的原則

隊列的入口一端叫做隊尾(rear),出口一端叫做隊頭(front),最簡單的比如就是排隊買火車票,新加入的排隊者必須在隊尾插入,而下一個排隊結束的永遠的隊伍的第一個人

如下圖示:

 

二、阻塞隊列的定義

了解了隊列,就很好理解什么是阻塞隊列,既然是阻塞的,那就是需要等待,而隊列的操作只有入隊和出隊,那就是在入隊和出隊的時候等待了,所以總結下阻塞隊列就是:

1、入隊時,如果隊列已經滿了,就阻塞等待直到隊列中有位置可以插入

2、出隊時,如果隊列中為空,就阻塞等待直到隊列中有數據可以出隊列

三、Java隊列的定義

Java集合體系分為Collection和Map兩大陣營,而隊列Queue則屬於Collection系列

3.1、Queue接口

Queue接口繼承之Collection接口,在Collection的繼承上增加了隊列獨有的方法

boolean offer(E e):向隊列中插入元素,插入成功返回true;插入失敗返回false

E peek():獲取隊列頭部的元素,如果隊列為空則返回null

E poll():獲取並移除頭部的元素,隊列為空則返回null,和peek的區別是獲取頭部元素之后將頭部元素刪除,符合隊列的出隊操作

E remove():獲取並移除頭部的元素,如果為空則拋異常

3.2、BlockingQueue接口

BlockingQueue接口是繼承之Queue,在Queue方法的繼承上添加了拋出異常操作,因為BlockingQueue上阻塞隊列,所以就存在阻塞時間過長需要中斷阻塞操作,或者是超時中斷阻塞操作

而BlockingQueue也就是阻塞隊列的頂級接口,BlockingQueue不同的實現類就是實現了不同的阻塞隊列的效果

阻塞隊列的入隊和出隊操作可以分為多種操作方式:

1、拋異常:入隊時隊列滿了直接拋異常;出隊時隊列為空直接拋異常  如:add方法和remove方法

2、返回boolean:入隊或出隊成功返回true;失敗返回false 如:offer方法和poll方法

3、阻塞:入隊或出隊失敗則一直阻塞直到成功或者被其他線程喚醒 如:put方法和take方法

4、超時阻塞:入隊和出隊失敗則阻塞指定的時間,超時了還是失敗則取消阻塞 如:offer(timeout)和poll(timeout)方法

四、Java中阻塞隊列的不同實現

4.1、ArrayBlockingQueue

基於數組實現的阻塞隊列,初始化時需要定義數組的大小,也就是隊列的大小,所以這個隊列是一個有界隊列

ArrayBlockingQueue的主要屬性有:

 1     //存儲元素的數組
 2     final Object[] items;
 3 
 4     //下一個出隊的索引
 5     int takeIndex;
 6 
 7     //下一個入隊的索引
 8     int putIndex;
 9 
10     //隊列的大小
11     int count;
12 
13     //可重入鎖
14     final ReentrantLock lock;
15 
16     //出隊操作的等待
17     private final Condition notEmpty;
18 
19     //入隊操作的等待
20     private final Condition notFull;

實現原理:通過可重入鎖ReenTrantLock+Condition 來實現多線程之間的同步效果

入隊過程:

add方法:插入成功返回true;插入失敗拋異常

put方法:插入元素到尾部,如果失敗則調用Condition.await()方法進行阻塞等待,直到被喚醒;

offer方法:插入元素到尾部,如果失敗則直接返回false,

offer(timeout):插入元素到尾部,如果失敗則調用Condition.await(timeout)方法進行阻塞等待指定時間,直到被喚醒或阻塞超時,還是失敗就返回false

而一旦插入成功,就會喚醒出隊的等待操作,執行出隊的Condition的signal()方法

 

出隊過程:

主要方法為:poll()、take()、remove()

基本上和入隊過程類似,出隊結束會喚醒入隊的等待操作,執行入隊的Condition的signal()方法

而不管是入隊操作還是出隊操作,都會通過ReentrantLock來控制同步效果,通過兩個Condition來控制線程之間的通信效果

另外入隊和出隊操作分別通過兩個索引 takeIndex 和putIndex來指定數組的位置,默認從0開始分別遞增,如果達到數組的容量大小,就表示到了數組的邊界了,此時再設置index=0,相當於數組是一個環形數組

環形數組的好處是增刪數據時不需要挪動數組中的其他數據,只需要改變入隊和出隊的指針即可。而如果不是環形數組而是順序數組的話,入隊和出隊就需要大量移動數據,否則數組空間一下就被用完了,性能較差

 

4.2、LinkedBlockingQueue

基於鏈表實現的阻塞隊列,既然是鏈表,那么就可以看出這種阻塞隊列含有鏈表的特性,那就是無界。但是實際上LinkedBlockingQueue是有界隊列,默認大小是Integer的最大值,而也可以通過構造方法傳入固定的capacity大小設置

LinkedBlockingQueue有一個內部類Node,屬性有:E ite和Node next,所以可以看出LinkedBlockingQueue是一個單向鏈表

基本屬性為:

 

 1 //隊列大小,默認為Integer的最大值
 2     private final int capacity;
 3 
 4     //當前隊列元素個數
 5     private final AtomicInteger count = new AtomicInteger();
 6 
 7     //隊列的頭部元素
 8     transient LinkedBlockingQueue.Node<E> head;
 9 
10     //隊列的尾部元素
11     private transient LinkedBlockingQueue.Node<E> last;
12 
13     //出隊鎖
14     private final ReentrantLock takeLock = new ReentrantLock();
15 
16     //出隊的condition
17     private final Condition notEmpty = takeLock.newCondition();
18 
19     //入隊鎖
20     private final ReentrantLock putLock = new ReentrantLock();
21 
22     //入隊的condition
23     private final Condition notFull = putLock.newCondition();

 

 

 

可以看出LinkedBlockingQueue的屬性和ArrayBlockingQueue的屬性大致差不多,都是通過ReentrantLock和Condition來實現多線程之間的同步,而LinkedBlockingQueue卻多了一個ReentrantLock,而不是入隊和出隊共用同一個鎖

那么為什么ArrayBlockingQueue只需要一個ReentrantLock而LinkedBlockingQueue需要兩個ReentrantLock呢?

個人想法;

首先,ReentrantLock肯定是越多越好,鎖越多那么相同鎖的競爭就越少;LinkedBlockingQueue分別有入隊鎖和出隊鎖,所以入隊和出隊的時候不會有競爭鎖的關系;而ArrayBlockingQueue只有一個Lock,那么不管是入隊還是出隊,都需要競爭同一個鎖,所以效率會低點。ArrayBlockingQueue是環形數組結構,入隊的地址和出隊的地址可能是同一個,比如數組table大小為1,那么第一次入隊和出隊需要操作的位置都是table[0]這個元素,所以入隊和出隊必須共用同一把鎖;而LinkedBlockingQueue是鏈表形式,內存地址是散列的,入隊的元素地址和出隊的元素地址永遠不可能會是同一個地址。所以可以采用兩個鎖,分別對入隊進行加鎖同步和對出隊進行加鎖同步即可。

 

4.3、DelayQueue

延遲隊列,顧名思義就是只有當元素達到指定的時間后才可以從隊列中取出。

根據這個思路可以滿足下面幾種需求:

1.定時任務:將任務放入隊列中設置時間,循環阻塞地從隊列中取任務,當從隊列中取出數據就表示時間到了

2.緩存過期:循環從隊列中取數據,一旦取出數據就表示數據過期了,直接刪除即可

 

DelayQueue主要也是通過ReentrantLock+Condition來保證線程安全,而內部還采用了ProrityQueue來保證隊列的優先級,實際就是按延時的時間來進行排序,延遲時間最短的排在隊列的頭部,

所以每次從頭部獲取的元素都是最先會過期的數據。

 

4.4、PriorityBlockingQueue

有優先級的阻塞隊列,底層也是通過數組實現,默認初始容量為11,容量不夠會自動擴容,擴容的最大值為Integer的最大值-8(有些虛擬機再實現數組頭部存儲內容所預留的空間),所以基本上可以認為是無界阻塞隊列

擴容時的線程安全通過ReentrantLock+CAS+volatine實現

用法基本上和ArrayBlockingQueue差不多,只不過PriorityBlockingQueue相當於是無界,另外最重要的一點是它是有優先級的,既然有優先級就涉及到排序

PriorityBlockingQueue默認采用Comparator,或者存儲的元素有自定義的比較器。

而存儲數據的數組也不是簡單的數組,而是采用了二叉堆的數據結構,同時滿足完全二叉樹+堆的數據結構(最大堆上層的元素必須大於所有下層的元素;最小堆或者上層的元素必須小於所有下層的元素)

而PriorityBlockingQueue默認是采用的最小堆,即每次取出的元素都是優先級最小的。那么問題來了,如果通過數組來實現一個二叉堆呢?見下面的圖解:

 先看下二叉堆的數據結構:

可以看出二叉堆的上層永遠比下層的優先級要高,而且可以發現上層節點和子節點的關系:父節點序號=(子節點序號-1)/2;左字節點序號=父節點序號*2+1;右字節點序號=父節點序號*2+2

 那么上面的數據結構就可以通過數組來存儲,如下圖示:

插入操作:將元素直接插入到最底層的節點,如上圖插入節點88

1. 節點88成為節點40到左子節點,新加入的節點遍歷和自己的父節點進行比較,

2.節點88比節點40大,兩者互換位置;繼續88比80大,互換位置,88比100小,位置不動,

3.結果是新插入的節點88成為節點100的左子節點,節點80成為節點88點右子節點,節點40成功節點80點右子節點

 

刪除操作:阻塞隊列的刪除是只刪除位置為0的元素,也就是節點100

1.直接取出數組[0]位置的元素

2.將隊列尾部節點放到頭部來,如上圖就是把節點5放到最頂層,

3.然后將頭節點依次和子節點進行比較,然后進行位置互換操作

 

所以每次插入和刪除元素最多操作次數就是 二叉堆的高度 

代碼不再分析,主要就是實現來二叉堆的邏輯,並且通過ReentrantLock+Condition來保證線程間的同步效果

4.5、SynchronousQueue

SynchonousQueue是比較特殊的阻塞隊列,特殊之處就是這個叫隊列的隊列沒有容量,又或者說容量為0,所以一旦有元素插入此隊列,由於沒有容量,就必須被阻塞直到元素被取出

所以SynchronousQueue更像是一個通道,一端發數據,一端消費數據,數據不可以被堆積,發送方或消費方處理不過來或者是不處理都會導致阻塞

 

五、阻塞隊列的應用

5.1、線程池

線程池的構造函數就包含了阻塞隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM