Java多線程15:Queue、BlockingQueue以及利用BlockingQueue實現生產者/消費者模型


Queue是什么

隊列,是一種數據結構。除了優先級隊列和LIFO隊列外,隊列都是以FIFO(先進先出)的方式對各個元素進行排序的。無論使用哪種排序方式,隊列的頭都是調用remove()或poll()移除元素的。在FIFO隊列中,所有新元素都插入隊列的末尾。

 

Queue中的方法

Queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下JDK API就知道了:

注意一點就好,Queue通常不允許插入Null,盡管某些實現(比如LinkedList)是允許的,但是也不建議。

 

BlockingQueue

1、BlockingQueue概述

只講BlockingQueue,因為BlockingQueue是Queue中的一個重點,並且通過BlockingQueue我們再次加深對於生產者/消費者模型的理解。其他的Queue都不難,通過查看JDK API和簡單閱讀源碼完全可以理解他們的作用。

BlockingQueue,顧名思義,阻塞隊列。BlockingQueue是在java.util.concurrent下的,因此不難理解,BlockingQueue是為了解決多線程中數據高效安全傳輸而提出的。

多線程中,很多場景都可以使用隊列實現,比如經典的生產者/消費者模型,通過隊列可以便利地實現兩者之間數據的共享,定義一個生產者線程,定義一個消費者線程,通過隊列共享數據就可以了。

當然現實不可能都是理想的,比如消費者消費速度比生產者生產的速度要快,那么消費者消費到 一定程度上的時候,必須要暫停等待一下了(使消費者線程處於WAITING狀態)。BlockingQueue的提出,就是為了解決這個問題的,他不用程序員去控制這些細節,同時還要兼顧效率和線程安全。

阻塞隊列所謂的"阻塞",指的是某些情況下線程會掛起(即阻塞),一旦條件滿足,被掛起的線程又會自動喚醒。使用BlockingQueue,不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,這些內容BlockingQueue都已經做好了

2、BlockingQueue中的方法

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已經列了。看一下BlockingQueue中特有的方法:

(1)void put(E e) throws InterruptedException

把e添加進BlockingQueue中,如果BlockingQueue中沒有空間,則調用線程被阻塞,進入等待狀態,直到BlockingQueue中有空間再繼續

(2)void take() throws InterruptedException

取走BlockingQueue里面排在首位的對象,如果BlockingQueue為空,則調用線程被阻塞,進入等待狀態,直到BlockingQueue有新的數據被加入

(3)int drainTo(Collection<? super E> c, int maxElements)

一次性取走BlockingQueue中的數據到c中,可以指定取的個數。通過該方法可以提升獲取數據效率,不需要多次分批加鎖或釋放鎖

3、ArrayBlockingQueue

基於數組的阻塞隊列,必須指定隊列大小。比較簡單。ArrayBlockingQueue中只有一個ReentrantLock對象,這意味着生產者和消費者無法並行運行(見下面的代碼)。另外,創建ArrayBlockingQueue時,可以指定ReentrantLock是否為公平鎖,默認采用非公平鎖。

/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

4、LinkedBlockingQueue

基於鏈表的阻塞隊列,和ArrayBlockingQueue差不多。不過LinkedBlockingQueue如果不指定隊列容量大小,會默認一個類似無限大小的容量,之所以說是類似是因為這個無限大小是Integer.MAX_VALUE,這么說就好理解ArrayBlockingQueue為什么必須要制定大小了,如果ArrayBlockingQueue不指定大小的話就用Integer.MAX_VALUE,那將造成大量的空間浪費,但是基於鏈表實現就不一樣的,一個一個節點連起來而已。另外,LinkedBlockingQueue生產者和消費者都有自己的鎖(見下面的代碼),這意味着生產者和消費者可以"同時"運行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

5、SynchronousQueue

比較特殊,一種沒有緩沖的等待隊列。什么叫做沒有緩沖區,ArrayBlocking中有:

/** The queued items  */
private final E[] items;

數組用以存儲隊列。LinkedBlockingQueue中有:

/**
 * Linked list node class
 */
static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
    volatile E item;
    Node<E> next;
    Node(E x) { item = x; }
}

將隊列以鏈表形式連接。

生產者/消費者操作數據實際上都是通過這兩個"中介"來操作數據的,但是SynchronousQueue則是生產者直接把數據給消費者(消費者直接從生產者這里拿數據),好像又回到了沒有生產者/消費者模型的老辦法了。換句話說,每一個插入操作必須等待一個線程對應的移除操作。SynchronousQueue又有兩種模式:

1、公平模式

采用公平鎖,並配合一個FIFO隊列(Queue)來管理多余的生產者和消費者

2、非公平模式

采用非公平鎖,並配合一個LIFO棧(Stack)來管理多余的生產者和消費者,這也是SynchronousQueue默認的模式

 

利用BlockingQueue實現生產者消費者模型

上一篇我們寫的生產者消費者模型有局限,局限體現在:

  • 緩沖區內只能存放一個數據,實際生產者/消費者模型中的緩沖區內可以存放大量生產者生產出來的數據
  • 生產者和消費者處理數據的速度幾乎一樣

OK,我們就用BlockingQueue來簡單寫一個例子,並且讓生產者、消費者處理數據速度不同。子類選擇的是ArrayBlockingQueue,大小定為10:

public static void main(String[] args)
{
    final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
    Runnable producerRunnable = new Runnable()
    {
        int i = 0;
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我生產了一個" + i++);
                    bq.put(i + "");
                    Thread.sleep(1000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Runnable customerRunnable = new Runnable()
    {
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我消費了一個" + bq.take());
                    Thread.sleep(3000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread producerThread = new Thread(producerRunnable);
    Thread customerThread = new Thread(customerRunnable);
    producerThread.start();
    customerThread.start();
}

代碼的做法是讓生產者生產速度快於消費者消費速度的,看一下運行結果:

 1 我生產了一個0
 2 我消費了一個1
 3 我生產了一個1
 4 我生產了一個2
 5 我消費了一個2
 6 我生產了一個3
 7 我生產了一個4
 8 我生產了一個5
 9 我消費了一個3
10 我生產了一個6
11 我生產了一個7
12 我生產了一個8
13 我消費了一個4
14 我生產了一個9
15 我生產了一個10
16 我生產了一個11
17 我消費了一個5
18 我生產了一個12
19 我生產了一個13
20 我生產了一個14
21 我消費了一個6
22 我生產了一個15
23 我生產了一個16
24 我消費了一個7
25 我生產了一個17
26 我消費了一個8
27 我生產了一個18

分兩部分來看輸出結果:

1、第1行~第23行。這塊BlockingQueue未滿,所以生產者隨便生產,消費者隨便消費,基本上都是生產3個消費1個,消費者消費速度慢

2、第24行~第27行,從前面我們可以看出,生產到16,消費到6,說明到了ArrayBlockingQueue的極限10了,這時候沒辦法,生產者生產一個ArrayBlockingQueue就滿了,所以不能繼續生產了,只有等到消費者消費完才可以繼續生產。所以之后的打印內容一定是一個生產者、一個消費者

這就是前面一章開頭說的"通過平衡生產者和消費者的處理能力來提高整體處理數據的速度",這給例子應該體現得很明顯。另外,也不要擔心非單一生產者/消費者場景下的系統假死問題,緩沖區空、緩沖區滿的場景BlockingQueue都是定義了不同的Condition,所以不會喚醒自己的同類。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM