Java 並發編程 --- LinkedBlockingQueue與ArrayBlockingQueue (七)


阻塞隊列與普通的隊列(LinkedList/ArrayList)相比,支持在向隊列中添加元素時,隊列的長度已滿阻塞當前添加線程,直到隊列未滿或者等待超時;從隊列中獲取元素時,隊列中元素為空 ,會將獲取元素的線程阻塞,直到隊列中存在元素 或者等待超時。

在JUC包中常用的阻塞隊列包含ArrayBlockingQueue/LinkedBlockingQueue/LinkedBlockingDeque等,從結構來看都繼承了AbstractQueue實現了BlockingQueue接口(LinkedBlockingDeque是雙向阻塞隊列,實現的是BlockingDeque接口),在BlockingQueue接口中定義了幾個供子類實現的接口,可以分為3部分,puts操作、takes操作、其他操作。

puts操作
add(E e) : 添加成功返回true,失敗拋IllegalStateException異常
offer(E e) : 成功返回 true,如果此隊列已滿,則返回 false(如果添加了時間參數,且隊列已滿也會阻塞)
put(E e) :將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞

takes操作
remove(Object o) :移除指定元素,成功返回true,失敗返回false
poll() : 獲取並移除此隊列的頭元素,若隊列為空,則返回 null(如果添加了時間參數,且隊列中沒有數據也會阻塞)
take():獲取並移除此隊列頭元素,若沒有元素則一直阻塞。
peek() :獲取但不移除此隊列的頭;若隊列為空,則返回 null。

other操作
contains(Object o):隊列中是否包含指定元素
drainTo(Collection<? super E> c):隊列轉化為集合

關於阻塞隊列,我們主要看LinkedBlockingQueue與ArrayBlockingQueue.

ArrayBlockingQueue

ArrayBlockingQueue是基於數組的、有界的、遵循FIFO原則的阻塞隊列,隊列初始化時必須指定隊列的長度。
這是一個經典的“有界緩沖區”,其中固定大小的數組包含由生產者插入並由消費者提取的元素。創建后,無法更改容量。此類支持用於排序等待生產者和消費者線程的可選公平策略。默認情況下,不保證此順序。但是,將fairness設置為true構造的隊列以FIFO順序授予線程訪問權限。公平性通常會降低吞吐量,但會降低可變性並避免飢餓。

結構

相關變量

final Object[] items; //一個數組,用來存放隊列中的變量(隊列的基礎)
int count; //隊列中元素的數量
int takeIndex; //下一次take、poll、remove、peek操作的下標值
int putIndex; //下次add、offer、put操作的下標值

構造函數

ArrayBlockingQueue提供了三個構造函數,在只傳遞初始化大小值時,默認使用的鎖是非公平鎖,對比三個不同的構造函數而言,真正初始化隊列的構造方法是ArrayBlockingQueue(int capacity, boolean fair)方法,傳入集合的構造方法會在調用該方法后將集合遍歷存入隊列中

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    //使用同一個鎖對象,此處是與LinkedBlockingQueue(使用兩個不同的鎖來控制添加,取出操作)不同的地方
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                //向數組中添加數據
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        //設置下次添加操作對應的數組下標值
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

offer/add操作

add本質上調用的是offer操作,通過返回值true/false可以判斷隊列中添加元素是否成功,隊列已滿返回false

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
     //隊列已滿,直接返回false
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

//入隊列操作
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //喚醒take/poll(有時間參數)方法獲取數據的線程
    notEmpty.signal();
}

put操作

沒有返回值,隊列已滿則等待,知道被喚醒

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            //隊列已滿線程掛起等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

poll操作

隊列為空返回null,對於有時間參數的poll操作,在隊列為空時,會被掛起等待

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    //歸零操作
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //喚醒puts線程
    notFull.signal();
    return x;
}

take操作

隊列為空等待,直到隊列中存在元素被喚醒

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

peek操作

獲取隊列中第一個不為空的元素(每次takes操作,或者puts操作都會設置下次takes/puts操作的下標)

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //通過下標值獲取元素
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

remove操作

刪除內部元素操作是一種本質上緩慢且具有破壞性的操作,需要將刪除元素后的元素統一遷移一個單位,並且在操作過程中會獲得鎖,對性能有影響,因此不應輕易執行remove操作

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            //遍歷隊列
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    //如果需要移除的元素下標值為下一次取數的下標值,執行類似取數的操作
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
     //如果takes下標到達隊列最大長度,歸零
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        //如果不相等,將需刪除元素的后續元素統一遷移一位
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
          //移動完成,設置puts操作下標
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    //喚醒put操作等待的線程
    notFull.signal();
}

綜上,ArrayBlockingQueue隊列的邏輯並不復雜,但需要注意一下幾點

1.ArrayBlockingQueue是以數組來實現隊列功能的,在執行puts或者takes操作時一旦下一個操作的下標值大於隊列的長度,類中用來記錄存取下標值會歸零,已達到循環使用隊列的目的

2.ArrayBlockingQueue是通過一個鎖控制takes以及puts操作,說明在同一時間內只能執行takes操作或者puts操作中的一種,對於阻塞隊列來說,保證了線程安全,
但是會影響隊列的消費和生產效率,並發性會下降
3.ArrayBlockingQueue在執行remove操作時,會將整個數組進行移動(最壞情況下),同時還會獲得鎖,對性能的影響比較大

LinkedBlockingQueue

LinkedBlockingQueue是基於鏈表的、有界的、遵循FIFO原則的阻塞隊列,隊列默認的最大長度為Integer.MAX_VALUE

結構

重要屬性

private final int capacity;//隊列的大小,可以自定義,默認為Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger(); //當前隊列中元素的數量
//take、poll操作需要持有的鎖,LinkedBlockingQueue支持並發操作,對於從隊列中獲取數據需要加鎖(會阻塞,ConcurrentLinkedQueue/ConcurrentLinkedDeque
是使用CAS操作來控制的,不會出現阻塞的問題)
private final ReentrantLock takeLock = new ReentrantLock();
//put、offer操作需要持有的鎖,同上 private final ReentrantLock putLock = new ReentrantLock();
//notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 private final Condition notEmpty = takeLock.newCondition();
//notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 private final Condition notFull = putLock.newCondition();

Node類

相對於其他(ConcurrentLinkedQueue/ConcurrentLinkedDeque)類來說,LinkedBlockingQueue的Node類要簡單好多,由於是基於單鏈表實現的,只有一個next屬性(保存后繼節點),一個item屬性(存放值),一個構造函數

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

構造函數

LinkedBlockingQueue提供了三個構造函數,在不傳參數的情況下,默認隊列的大小為Integer.MAX_VALUE,對比三個不同的構造函數而言,真正初始化 隊列的構造方法是LinkedBlockingQueue(int capacity)方法,傳入集合的構造方法會在調用該方法后將集合遍歷存入隊列中

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    //設置隊列大小,new一個null節點,head、tail節點指向改節點
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //獲取put、offer操作需要的鎖,可重入
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //將隊列的last節點指向該節點
            enqueue(new Node<E>(e));
            ++n;
        }
        //隊列元素計數
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

offer操作

通過返回值true/false判斷是否成功,offer操作當隊列滿后並不會阻塞(有時間參數的offer操作也會阻塞),而是直接返回false,put操作是沒有返回值的,並且會一直阻塞,等待被喚醒(或者超過時間拋出異常)

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //是否超過最大值
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //獲取鎖
    putLock.lock();
    try {
        //再次判斷隊列是否存放滿(可能存在多線程的情況)
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //如果隊列沒有放滿,喚醒下一個添加線程
            if (c + 1 < capacity)
                //其實這個地方,喚醒的添加線程是執行put方法(或者offer有時間參數的操作)時被阻塞的線程,如果僅僅只是執行offer操作應該不會執行任何操作,
                  沒有對應的添加線程添加到條件隊列中(個人理解,也是不太理解的地方)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //當c=0時,表明當前隊列中存在一個元素,通知消費線程去消費
        signalNotEmpty();
    return c >= 0;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //喚醒條件等待隊列中的消費者去消費數據
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

put操作

 put方法不會像offer方法那樣去檢查隊列大小是否超過設定值,put操作一個元素入隊列時,如果隊列已滿,當前線程會進入nofull的條件等待隊列中等待,直到隊列中元素個數小於隊列大小時被喚醒,才繼續put操作

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //隊列已滿存放線程阻塞,等待被喚醒
        while (count.get() == capacity) {
            notFull.await();
        }
        //入隊列
        enqueue(node);
        c = count.getAndIncrement();
        //隊列沒滿,喚醒notfull等待隊列中的添加線程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //當c=0時,表明當前隊列中存在一個元素,通知消費線程去消費
        signalNotEmpty();
}

poll操作

必定會有返回值(異常除外),但包含null(隊列中沒有數據),與take操作比較可以發現,take操作在隊列中沒有數據時執行take操作的線程會被掛起,直到隊列中有數據(有時間參數的poll操作也會被掛起,等待喚醒或者超時)

public E poll() {
    final AtomicInteger count = this.count;
    //如果隊列沒有數據,返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                //喚醒其他消費線程
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//出隊列操作,因為隊列的head節點為null節點,在出隊列時,會始終保持head節點為空,next節點為真正意義上的首節點
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    //自指向,該節點已經無用,便於GC
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

take操作

take操作不會向poll操作去檢查隊列中有沒有數據,隊列中沒有數據時會被掛起,等待被喚醒

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //隊列中是否有數據,沒有等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            //如果隊列中有數據存在,喚醒notempty等待隊列中的消費線程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //喚醒添加線程
        signalNotFull();
    return x;
}

peek操作

獲取隊列中頭部元素,可能存在其他線程執行的刪除、take(poll)操作,所以要加鎖,獲取數據不一定准確

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //head節點不存放數據,所以取的是next節點
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

remove操作

在執行刪除操作時,會將puts以及takes操作都上鎖,保證線程安全,然后執行遍歷刪除操作,在刪除后,會去喚醒等待中的添加線程執行 添加操作

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        //遍歷單項鏈表
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                //移除數據
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    //喚醒添加線程
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

綜上,LinkedBlockingQueue的api與ArrayBlockingQueue並無太大差別,在實現思想上,LinkedBlockingQueue使用了鎖分離以及鏈表,其他與ArraBlockingQueue(一個鎖統一管理、數組)沒太大區別

LinkedBlockingQueue與ArrayBlockingQueue異同

1.LinkedBlockingQueue是基於鏈表實現的初始化是可以不用指定隊列大小(默認是Integer.MAX_VALUE);ArrayBlockingQueue是基於數組實現的初始化時必須指定隊列大小

2.LinkedBlockingQueue在puts操作都會生成新的Node對象,takes操作Node對象在某一時間會被GC,可能會影響GC性能;ArrayBlockingQueue是固定的數組長度循環使用,
不會出現對象的產生與回收
3.LinkedBlockingQueue是基於鏈表的形式,在執行remove操作時,不用移動其他數據;ArrayBlockingQueue是基於鏈表,在remove時需要移動數據,影響性能 4.LinkedBlockingQueue使用兩個鎖將puts操作與takes操作分開;ArrayBlockingQueue使用一個鎖來控制,在高並發高吞吐的情況下,LinkedBlockingQueue的性能較好

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM