線程池阻塞隊列之ArrayBlockingQueue


ArrayBlockingQueue介紹

ArrayBlockingQueue是數組實現線程安全有界的阻塞隊列。

線程安全是指,ArrayBlockingQueue內部通過“互斥鎖”保護競爭資源,實現了多線程對競爭資源的互斥訪問。

有界是指,ArrayBlockingQueue對應的數組是有界限的。

阻塞隊列是指,多線程訪問競爭資源時,當競爭資源已被某線程獲取時,其它要獲取該資源的線程需要阻塞等待。

ArrayBlockingQueue是按 FIFO原則對元素進行排序,元素都是從尾部插入到隊列,從頭部開始返回。

ArrayBlockingQueue原理和數據結構

  1. ArrayBlockingQueue繼承於AbstractQueue,並且它實現了BlockingQueue接口。

  2. ArrayBlockingQueue內部是通過Object[]數組保存數據的,也就是說ArrayBlockingQueue本質上是通過數組實現的。ArrayBlockingQueue的大小即數組的容量,是創建ArrayBlockingQueue時指定的。

  3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關於具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue默認會使用非公平鎖。

  4. ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象(notEmpty和notFull)。而且,Condition又依賴於ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問

 

若某線程(線程A)要取數據時,數組正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向數組中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。

若某線程(線程H)要插入數據時,數組已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。

ArrayBlockingQueue函數列表

// 創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
​
// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則拋出 IllegalStateException。
boolean add(E e)
// 自動移除此隊列中的所有元素。
void clear()
// 如果此隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在此隊列中的元素上按適當順序進行迭代的迭代器。
Iterator<E> iterator()
// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false。
boolean offer(E e)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。
E peek()
// 獲取並移除此隊列的頭,如果此隊列為空,則返回 null。
E poll()
// 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則等待可用的空間。
void put(E e)
// 返回在無阻塞的理想情況下(不存在內存或資源約束)此隊列能接受的其他元素數量。
int remainingCapacity()
// 從此隊列中移除指定元素的單個實例(如果存在)。
boolean remove(Object o)
// 返回此隊列中元素的數量。
int size()
// 獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。
E take()
// 返回一個按適當順序包含此隊列中所有元素的數組。
Object[] toArray()
// 返回一個按適當順序包含此隊列中所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

 

ArrayBlockingQueue源碼分析

下面從ArrayBlockingQueue的創建,添加,取出,遍歷這幾個方面對ArrayBlockingQueue進行分析。

1. 創建

下面以ArrayBlockingQueue(int capacity, boolean fair)來進行說明。

說明:

(01) items是保存“阻塞隊列”數據的數組。它的定義如下:

(02) fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。 notEmpty和notFull是鎖的兩個Condition條件。它們的定義如下:

Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細的對鎖進行控制,它依賴於Lock,通過某個條件對多線程進行控制。 notEmpty表示“鎖的非空條件”。當某線程想從隊列中取數據時,而此時又沒有數據,則該線程通過notEmpty.await()進行等待;當其它線程向隊列中插入了元素之后,就調用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態的線程”。 同理,notFull表示“鎖的滿條件”。當某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當其它線程從隊列中取出元素之后,就喚醒該等待的線程。

2. 添加

下面以offer(E e)為例,對ArrayBlockingQueue的添加方法進行說明。

說明:offer(E e)的作用是將e插入阻塞隊列的尾部。如果隊列已滿,則返回false,表示插入失敗;否則,插入元素,並返回true。

(01) count表示”隊列中的元素個數“。除此之外,隊列中還有另外兩個遍歷takeIndex和putIndex。takeIndex表示下一個被取出元素的索引,putIndex表示下一個被添加元素的索引。它們的定義如下:

(02) insert()的源碼如下:

insert()在插入元素之后,會喚醒notEmpty上面的等待線程。inc()的源碼如下:

若i+1的值等於“隊列的長度”,即添加元素之后,隊列滿;則設置“下一個被添加元素的索引”為0。

3. 取出

下面以take()為例,對ArrayBlockingQueue的取出方法進行說明。

說明:take()的作用是取出並返回隊列的頭。若隊列為空,則一直等待。 extract()的源碼如下:

說明:extract()在刪除元素之后,會喚醒notFull上的等待線程。

4. 遍歷

下面對ArrayBlockingQueue的遍歷方法進行說明。

Itr是實現了Iterator接口的類,它的源碼如下:

private class Itr implements Iterator<E> {
    // 隊列中剩余元素的個數
    private int remaining; // Number of elements yet to be returned
    // 下一次調用next()返回的元素的索引
    private int nextIndex; // Index of element to be returned by next
    // 下一次調用next()返回的元素
    private E nextItem;    // Element to be returned by next call to next
    // 上一次調用next()返回的元素
    private E lastItem;    // Element returned by last call to next
    // 上一次調用next()返回的元素的索引
    private int lastRet;   // Index of last element returned, or -1 if none
​
    Itr() {
        // 獲取“阻塞隊列”的鎖
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            lastRet = -1;
            if ((remaining = count) > 0)
                nextItem = itemAt(nextIndex = takeIndex);
        } finally {
            // 釋放“鎖”
            lock.unlock();
        }
    }
​
    public boolean hasNext() {
        return remaining > 0;
    }
​
    public E next() {
        // 獲取“阻塞隊列”的鎖
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            // 若“剩余元素<=0”,則拋出異常。
            if (remaining <= 0)
                throw new NoSuchElementException();
            lastRet = nextIndex;
            // 獲取第nextIndex位置的元素
            E x = itemAt(nextIndex);  // check for fresher value
            if (x == null) {
                x = nextItem;         // we are forced to report old value
                lastItem = null;      // but ensure remove fails
            }
            else
                lastItem = x;
            while (--remaining > 0 && // skip over nulls
                   (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
                ;
            return x;
        } finally {
            lock.unlock();
        }
    }
​
    public void remove() {
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            int i = lastRet;
            if (i == -1)
                throw new IllegalStateException();
            lastRet = -1;
            E x = lastItem;
            lastItem = null;
            // only remove if item still at index
            if (x != null && x == items[i]) {
                boolean removingHead = (i == takeIndex);
                removeAt(i);
                if (!removingHead)
                    nextIndex = dec(nextIndex);
            }
        } finally {
            lock.unlock();
        }
    }
}

 

ArrayBlockingQueue示例

import java.util.*;
import java.util.concurrent.*;
/*
 *   ArrayBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 *
 *   下面是“多個線程同時操作並且遍歷queue”的示例
 *   (01) 當queue是ArrayBlockingQueue對象時,程序能正常運行。
 *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
 */
public class ArrayBlockingQueueDemo1{
    // TODO: queue是LinkedList對象時,程序會出錯。
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new ArrayBlockingQueue<String>(20);
    public static void main(String[] args) {
    
        // 同時啟動兩個線程對queue進行操作!
        new MyThread("ta").start();
        new MyThread("tb").start();
    }
​
    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while(iter.hasNext()) {
            value = (String)iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }
​
    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
                int i = 0;
            while (i++ < 6) {
                // “線程名” + "-" + "序號"
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // 通過“Iterator”遍歷queue。
                printAll();
            }
        }
    }
}

 

其中一次運行結果:

ta1, ta1, 
tb1, ta1,
tb1, ta1, ta2,
tb1, ta1, ta2, tb1, tb2,
ta2, ta1, tb2, tb1, ta3,
ta2, ta1, tb2, tb1, ta3, ta2, tb3,
tb2, ta1, ta3, tb1, tb3, ta2, ta4,
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4,
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5,
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5,
tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, tb5, ta3, ta6,
tb3, ta4, tb4, ta5, tb5, ta6, tb6,

結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。

LinkedBlockingQueue VS ArrayBlockingQueue 

1. LinkedBlockingQueue 由於擁有兩把鎖,它的操作粒度更細,在並發程度高的時候,相對於只有一把鎖的 ArrayBlockingQueue 性能會更好。

2. ArrayBlockingQueue的內部結構是“數組”的形式,LinkedBlockingQueue 的內部是用鏈表實現的,ArrayBlockingQueue 沒有鏈表所需要的“節點”,空間利用率更高。

 

參考:https://github.com/wangzhiwubigdata


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM