背景
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法
1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。
多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把准備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)
這也是我們在多線程環境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。
阻塞隊列常用於生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。
在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式

1.放入數據
(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法
的線程);
(2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
(3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.
2. 獲取數據
(1)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null;
(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間
超時還沒有數據可取,返回失敗。
(3)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;
(4)drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出 IllegalStateException("Queue full") 異常。當隊列為空時,從隊列里獲取元素時會拋出 NoSuchElementException 異常 。
返回特殊值:插入方法會返回是否成功,成功則返回 true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回 null
一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里 put 元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里 take 元素,隊列也會阻塞消費者線程,直到隊列可用。
超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
如果是無界阻塞隊列,隊列不可能會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。
阻塞隊列
JDK7 提供了 7 個阻塞隊列。分別是
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列
詳細說明
1. ArrayBlockingQueue
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
2.LinkedBlockingQueue
基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
3. DelayQueue
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
使用場景:
DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。
4. PriorityBlockingQueue
基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。
5. SynchronousQueue
一種無緩沖的等待隊列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對於有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。
聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。公平模式和非公平模式的區別:
如果采用公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
基於阻塞隊列的生產者消費者Demo
package com.hanwl.juc; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class MyResource{ private volatile boolean FLAG = true;//默認開啟,進行生產+消費 private AtomicInteger atomicInteger = new AtomicInteger(); private BlockingQueue<String> blockingQueue=null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } //生產 public void myProd() throws InterruptedException { String data=null; boolean retValue; while(FLAG){ data = atomicInteger.incrementAndGet()+""; retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS); if(retValue){ System.out.println(Thread.currentThread().getName() + "\t" + "插入隊列" + data + "成功"); }else{ System.out.println(Thread.currentThread().getName() + "\t" + "插入隊列" + data + "失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + "\tFLAG==false,停止生產"); } //消費 public void myCons() throws InterruptedException { String res; while (FLAG) { res = blockingQueue.poll(2L, TimeUnit.SECONDS); if (null == res || res.equalsIgnoreCase("")) { FLAG = false; System.out.println(Thread.currentThread().getName() + "\t超過2秒鍾沒有消費,退出消費"); return; } System.out.println(Thread.currentThread().getName() + "\t消費隊列" + res + "成功"); } } public void stop() { this.FLAG = false; } } /** * @author hanwl * @date 2021/3/2-20:08 * volatile/CAS/atomicInteger/BlockQueue/線程交互 * BlockQueue的生產者消費者 */ public class ProdConsBlockQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); //傳接口 new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t生產線程啟動"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod").start(); new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t消費線程啟動"); try { myResource.myCons(); } catch (Exception e) { e.printStackTrace(); } },"cons").start(); try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } System.out.println("5秒鍾后,叫停"); myResource.stop(); } }