【JUC】JDK1.8源碼分析之ArrayBlockingQueue(三)


一、前言

  在完成Map下的並發集合后,現在來分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一個阻塞型隊列,支持多任務並發操作,有了之前看源碼的積累,再看ArrayBlockingQueue源碼會很容易,下面開始正文。

二、ArrayBlockingQueue數據結構

  通過源碼分析,並且可以對比ArrayList可知,ArrayBlockingQueue的底層數據結構是數組,數據結構如下

  說明:ArrayBlockingQueue底層采用數據才存放數據,對數組的訪問添加了鎖的機制,使其能夠支持多線程並發。

三、ArrayBlockingQueue源碼分析

  3.1 類的繼承關系  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {}

  說明:可以看到ArrayBlockingQueue繼承了AbstractQueue抽象類,AbstractQueue定義了對隊列的基本操作;同時實現了BlockingQueue接口,BlockingQueue表示阻塞型的隊列,其對隊列的操作可能會拋出異常;同時也實現了Searializable接口,表示可以被序列化。

  3.2 類的屬性  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 版本序列號
    private static final long serialVersionUID = -817911632652898426L;
    // 存放實際元素的數組
    final Object[] items;
    // 取元素索引
    int takeIndex;
    // 獲取元素索引
    int putIndex;
    // 隊列中的項
    int count;
    // 可重入鎖
    final ReentrantLock lock;
    // 等待獲取條件
    private final Condition notEmpty;
    // 等待存放條件
    private final Condition notFull;
    // 迭代器
    transient Itrs itrs = null;
}
View Code

  說明:從類的屬性中可以清楚的看到其底層的結構是Object類型的數組,取元素和存元素有不同的索引,有一個可重入鎖ReentrantLock,兩個條件Condition。對ReentrantLock和Condition不太熟悉的讀者可以參考筆者的這篇博客,【JUC】JDK1.8源碼分析之ReentrantLock(三)

  3.3 類的構造函數

  1. ArrayBlockingQueue(int)型構造函數 

    public ArrayBlockingQueue(int capacity) {
        // 調用兩個參數的構造函數
        this(capacity, false);
    }
View Code

  說明:該構造函數用於創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。

  2. ArrayBlockingQueue(int, boolean)型構造函數  

    public ArrayBlockingQueue(int capacity, boolean fair) {
        // 初始容量必須大於0
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化數組
        this.items = new Object[capacity];
        // 初始化可重入鎖
        lock = new ReentrantLock(fair);
        // 初始化等待條件
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
View Code

  說明:該構造函數用於創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。

  3. ArrayBlockingQueue(int, boolean, Collection<? extends E>)型構造函數 

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        // 調用兩個參數的構造函數
        this(capacity, fair);
        // 可重入鎖
        final ReentrantLock lock = this.lock;
        // 上鎖
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) { // 遍歷集合
                    // 檢查元素是否為空
                    checkNotNull(e);
                    // 存入ArrayBlockingQueue中
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) { // 當初始化容量小於傳入集合的大小時,會拋出異常
                throw new IllegalArgumentException();
            }
            // 元素數量
            count = i;
            // 初始化存元素的索引
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:該構造函數用於創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。

  3.4 核心函數分析

  1. put函數  

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        // 獲取可重入鎖
        final ReentrantLock lock = this.lock;
        // 如果當前線程未被中斷,則獲取鎖
        lock.lockInterruptibly();
        try {
            while (count == items.length) // 判斷元素是否已滿
                // 若滿,則等待
                notFull.await();
            // 入隊列
            enqueue(e);
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:put函數用於存放元素,在當前線程被中斷時會拋出異常,並且當隊列已經滿時,會阻塞一直等待。其中,put會調用enqueue函數,enqueue函數源碼如下  

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 獲取數組
        final Object[] items = this.items;
        // 將元素放入
        items[putIndex] = x;
        if (++putIndex == items.length) // 放入后存元素的索引等於數組長度(表示已滿)
            // 重置存索引為0
            putIndex = 0;
        // 元素數量加1
        count++;
        // 喚醒在notEmpty條件上等待的線程
        notEmpty.signal();
    }
View Code

  說明:enqueue函數用於將元素存入底層Object數組中,並且會喚醒等待notEmpty條件的線程。

  2. offer函數  

    public boolean offer(E e) {
        // 檢查元素不能為空
        checkNotNull(e);
        // 可重入鎖
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lock();
        try {
            if (count == items.length) // 元素個數等於數組長度,則返回
                return false; 
            else { // 添加進數組
                enqueue(e);
                return true;
            }
        } finally {
            // 釋放數組
            lock.unlock();
        }
    }
View Code

  說明:offer函數也用於存放元素,在調用ArrayBlockingQueue的add方法時,會間接的調用到offer函數,offer函數添加元素不會拋出異常,當底層Object數組已滿時,則返回false,否則,會調用enqueue函數,將元素存入底層Object數組。並喚醒等待notEmpty條件的線程。

  3. take函數  

    public E take() throws InterruptedException {
        // 可重入鎖
        final ReentrantLock lock = this.lock;
        // 如果當前線程未被中斷,則獲取鎖,中斷會拋出異常
        lock.lockInterruptibly();
        try {
            while (count == 0) // 元素數量為0,即Object數組為空
                // 則等待notEmpty條件
                notEmpty.await();
            // 出隊列
            return dequeue();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:take函數用於從ArrayBlockingQueue中獲取一個元素,其與put函數相對應,在當前線程被中斷時會拋出異常,並且當隊列為空時,會阻塞一直等待。其中,take會調用dequeue函數,dequeue函數源碼如下  

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 取元素
        E x = (E) items[takeIndex];
        // 該索引的值賦值為null
        items[takeIndex] = null;
        // 取值索引等於數組長度
        if (++takeIndex == items.length)
            // 重新賦值取值索引
            takeIndex = 0;
        // 元素個數減1
        count--;
        if (itrs != null) 
            itrs.elementDequeued();
        // 喚醒在notFull條件上等待的線程
        notFull.signal();
        return x;
    }
View Code

  說明:dequeue函數用於將取元素,並且會喚醒等待notFull條件的線程。

  4. poll函數  

    public E poll() {
        // 重入鎖
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lock();
        try {
            // 若元素個數為0則返回null,否則,調用dequeue,出隊列
            return (count == 0) ? null : dequeue();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:poll函數用於獲取元素,其與offer函數相對應,不會拋出異常,當元素個數為0是,返回null,否則,調用dequeue函數,並喚醒等待notFull條件的線程。並返回。

  5. clear函數  

    public void clear() {
        // 數組
        final Object[] items = this.items;
        // 可重入鎖
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lock();
        try {
            // 保存元素個數
            int k = count;
            if (k > 0) { // 元素個數大於0
                // 存數元素索引
                final int putIndex = this.putIndex;
                // 取元素索引
                int i = takeIndex;
                do {
                    // 賦值為null
                    items[i] = null;
                    if (++i == items.length) // 重新賦值i
                        i = 0;
                } while (i != putIndex);
                // 重新賦值取元素索引
                takeIndex = putIndex;
                // 元素個數為0
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull條件的線程,則逐一喚醒
                    notFull.signal();
            }
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:clear函數用於清空ArrayBlockingQueue,並且會釋放所有等待notFull條件的線程(存放元素的線程)。

四、示例

  下面給出一個具體的示例來演示ArrayBlockingQueue的使用  

package com.hust.grid.leesf.collections;

import java.util.concurrent.ArrayBlockingQueue;

class PutThread extends Thread {
    private ArrayBlockingQueue<Integer> abq;
    public PutThread(ArrayBlockingQueue<Integer> abq) {
        this.abq = abq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("put " + i);
                abq.put(i);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class GetThread extends Thread {
    private ArrayBlockingQueue<Integer> abq;
    public GetThread(ArrayBlockingQueue<Integer> abq) {
        this.abq = abq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("take " + abq.take());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10);
        PutThread p1 = new PutThread(abq);
        GetThread g1 = new GetThread(abq);
        
        p1.start();
        g1.start();
    }
}
View Code

  運行結果:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9
View Code

  說明:示例中使用了兩個線程,一個用於存元素,一個用於讀元素,存和讀各10次,每個線程存一個元素或者讀一個元素后都會休眠100ms,可以看到結果是交替打印,並且首先打印的肯定是put線程語句(因為若取線程先取元素,此時隊列並沒有元素,其會阻塞,等待存線程存入元素),並且最終程序可以正常結束。

  ① 若修改取元素線程,將存的元素的次數修改為15次(for循環的結束條件改為15即可),運行結果如下:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9
View Code

  說明:運行結果與上面的運行結果相同,但是,此時程序無法正常結束,因為take方法被阻塞了,等待被喚醒。

五、總結

  總的來說,有了前面分析的基礎,分析ArrayBlockingQueue就會非常的簡單,ArrayBlockingQueue是通過ReentrantLock和Condition條件來保證多線程的正確訪問的。ArrayBockingQueue的分析就到這里,歡迎交流,謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM