聊聊並發(四)——阻塞隊列


一、概述

1、介紹

  強烈建議讀者看這篇之前,先了解隊列相關知識,以及生產者與消費者模式

  concurrent 包中,BlockingQueue 很好的解決了多線程中,如何高效安全"傳輸"數據的問題。通過這些高效並且線程安全的隊列類,為快速搭建高質量的多線程程序帶來極大的便利。
  阻塞隊列,首先它是一個隊列(先進先出),通過一個共享的隊列,可以使得數據從隊列的一端輸入,從另外一端輸出。

  當隊列是的,從隊列中獲取元素的操作將會被阻塞。
  當隊列是滿的,從隊列中添加元素的操作將會被阻塞。

  api文檔:https://www.matools.com/api/java8

2、為什么需要阻塞隊列?

  理由:需要結合生產者與消費者來說。
  好處就是不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue 都一手包辦了。在 concurrent 包發布之前,多線程環境下,程序員必須自己控制這些細節,尤其還要兼顧效率和線程安全,這給程序帶來不小的復雜度。

3、相關API

  BlockingQueue 核心方法

方法類型
拋出異常
特殊值
阻塞
超時
插入
add(e)
offer(e)
put(e)
offer(e,time,unit)
刪除
remove()
poll()
take()
poll(time,unit)
檢查
element()
peek()
不可用
不可用

 

拋出異常
隊滿時:再插入元素會拋出異常
隊空時:再刪除元素會拋出異常
特殊值
插入:成功 true,失敗false
刪除:成功,返回隊元素。沒有就返回null
阻塞
隊滿時:再 put 元素會一直阻塞,或響應中斷退出。
隊空時:再 take 元素會一直阻塞。
超時
隊滿時:再插入元素,會阻塞一定時間,超時后退出。
隊空時:再刪除元素,會阻塞一定時間,超時后退出。

4、作用

  API文檔中明確提出,阻塞隊列被設計主要就是用於生產者與消費者模式。這樣並不需要用到Lock 或 synchronized 以及等待喚醒機制。
  而是僅僅用到了阻塞隊列以及原子變量類,就可以實現生產者消費者模型。不用程序員關心具體的加鎖解鎖過程,而是更關注具體的業務邏輯。

二、分類

1、ArrayBlockingQueue

  數組結構組成的有界阻塞隊列。
  基於數組的阻塞隊列,在 ArrayBlockingQueue 內部,維護了一個定長數組,以便緩存隊列中的數據對象。這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue 內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
  ArrayBlockingQueue 在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue。
按照實現原理來分析,ArrayBlockingQueue 完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea 之所以沒這樣去做,也許是因為 ArrayBlockingQueue 的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的 Node 對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於 GC 的影響還是存在一定的區別。而在創建 ArrayBlockingQueue 時,還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。

2、LinkedBlockingQueue

  鏈表結構組成的有界(大小默認值為integer.MAX_VALUE)阻塞隊列。
  基於鏈表的阻塞隊列,同 ArrayBlockingQueue 類似,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時,才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。

  LinkedBlockingQueue 之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

3、DelayQueue

  使用優先級隊列實現的延遲無界阻塞隊列。

  DelayQueue 中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue 是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

4、PriorityBlockingQueue

  支持優先級排序無界阻塞隊列。
  基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的 Compator 對象來決定),但需要注意的是 PriorityBlockingQueue 並不會阻塞生產者,而只會在沒有可消費的數據時,阻塞消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現 PriorityBlockingQueue 時,內部控制線程同步的鎖采用的是非公平鎖。

5、SynchronousQueue

  不存儲元素的阻塞隊列,即單個元素的隊列。單個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態。
  一種無緩沖的等待隊列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么大家都在集市等待。
  相對於有緩沖的 BlockingQueue 來說,少了一個中間經銷商的環節(緩沖區)。如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給哪些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣)。
  但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。聲明一個 SynchronousQueue 有兩種不同的方式,它們之間有着不太一樣的行為。

  源碼示例:構造器

 1 /**
 2  * Creates a {@code SynchronousQueue} with nonfair access policy.
 3  */
 4 public SynchronousQueue() {
 5     this(false);
 6 }
 7 
 8 /**
 9  * Creates a {@code SynchronousQueue} with the specified fairness policy.
10  *
11  * @param fair if true, waiting threads contend in FIFO order for
12  *        access; otherwise the order is unspecified.
13  */
14  // 如果fair是true,那么等待的線程通過一個FIFO保證順序;否則順序是不明確的
15 public SynchronousQueue(boolean fair) {
16     transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
17 }

  公平模式和非公平模式的區別:
  公平模式:SynchronousQueue 會采用公平鎖,並配合一個 FIFO 隊列來阻塞多余的生產者和消費者,從而體現整體的公平策略。
  非公平模式(默認):SynchronousQueue 采用非公平鎖,同時配合一個 LIFO 棧來管理多余的生產者和消費者。
  后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或消費者的數據永遠得不到處理。

6、LinkedTransferQueue

  鏈表組成的無界阻塞隊列。一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。
  相對於其他阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。LinkedTransferQueue 采用一種預占模式。意思就是消費者線程取元素時,如果隊列不為空,則直接取走數據,若隊列為空,那就生成一個節點(節點元素為 null)入隊,然后消費者線程被等待在這個節點上,后面生產者線程入隊時發現有一個元素為 null 的節點,生產者線程就不入隊了,直接就將元素填充到該節點,並喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用的方法返回。

7、LinkedBlockingDeque

  鏈表組成的雙向阻塞隊列。
  LinkedBlockingDeque 是一個由鏈表結構組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素。對於一些指定的操作,在插入或者獲取隊列元素時如果隊列狀態不允許該操作,可能會阻塞該線程直到隊列狀態變更為允許操作,這里的阻塞一般有兩種情況
  插入元素時:如果當前隊列已滿將會進入阻塞狀態,一直等到隊列有空的位置時再將該元素插入。該操作可以通過設置超時參數,超時后返回 false 表示操作失敗,也可以不設置超時參數一直阻塞,中斷后拋出 InterruptedException 異常。
  讀取元素時:如果當前隊列為空會阻塞,直到隊列不為空然后返回元素,同樣可以通過設置超時參數。

三、SynchronousQueue

  沒有容量,與其他BlockingQueue不同,SychronousQueue是一個不存儲元素的BlockingQueue,每一個put操作必須要等待一個take操作,否則不能繼續添加元素,反之亦然。
  代碼示例:

 1 public class SynchronousQueueDemo {
 2     public static void main(String[] args) {
 3         BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
 4 
 5         // AAA線程主要用來對阻塞隊列進行put操作
 6         new Thread(() -> {
 7             try {
 8                 System.out.println(Thread.currentThread().getName() + "\t" + "put 1");
 9                 blockingQueue.put(1);
10                 System.out.println(Thread.currentThread().getName() + "\t" + "put 2");
11                 blockingQueue.put(2);
12                 System.out.println(Thread.currentThread().getName() + "\t" + "put 3");
13                 blockingQueue.put(3);
14             } catch (InterruptedException e) {
15                 e.printStackTrace();
16             }
17         }, "AAA").start();
18 
19         // BBB線程主要從阻塞隊列當中進行take操作
20         new Thread(() -> {
21             try {
22                 Thread.sleep(5000);
23                 System.out.println(Thread.currentThread().getName() + "\t take:" + blockingQueue.take());
24                 Thread.sleep(5000);
25                 System.out.println(Thread.currentThread().getName() + "\t take:" + blockingQueue.take());
26                 Thread.sleep(5000);
27                 System.out.println(Thread.currentThread().getName() + "\t take:" + blockingQueue.take());
28             } catch (InterruptedException e) {
29                 e.printStackTrace();
30             }
31         }, "BBB").start();
32     }
33 }

  參考文檔:https://www.matools.com/api/java8


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM