Java並發之BlockingQueue


  一、Queue

        Queue是隊列接口是 Collection的子接口。除了基本的 Collection操作外,隊列還提供其他的插入、提取和檢查操作。每個方法都存在兩種形式:一種拋出異常(操作失敗時),另一種返回一個特殊值(null 或 false,具體取決於操作)。插入操作的后一種形式是用於專門為有容量限制的 Queue 實現設計的;在大多數實現中,插入操作不會失敗。

 

  拋出異常 返回特殊值
插入 add(e) offer(e)
移除 remove(e) poll(e)
檢查 element() peek()

 

        隊列通常(但並非一定)以 FIFO(先進先出)的方式排序各個元素。不過優先級隊列和 LIFO 隊列(或堆棧)例外,前者根據提供的比較器或元素的自然順序對元素進行排序,后者按 LIFO(后進先出)的方式對元素進行排序。無論使用哪種排序方式,隊列的頭 都是調用 remove() 或 poll() 所移除的元素。在 FIFO 隊列中,所有的新元素都插入隊列的末尾。其他種類的隊列可能使用不同的元素放置規則。每個 Queue 實現必須指定其順序屬性。 

        如果可能,offer 方法可插入一個元素,失敗則返回 false。這與 Collection.add 方法不同,該方法只能通過拋出未經檢查的異常使添加元素失敗。offer 方法設計用於正常的失敗情況,而不是出現異常的情況,例如在容量固定(有界)的隊列中。 

        remove() 和 poll() 方法可移除和返回隊列的頭。到底從隊列中移除哪個元素是隊列排序策略的功能,而該策略在各種實現中是不同的。remove() 和 poll() 方法僅在隊列為空時其行為有所不同:remove() 方法拋出一個異常,而 poll() 方法則返回 null。 

        element() 和 peek() 獲取但不移除隊列的頭,element與 peek 唯一的不同在於:此隊列為空時將拋出一個異常。

        Queue 接口並未定義阻塞隊列的方法,而這在並發編程中是很常見的。BlockingQueue 接口則定義了那些等待元素出現或等待隊列中有可用空間的方法,這些方法擴展了此接口。 

        Queue 實現通常不允許插入 null 元素,盡管某些實現(如 LinkedList)並不禁止插入 null。即使在允許 null 的實現中,也不應該將 null 插入到 Queue 中,因為 null 也用作 poll 方法的一個特殊返回值,表明隊列不包含元素。 

        Queue 實現通常未定義 equals 和 hashCode 方法的基於元素的版本,而是從 Object 類繼承了基於身份的版本,因為對於具有相同元素但有不同排序屬性的隊列而言,基於元素的相等性並非總是定義良好的。 

        Queue 作為隊列可以實現一個按固定順序訪問其內部元素的結構,與 LinkedList等實現不同,Queue並不能獲取指定位置的元素。

        在 ThreadPoolExecutor類中創建線程池時使用的是 BlockingQueue。BlockingQueue是 Queue的子接口,BlockingQueue的實現類有很多:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。

        Deque與 Queue不同在於,Deque是一個雙端隊列,支持在兩端插入和移除元素。名稱 deque 是“double ended queue(雙端隊列)”的縮寫,通常讀為“deck”。大多數 Deque 實現對於它們能夠包含的元素數沒有固定限制,但此接口既支持有容量限制的雙端隊列,也支持沒有固定大小限制的雙端隊列。 Deque不是我們要學習的重點,下面就不提了。

        我們要用到的實現為 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,DelayedWorkQueue.其中 DelayedWorkQueue是 ScheduledThreadPoolExecutor的內部類實現。

        頂層接口為 Queue,然后是 Queue的抽象實現類 AbstractQueue和子接口 BlockingQueue。圖中四個類均繼承於 AbstractQueue並實現 BlockingQueue接口,DelayedWorkQueue同樣實現了 BlockingQueue,但DelayedWorkQueue繼承自 AbstractCollection。

        以下是Queue的源代碼:

Java代碼   收藏代碼
  1. public interface Queue<E> extends Collection<E> {  
  2.     /** 
  3.      * 將指定的元素插入此隊列(如果立即可行且不會違反容量限制),在成功時返回 true,如果當前沒有可用的空間,則拋出IllegalStateException 
  4.      */  
  5.     boolean add(E e);  
  6.   
  7.     /** 
  8.      * 將指定的元素插入此隊列(如果立即可行且不會違反容量限制),當使用有容量限制的隊列時,此方法通常要優於add(E),后者可能無法插入元素,而只是拋出一個異常 
  9.      */  
  10.     boolean offer(E e);  
  11.   
  12.     /** 
  13.      * 獲取並移除此隊列的頭。此方法與 poll 唯一的不同在於:此隊列為空時將拋出一個異常 
  14.      */  
  15.     E remove();  
  16.   
  17.     /** 
  18.      * 獲取並移除此隊列的頭,如果此隊列為空,則返回 null 
  19.      */  
  20.     E poll();  
  21.   
  22.     /** 
  23.      * 獲取,但是不移除此隊列的頭。此方法與 peek 唯一的不同在於:此隊列為空時將拋出一個異常 
  24.      */  
  25.     E element();  
  26.   
  27.     /** 
  28.      * 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null 
  29.      */  
  30.     E peek();  
  31. }  

 

        二、AbstractQueue

        AbstractQueue提供某些 Queue 操作的主要實現。此類中的實現適用於基本實現不 允許包含 null 元素時。add、remove 和 element 方法分別基於 offer、poll 和 peek 方法,但是它們通過拋出異常而不是返回 false 或 null 來指示失敗。 

        擴展此類的 Queue 實現至少必須定義一個不允許插入 null 元素的 Queue.offer(E) 方法,該方法以及 Queue.peek()、Queue.poll()、Collection.size() 和 Collection.iterator() 都支持 Iterator.remove() 方法。通常還要重寫其他方法。如果無法滿足這些要求,那么可以轉而考慮為 AbstractCollection 創建子類。 

        以下是 AbstractQueue的源代碼:

Java代碼   收藏代碼
  1. public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {  
  2.   
  3.     /** 
  4.      * 子類使用的構造方法 
  5.      */  
  6.     protected AbstractQueue() {  
  7.     }  
  8.   
  9.     /** 
  10.      * 將指定的元素插入到此隊列中(如果立即可行且不會違反容量限制),在成功時返回 true,如果當前沒有可用空間,則拋出 IllegalStateException。 
  11.      */  
  12.     public boolean add(E e) {  
  13.         if (offer(e))  
  14.             return true;  
  15.         else  
  16.             throw new IllegalStateException("Queue full");  
  17.     }  
  18.   
  19.     /** 
  20.      * 獲取並移除此隊列的頭。此方法與 poll 唯一的不同在於:此隊列為空時將拋出一個異常。 
  21.      * 除非隊列為空,否則此實現返回 poll 的結果。  
  22.      */  
  23.     public E remove() {  
  24.         E x = poll();  
  25.         if (x != null)  
  26.             return x;  
  27.         else  
  28.             throw new NoSuchElementException();  
  29.     }  
  30.   
  31.     /** 
  32.      * 獲取但不移除此隊列的頭。此方法與 peek 唯一的不同在於:此隊列為空時將拋出一個異常。 
  33.      * 除非隊列為空,否則此實現返回 peek 的結果。 
  34.      */  
  35.     public E element() {  
  36.         E x = peek();  
  37.         if (x != null)  
  38.             return x;  
  39.         else  
  40.             throw new NoSuchElementException();  
  41.     }  
  42.   
  43.     /** 
  44.      * 移除此隊列中的所有元素。此調用返回后,隊列將為空。  
  45.      * 此實現重復調用 poll,直到它返回 null 為止。  
  46.      */  
  47.     public void clear() {  
  48.         while (poll() != null)  
  49.             ;  
  50.     }  
  51.   
  52.     /** 
  53.      * 將指定 collection 中的所有元素都添加到此隊列中。 
  54.      * 如果試圖將某一隊列 addAll 到該隊列本身中,則會導致 IllegalArgumentException。 
  55.      * 此外,如果正在進行此操作時修改指定的 collection,則此操作的行為是不確定的。 
  56.      * 此實現在指定的 collection 上進行迭代,並依次將迭代器返回的每一個元素添加到此隊列中。 
  57.      * 在試圖添加某一元素(尤其是 null 元素)時如果遇到了運行時異常,則可能導致在拋出相關異常時只成功地添加了某些元素。 
  58.      */  
  59.     public boolean addAll(Collection<? extends E> c) {  
  60.         if (c == null)  
  61.             throw new NullPointerException();  
  62.         if (c == this)  
  63.             throw new IllegalArgumentException();  
  64.         boolean modified = false;  
  65.         Iterator<? extends E> e = c.iterator();  
  66.         while (e.hasNext()) {  
  67.             if (add(e.next()))  
  68.                 modified = true;  
  69.         }  
  70.         return modified;  
  71.     }  
  72. }  

 

        三、BlockingQueue

        阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用,從而產生阻塞。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的緩存容器,而消費者也只從容器里拿元素。

        BlockingQueue 的方法以四種形式出現,這四種形式的處理方式不同:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:

 

     拋出異常      返回特殊值       阻塞    超時
   插入    add(e) offer(e) put(e) offer(e, time, unit)
   移除    remove() poll() take() poll(time, unit)
   檢查    element() peek() - -

 

        • 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。 

        • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null 

        • 阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。 

        • 超時:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。 

        BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。 

        BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。 

        BlockingQueue 實現主要用於生產者-消費者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計划地偶爾使用,比如在取消排隊信息時。 

        題外話:所謂生產者消費者模式,這里簡單介紹一下。比如我們在餐廳吃飯,我們就是消費者,餐廳的廚師就是生產者,而餐廳的服務員就是一個緩沖環節。當生產者制作好菜品(生產產品),交由服務員(緩沖區),由服務員將菜品送至顧客(消費者)品用。


       

    生產者-消費者模式最重要的作用就是解耦,利用緩沖區將兩者分離。如果每一個廚師做完了菜都需要親自送到顧客桌上,那么這樣就是將廚師與顧客綁定到了一起。加入中間緩沖環節,也就是服務員,將送菜的任務交由服務員(緩沖區)去處理,這樣生產者與消費者就可以各自做自己的事情了。在此模式下兩者間支持並發操作,因為飯店的廚師肯定不止一個,顧客也是如此。再有就是支持兩者間不同步,因為兩者間的數量與效率是不同步的,這就會導致生產與消費的速度不同。

         BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。 

        BlockingQueue 實質上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。 

       注意,BlockingQueue 可以安全地與多個生產者和多個使用者一起使用。 

       以下是基於典型的生產者-消費者場景的一個用例:

Java代碼   收藏代碼
  1. class Producer implements Runnable {  
  2.     private final BlockingQueue queue;  
  3.     private int i;  
  4.   
  5.     Producer(BlockingQueue q) {  
  6.         queue = q;  
  7.     }  
  8.   
  9.     public void run() {  
  10.         try {  
  11.             while (true) {  
  12.                 queue.put(produce());// 將產品放入緩沖隊列  
  13.             }  
  14.         } catch (InterruptedException e) {  
  15.             e.printStackTrace();  
  16.         }  
  17.     }  
  18.   
  19.     int produce() {  
  20.         return i++;// 生產產品  
  21.     }  
  22. }  
  23.   
  24. class Consumer implements Runnable {  
  25.     private final BlockingQueue queue;  
  26.   
  27.     Consumer(BlockingQueue q) {  
  28.         queue = q;  
  29.     }  
  30.   
  31.     public void run() {  
  32.         try {  
  33.             while (true) {  
  34.                 consume(queue.take());// 從緩沖隊列取出產品  
  35.             }  
  36.         } catch (InterruptedException e) {  
  37.             e.printStackTrace();  
  38.         }  
  39.     }  
  40.   
  41.     void consume(Object x) {  
  42.         System.out.println("消費:"+x);// 消費產品  
  43.     }  
  44. }  
  45.   
  46. public class Runner {  
  47.     public static void main(String[] args) {  
  48.         BlockingQueue q = new LinkedBlockingQueue<Integer>(10);// 或其他實現  
  49.         Producer p = new Producer(q);  
  50.         Consumer c1 = new Consumer(q);  
  51.         Consumer c2 = new Consumer(q);  
  52.         new Thread(p).start();  
  53.         new Thread(c1).start();  
  54.         new Thread(c2).start();  
  55.     }  
  56. }  
  57. //結果:  
  58. ...  
  59. 消費:160607  
  60. 消費:160608  
  61. 消費:160609  
  62. 消費:160610  
  63. 消費:160611  
  64. ...  

        當生產者與消費者線程啟動后,首先生產者會不斷往隊列中添加產品,一旦隊列填滿則生產停止,然后消費者從隊列中取出產品使用,顯然過程中使用了類似於 wait與 notify的流程,后面會詳細分析。

        以下是 BlockingQueue的源代碼:

Java代碼   收藏代碼
  1. public interface BlockingQueue<E> extends Queue<E> {  
  2.     /** 
  3.      * 將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true, 
  4.      * 如果當前沒有可用的空間,則拋出 IllegalStateException。 
  5.      * 當使用有容量限制的隊列時,通常首選 offer 
  6.      */  
  7.     boolean add(E e);  
  8.   
  9.     /** 
  10.      * 將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true, 
  11.      * 如果當前沒有可用的空間,則返回 false。 
  12.      * 當使用有容量限制的隊列時,此方法通常要優於 add(E),后者可能無法插入元素,而只是拋出一個異常 
  13.      */  
  14.     boolean offer(E e);  
  15.   
  16.     /** 
  17.      * 將指定元素插入此隊列中,將等待可用的空間(即阻塞) 
  18.      */  
  19.     void put(E e) throws InterruptedException;  
  20.   
  21.     /** 
  22.      * 將指定元素插入此隊列中,在到達指定的等待時間前等待可用的空間 
  23.      */  
  24.     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;  
  25.   
  26.     /** 
  27.      * 獲取並移除此隊列的頭部,在元素變得可用之前一直等待 
  28.      */  
  29.     E take() throws InterruptedException;  
  30.   
  31.     /** 
  32.      * 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素 
  33.      */  
  34.     E poll(long timeout, TimeUnit unit) throws InterruptedException;  
  35.   
  36.     /** 
  37.      * 返回在無阻塞的理想情況下(不存在內存或資源約束)此隊列能接受的附加元素數量; 
  38.      * 如果沒有內部限制,則返回 Integer.MAX_VALUE 
  39.      */  
  40.     int remainingCapacity();  
  41.   
  42.     /** 
  43.      * 從此隊列中移除指定元素的單個實例(如果存在)。 
  44.      * 更確切地講,如果此隊列包含一個或多個滿足 o.equals(e) 的元素 e,則移除該元素。 
  45.      * 如果此隊列包含指定元素(或者此隊列由於調用而發生更改),則返回 true 
  46.      */  
  47.     boolean remove(Object o);  
  48.   
  49.     /** 
  50.      * 如果此隊列包含指定元素,則返回 true。更確切地講,當且僅當此隊列至少包含一個滿足 o.equals(e) 的元素 e時,返回 true 
  51.      */  
  52.     public boolean contains(Object o);  
  53.   
  54.     /** 
  55.      * 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。此操作可能比反復輪詢此隊列更有效。 
  56.      * 在試圖向 collection c 中添加元素沒有成功時,可能導致在拋出相關異常時, 
  57.      * 元素會同時在兩個 collection 中出現,或者在其中一個 collection中出現,也可能在兩個 collection 中都不出現。 
  58.      * 如果試圖將一個隊列放入自身隊列中,則會導致 IllegalArgumentException 異常。 
  59.      * 此外,如果正在進行此操作時修改指定的 collection,則此操作行為是不確定的。 
  60.      */  
  61.     int drainTo(Collection<? super E> c);  
  62.   
  63.     /** 
  64.      * 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。 
  65.      * 在試圖向 collection c中添加元素沒有成功時,可能導致在拋出相關異常時, 
  66.      * 元素會同時在兩個 collection 中出現,或者在其中一個 collection中出現,也可能在兩個 collection 中都不出現。 
  67.      * 如果試圖將一個隊列放入自身隊列中,則會導致 IllegalArgumentException 異常。 
  68.      * 此外,如果正在進行此操作時修改指定的 collection,則此操作行為是不確定的。 
  69.      */  
  70.     int drainTo(Collection<? super E> c, int maxElements);  
  71. }  

        后續幾篇介紹阻塞隊列的相關實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM