Java並發編程——阻塞隊列BlockingQueue


Java 並發編程系列文章

Java 並發基礎——線程安全性

Java 並發編程——Callable+Future+FutureTask

java 並發編程——Thread 源碼重新學習

java並發編程——通過ReentrantLock,Condition實現銀行存取款

Java並發編程——BlockingQueue

Java 並發編程——Executor框架和線程池原理


 

簡介

BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效並且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。

 

        阻塞隊列是一個隊列,而且是一個先進先出的隊列(FIFO)。

       多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把准備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題

          但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)

如上圖所示:當隊列中填滿數據的情況下,生產者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。
     這也是我們在多線程環境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。

BlockingQueue接口介紹

放入數據:
  offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
    則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
  offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
    加入BlockingQueue,則返回失敗。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
    直到BlockingQueue里面有空間再繼續。
獲取數據:
  poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
    取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
    隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
BlockingQueue有新的數據被加入;
  drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
    通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。

这里写图片描述

運用實踐

public class Producer implements Runnable{
    private volatile boolean  isRunning  = true;
    private BlockingQueue queue;
    private static AtomicInteger count   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    private String produceName;

    public Producer(BlockingQueue queue,String name) {
        this.queue = queue;
        this.produceName = name;
    }

    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println(produceName+"    啟動生產者線程!");
        try {
            while (isRunning) {
                System.out.println(produceName+"    正在生產數據...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                data = "data:" + count.incrementAndGet();
                System.out.println(produceName+"    將數據:" + data + "放入隊列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println(produceName+"    放入數據失敗:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(produceName+"    退出生產者線程!");
        }
    }

    public void stop() {
        isRunning = false;
    }
}

Producer

 

public class Consumer implements Runnable {

    private BlockingQueue<String> queue;
    private static final int  DEFAULT_RANGE_FOR_SLEEP = 1000;
    private String cusumerName;

    public Consumer(BlockingQueue<String> queue,String name) {
        this.queue = queue;
        this.cusumerName = name;
    }

    public void run() {
        System.out.println(cusumerName+"  啟動消費者線程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println(cusumerName+"    正從隊列獲取數據...");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println(cusumerName+"    拿到數據:" + data);
                    System.out.println(cusumerName+"    正在消費數據:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超過2s還沒數據,認為所有生產線程都已經退出,自動退出消費線程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(cusumerName+"    退出消費者線程!");
        }
    }
}

Customer

 

public class BlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        Producer producer1 = new Producer(queue,"P1");
        Producer producer2 = new Producer(queue,"P2");
        Producer producer3 = new Producer(queue,"P2");
        Consumer consumer = new Consumer(queue,"C");

        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 啟動線程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // 執行10s
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer1.stop();
        producer2.stop();
        producer3.stop();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 退出Executor
        service.shutdown();
    }
}

同時生產與消費

上述代碼的示意圖如下,開辟了三個線作為生產者,一個線程作為消費者。生產者負責往隊列中添加數據,消費者負責從隊列中消費數據(當隊列中沒有數據時則處於阻塞狀態)

-23_thumb

C  啟動消費者線程!
C    正從隊列獲取數據...
P1    啟動生產者線程!
P1    正在生產數據...
P2    啟動生產者線程!
P2    正在生產數據...
P2    啟動生產者線程!
P2    正在生產數據...
P2    將數據:data:1放入隊列...
P2  放入數據成功:data:1
P2    正在生產數據...
C    拿到數據:data:1
C    正在消費數據:data:1
C    正從隊列獲取數據...
P2    將數據:data:2放入隊列...
P2  放入數據成功:data:2
P2    正在生產數據...
C    拿到數據:data:2
C    正在消費數據:data:2
P1    將數據:data:3放入隊列...
P1  放入數據成功:data:3
P1    正在生產數據...
P2    將數據:data:4放入隊列...
P2  放入數據成功:data:4
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:3
C    正在消費數據:data:3
P2    將數據:data:5放入隊列...
P2  放入數據成功:data:5
P2    正在生產數據...
P1    將數據:data:6放入隊列...
P1  放入數據成功:data:6
P1    正在生產數據...
P2    將數據:data:7放入隊列...
P2  放入數據成功:data:7
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:4
C    正在消費數據:data:4
P2    將數據:data:8放入隊列...
P2  放入數據成功:data:8
P2    正在生產數據...
P1    將數據:data:9放入隊列...
P1  放入數據成功:data:9
P1    正在生產數據...
P2    將數據:data:10放入隊列...
P2  放入數據成功:data:10
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:5
C    正在消費數據:data:5
P1    將數據:data:11放入隊列...
P1  放入數據成功:data:11
P1    正在生產數據...
P2    將數據:data:12放入隊列...
P2  放入數據成功:data:12
P2    正在生產數據...
P2    將數據:data:13放入隊列...
P2  放入數據成功:data:13
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:6
C    正在消費數據:data:6
P2    將數據:data:14放入隊列...
P2  放入數據成功:data:14
P2    正在生產數據...
P2    將數據:data:15放入隊列...
P2  放入數據成功:data:15
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:7
C    正在消費數據:data:7
P1    將數據:data:16放入隊列...
P1  放入數據成功:data:16
P1    正在生產數據...
P2    將數據:data:17放入隊列...
P2  放入數據成功:data:17
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:8
C    正在消費數據:data:8
P2    將數據:data:18放入隊列...
P2  放入數據成功:data:18
P2    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:9
C    正在消費數據:data:9
P2    將數據:data:19放入隊列...
P2  放入數據成功:data:19
P2    正在生產數據...
P1    將數據:data:20放入隊列...
P2    將數據:data:21放入隊列...
P2    將數據:data:22放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:10
P1  放入數據成功:data:20
P1    正在生產數據...
C    正在消費數據:data:10
P1    將數據:data:23放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:11
C    正在消費數據:data:11
P2  放入數據成功:data:21
P2    正在生產數據...
P2    將數據:data:24放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:12
C    正在消費數據:data:12
P2  放入數據成功:data:22
P2    正在生產數據...
P2    將數據:data:25放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:13
C    正在消費數據:data:13
P1  放入數據成功:data:23
P1    正在生產數據...
C    正從隊列獲取數據...
C    拿到數據:data:14
P2  放入數據成功:data:24
P2    正在生產數據...
C    正在消費數據:data:14
P2    將數據:data:26放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:15
C    正在消費數據:data:15
P2  放入數據成功:data:25
P2    正在生產數據...
P1    將數據:data:27放入隊列...
C    正從隊列獲取數據...
P2  放入數據成功:data:26
P2    正在生產數據...
C    拿到數據:data:16
C    正在消費數據:data:16
C    正從隊列獲取數據...
C    拿到數據:data:17
C    正在消費數據:data:17
P1  放入數據成功:data:27
P1    正在生產數據...
P2    將數據:data:28放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:18
C    正在消費數據:data:18
P2  放入數據成功:data:28
P2    正在生產數據...
P2    將數據:data:29放入隊列...
P1    將數據:data:30放入隊列...
P2    將數據:data:31放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:19
C    正在消費數據:data:19
P2  放入數據成功:data:29
P2    正在生產數據...
P2    將數據:data:32放入隊列...
C    正從隊列獲取數據...
C    拿到數據:data:20
C    正在消費數據:data:20
P1  放入數據成功:data:30
P1    退出生產者線程!
C    正從隊列獲取數據...
C    拿到數據:data:21
C    正在消費數據:data:21
P2  放入數據成功:data:31
P2    退出生產者線程!
C    正從隊列獲取數據...
C    拿到數據:data:22
C    正在消費數據:data:22
P2  放入數據成功:data:32
P2    退出生產者線程!
C    正從隊列獲取數據...
C    拿到數據:data:23
C    正在消費數據:data:23
C    正從隊列獲取數據...
C    拿到數據:data:24
C    正在消費數據:data:24
C    正從隊列獲取數據...
C    拿到數據:data:25
C    正在消費數據:data:25
C    正從隊列獲取數據...
C    拿到數據:data:26
C    正在消費數據:data:26
C    正從隊列獲取數據...
C    拿到數據:data:27
C    正在消費數據:data:27
C    正從隊列獲取數據...
C    拿到數據:data:28
C    正在消費數據:data:28
C    正從隊列獲取數據...
C    拿到數據:data:29
C    正在消費數據:data:29
C    正從隊列獲取數據...
C    拿到數據:data:30
C    正在消費數據:data:30
C    正從隊列獲取數據...
C    拿到數據:data:31
C    正在消費數據:data:31
C    正從隊列獲取數據...
C    拿到數據:data:32
C    正在消費數據:data:32
C    正從隊列獲取數據...
C    退出消費者線程!
執行結果

 

     從上面的運用實踐中很容易理解阻塞隊列的好處,讓設計的隔離度更好,生產者只負責生產數據消費者只負責消費數據,而不用關心隊列中具體有多少數據,如果滿和空的特殊處理也不用關心。

      可以想象一下如果沒有阻塞隊列,自己定義一個數組存放元素,生產者和消費者需要做很多額外的控制工作,並對邊界條件做特殊處理。最重要的一點是生產者和消費者還要保證多線程操作數組數據的安全性同時兼顧效率,這應該是件很頭疼的事。

      這里可能有個疑惑, 3個Producer產生數據,當隊列已經滿時,其它Producer如何再往隊列里面生產數據?

      可以看到Producer中的代碼,通過 offer(data, 2, TimeUnit.SECONDS) 往隊列中添加數據,此時如果隊列已滿則阻塞等待直到Customer從隊列中取走一個數據,然后再將數據放入,這里等待的時間不等。隊列滿時,offer()函數從開始執行到結束可能需要經歷0~2000ms。從執行結果看,所有數據都成功的加入了隊列沒有出現超時的現象。

 

ArrayBlockingQueue源碼分析

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

可以看出ArrayBlockingQueue不光實現了BlockingQueue接口還繼承了抽象類AbstractQueue,說明可以對進行隊列的操作(可以參考java容器類4:Queue深入解讀)。建議先了解可重入鎖和條件變量的概念:

java並發編程——通過ReentrantLock,Condition實現銀行存取款

下面看一下里面的主要成員變量

 /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    // 主要解決多線程訪問的線程安全性問題
    final ReentrantLock lock;

    /** Condition for waiting takes */
// 添加元素時,通過notEmpty 喚醒消費線程(在等待該條件)
    private final Condition notEmpty;

    /** Condition for waiting puts */
  // 刪除元素時,通過 notFull 喚醒生成線程(在等待該條件)
    private final Condition notFull;

通過一個數組存放隊列元素,並且通過維護一個插入元素(putIndex)和移除元素(takeIndex)的位置來控制元素的添加和刪除。

看一下里面比較復雜的函數,大概能了解ArrayBlockingQueue的具體工作原理了:

 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        Objects.requireNonNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0L)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

該函數是往阻塞隊列中添加元素,如果超過設置的時間還沒有添加成功(可能隊列已滿,且沒有其它線程從中移除元素)則返回false。源碼中可以看出,當執行添加時,首先獲取阻塞隊列的鎖,如果隊列未滿則直接添加元素返回true即可。

當隊列已滿,則調用 notFull(Condition類型)的awaitNanos()方法,該方法或釋放可重入鎖,並且讓線程進入等待狀態,知道有其它線程將該線程喚醒。enqueue的源碼中會調用 notEmpty.signal()方法喚醒阻塞的移除元素的線程。同理,當某個線程調用take()/remove()/poll()時會調用 notFull.signal()喚醒一個被阻塞的添加元素的線程。

 

LinkedBlockingQueue

構造函數

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

成員變量

 private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

可以發現LinkedBlockingQueue和ArrayBlockingQueue成員變量還是有差別的

1.它內部是通過鏈表存儲的,而ArrayBlockingQueue是通過數組存儲的

2. 它設置了兩個可重入鎖,一個控制存,一個控制取。 (感覺這樣並發性更好)

3. 它的計數器count采用: AtomicInteger ,而ArrayBlockingQueue采用的int。 可能原因: 在LinkedBlockingQueue中兩端都可以同時進行存取操作(因為不是同一個鎖,這時可能需要同時改變計數器的值,所以要保證線程安全,所有用了AtomicInteger ),而在ArrayBlockingQueue中不可能存在多個線程操作count值的情況,所以直接使用了int。

-25_thumb

上圖中畫出了LinkedBlockingQueue的工作機制,通過takeLock,putLock兩把鎖分別控制取數據和存數據,兩者可以同時進。 下面可以看一下取數據的源碼,其實很簡單:

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
// 獲取取數據的鎖
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
//如果沒有數據,則進入掛起狀態,直到“存操作”喚醒該掛起裝填
            while (count.get() == 0) {
                notEmpty.await();
            }
  
// 將數據彈出隊列,並將計數器減一
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)    
// 如果有掛起的存線程,則將其喚醒
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
使用場景:
  DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

PriorityBlockingQueue

         基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。

SynchronousQueue

一種無緩沖的等待隊列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對於有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。
  聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。公平模式和非公平模式的區別:
  如果采用公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
  但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。

小結

         BlockingQueue不光實現了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待與喚醒功能,從而使得程序員可以忽略這些細節,關注更高級的功能。

參考:

https://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html

https://blog.csdn.net/xin_jmail/article/details/26157971


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM