ArrayBlockingQueue原理分析(一)


概述

ArrayBlockingQueue是一個阻塞隊列,其實底層就是一個數組,說到底層是數組,ArrayList底層也是數組,那它其實也可以作為隊列,但是是非阻塞的,那阻塞和非阻塞的區別是什么?區別在於當隊列中沒有元素的時候就阻塞等待,直到隊列中有數據再消費,而如果隊列滿了之后(隊列有界),生產者就要阻塞。下面就總結一下ArrayBlockingQueue的特性。

  • 是一個有界的隊列,初始化隊列的時候傳入隊列大小
  • 采用ReentrantLock + Condition實現線程安全和阻塞
  • 底層采用數組存儲
  • 生產者和消費者共用一把鎖,所以效率一般

總結來說就是效率一般,容量有限,那既然這么差還要搞一個這個對象,原因就是這個實現起來簡單。ArrayBlockingQueue的插入和刪除操作都比較簡單,但是里面有一個東西其實還挺復雜的,就是Itrs,迭代器,我打算寫兩篇博客,本篇介紹插入和刪除等一般的方法,下一篇介紹迭代器。

繼承結構圖

 這幅圖畫出了Java中常用隊列的繼承結構圖,可以看出所有的隊列都實現了AbstartQueue,這個抽象類實現了Queue接口,提供了Queue接口中方法的默認實現,繼承它可以少寫一些不必要的代碼,但是Queue接口中沒有提供大多數的阻塞方法,所以有了BlockingQueue接口,這個接口中提供很多的阻塞的插入刪除方法,而最底層的實現者都是阻塞隊列,所以都會實現這個接口。

屬性分析

    /** 隊列中元素保存的地方 */
    final Object[] items;

    /** items index for next take, poll, peek or remove,這個英文注釋很詳細 */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes,處理消費者線程的 */
    private final Condition notEmpty;

    /** Condition for waiting puts,處理生產者線程的 */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
* 迭代器,由於一個隊列可以有多個迭代器,所以在該隊列中,迭代器通過鏈表連接起來,itrs屬性就是鏈表頭
* 通過這個頭,就可以找到所有的迭代器,下一篇文章會具體分析
*/
transient Itrs itrs = null;

構造方法

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //初始化數組,指定容量,不會擴容,固定
        this.items = new Object[capacity];
        //默認是非公平鎖,下面會解釋一下這個
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

關於這里的公平鎖,在網上看資料時,發現有的胖友寫的文章中說,這里如果是公平鎖,如果隊列滿了,生產者阻塞了非常多,那等待最久的生產者線程會先競爭到鎖,別的阻塞線程不能跟他競爭鎖,其實這么解釋的結論是對的,就是如果是公平鎖,那等最久的確實先競爭到鎖,但是原因並不是上面說的原因,即便是非公平鎖,如果沒有新的生產者加入競爭鎖,也是等最久先競爭到鎖,這里公平鎖的作用是說,當多個消費者線程消費多個元素之后,喚醒多個生產者,這時候如果有新的生產者加入,這些生產者需要等待剛剛喚醒的生產者執行完之后才可以競爭鎖。

Queue

public interface Queue<E> extends Collection<E> {
    //添加元素
    boolean add(E e);
    //添加元素
    boolean offer(E e);
    //刪除
    E remove();
    //消費元素,其實也是刪除
    E poll();
    //當前元素
    E element();
    //當前消費要消費的元素,不會移除隊列,只是獲取
    E peek();
}

 

BlockingQueue

//其實這個接口繼承於Queue,只不過多添加了幾個方法
public interface BlockingQueue<E> extends Queue<E> {
    //添加元素
    boolean add(E e);
    //添加元素
    boolean offer(E e);
    //添加元素
    void put(E e) throws InterruptedException;
    //添加元素
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    //消費元素
    E take() throws InterruptedException;
    //消費元素
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    int remainingCapacity();
    //刪除元素
    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    public boolean contains(Object o);

}

搞不懂有些方法在Queue中已經定義了,這里還要重復定義。

ArrayBlockedQueue

ArrayBlockedQueue實現了上面的隊列,下面就對其實現的方法做一個對比。

生產者方法

消費者方法

add(E e):直接調用offer,和offer方法返回不同,如果插入成功,返回true,否則拋出異常

 

offer(E e):向隊尾插入數據,如果隊列滿了,就返回插入失敗

 poll():從隊頭獲取元素,如果隊列為空,返回失敗    

offer(E e, long timeout, TimeUnit unit):向隊尾插入數據,

如果隊列滿了,阻塞等待一段時間,超時返回插入失敗  

poll(long timeout, TimeUnit unit):從隊頭獲取元素,如果隊列為空,等待一段時間,如果超時返回失敗

put(E e):向隊尾插入元素,如果隊列滿了就一直等待,直到隊列中有空閑空間

take():從隊頭獲取元素,如果隊列為空,阻塞等待,直到隊列中有元素再消費

 

remove(Object o):刪除隊列中的元素,可以是隊列中的任意元素

從上面的對比可知,中間三個方法生產者和消費者是相對的。

add(E e)

public boolean add(E e) {
        return super.add(e);
    }

//AbstractQueue的實現,直接調用offer方法,不過和offer不同的是,如果插入隊列失敗
//直接拋出異常
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

offer(E e)

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //先加鎖,防止多線程出問題
        lock.lock();
        try {
            //如果數組滿了,返回false
            if (count == items.length)
                return false;
            else {
                //下面分析這個方法
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

進入#enqueue(E e)

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //隊尾插入
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        //元素數量自增
        count++;
        //通知阻塞的消費者線程
        notEmpty.signal();
    }

這個方法實現很簡單,不過生產者的幾個方法基本都是調用這個方法。

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        //獲取傳入的超時時間,轉為納秒
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //獲取可中斷鎖
        lock.lockInterruptibly();
        try {
            //如果隊列滿了,進入循環
            while (count == items.length) {
               //下面那個返回負數,這里直接return一個false結束
                if (nanos <= 0)
                    return false;
                //阻塞固定的時間,如果超時之后會返回一個0或者負數
                nanos = notFull.awaitNanos(nanos);
            }
            //還是調用這個方法,就不分析了
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

這個方法和offer(E e)有兩點不同:

  • 使用的鎖是可中斷鎖,就是說如果在等待過程中,線程被中斷了會拋出一個異常
  • 使用了超時等待,如果超時才會返回false

put(E e)

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //可中斷鎖
        lock.lockInterruptibly();
        try {
           //隊列滿了
            while (count == items.length)
                //等待,不設置超時時間
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

ok,到此就把生產者的方法分析完了,其實都是調用的enqueue方法,很簡單。

用最帥的陳永仁做分割線。。。

      

poll(E e)

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果隊列為空返回null,否則之后后面的方法
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

進入dequeue()方法

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
       //從隊列中獲取
        E x = (E) items[takeIndex];
        //刪除這個元素
        items[takeIndex] = null;
        //判斷是不是到隊尾了,如果到隊尾了就從頭開始
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //如果該隊列存在迭代器,更新里面一些信息,后面會稍微說明一下
        if (itrs != null)
            itrs.elementDequeued();
        //通知生產者線程
        notFull.signal();
        return x;
    }

上面的過程都非常簡單,這里提一下itrs這部分代碼,舉個例子,假設通過如下代碼獲取了一個迭代器

ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(10);
Iterator<Integer> iterator = queue.iterator();

在迭代器初始化時候,其第一個要迭代的元素和消費者要消費的元素是同一個,在上面的代碼中如果消費者消費元素,把隊列中所有的元素消費完了,但是迭代器還沒有運行,這個時候就需要更新迭代器中的一些參數,不讓它迭代了,因為隊列已經為空了,這里只是提一下,下一篇文章會詳細介紹。

poll(long timeout, TimeUnit unit)

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

take(E e)

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

remove(Object o)

public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                   //遍歷隊列,尋找要刪除的元素
                    if (o.equals(items[i])) {
                        //執行刪除,之后移動數組中的元素,把刪除的元素的空位置給擠占掉
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

進入removeAt(i)

void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        //如果刪除的就是下一個要消費的元素
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            //移動元素
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                //下一篇文章分析
                itrs.removedAt(removeIndex);
        }
        //通知生產者線程
        notFull.signal();
    }

以上就是ArrayBlockingQueue常用方法,還有幾個比如contains,toString,toArray都很簡單,就不貼代碼了。

總結

ArrayBlockingQueue總的來說在上面列的結構圖中算是最簡單的一個,在概述中也說了,其實這里面復雜的是迭代,隊列其實很簡單,下一篇文章分析迭代過程。

                              


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM