java多線程系列(九)---ArrayBlockingQueue源碼分析


java多線程系列(九)---ArrayBlockingQueue源碼分析

目錄

成員變量

  • 數組大小
    final Object[] items;
  • 下一個進隊元素的下標
    /** items index for next take, poll, peek or remove */
    int takeIndex;
  • 下一個出隊元素的下標
    /** items index for next put, offer, or add */
    int putIndex;
  • 隊列中元素的數目
    /** Number of elements in the queue */
    int count;
  • 出隊和入隊需要的鎖
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;
  • 出隊條件
    /** Condition for waiting takes */
    private final Condition notEmpty;

  • 入隊條件

    /** Condition for waiting puts */
    private final Condition notFull;

構造方法

  • 配置容量,先創建是否公平公平訪問
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  • 從源碼可以看到,創建一個object數組,然后創建一個公平或非公平鎖,然后創建出隊條件和入隊條件

offer方法

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
  • 首先檢查是否為null
  • 然后lock鎖住
  • 如果當前數目count已經為初始的時候容量,這時候自己返回false
  • 否則的話執行enqueue方法
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
  • 將新的元素添加到數組的下一個進隊的位置
  • 然后notEmpty出隊條件喚醒,這個時候可以進行出隊
  • 執行enqueue后然后釋放鎖

指定時間的offer方法

 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
  • 方法大致和前面的一致,不同的時候是當隊列滿的時候,會等待一段時間,此時入隊條件等待一段時間,一段時間后繼續進入循環進行判斷隊列還滿
  • 當隊列不滿的時候執行enqueue

add方法

  • 調用父類的add方法
public boolean add(E e) {
        return super.add(e);
    }
  • 父類AbstractQueue的add方法
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
  • 執行offer方法,這個時候可以對比上面直接調用offer,offer方法如果入隊失敗會直接返回false,而add方法會拋出異常

put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
  • 和限定時間的offer方法不同,當隊列滿的時候,會一直等待,直到有人喚醒

poll方法

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 首先執行lock方法鎖定
  • 如果當前隊中無元素,那么返回null,否則執行dequeue方法
 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
  • 根據出隊下標取出元素,然后將該位置置為null
  • 將出隊下標加一,如果出隊下標等於了數組的大小,出隊下標置為0
  • 隊中元素數量減一
  • notFull喚醒,此時可以喚醒入隊阻塞的線程

指定時間的poll方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 與offer的指定時間和沒有指定時間類似,poll指定時間的方法和沒有指定時間的poll思路大致是一樣的
  • 當此時隊列為空的,為等待一段時間,然后自動喚醒,繼續進入循環,直到隊列中有元素,然后執行dequeue方法

take方法

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 和前面指定時間的poll方法不同,當隊中為空的時候,會一直等待,直到被喚醒

總結

  • 入隊
方法 特點
offer(E e) 隊列滿的時候,返回false
offer(E e, long timeout, TimeUnit unit) 隊列滿的時候,等待一段時間,釋放鎖,一段時間后,進入就緒狀態
add(E e) 隊列滿的時候,直接拋出異常
put(E e) 隊列慢的時候,線程阻塞,直到被喚醒
  • 出隊
方法 特點
poll() 隊列為空的時候,直接返回null
poll(long timeout, TimeUnit unit) 隊列為空的時候,等待一段時間,釋放鎖,一段時候后,進入就緒狀態
take() 隊列為空的時候,一直等待,釋放鎖,直到被喚醒
  • 總體的設計思路,通過一個數組來模擬一個數組,出隊和入隊都是同步的,也就是同一時間只能有一個入隊或者出隊操作,然后在入隊的時候,如果隊列已滿的話,根據方法的不同有不同的策略,可以直接返回或者拋出異常,也可以阻塞一段時間,等會在嘗試入隊,或者直接阻塞,直到有人喚醒。而出隊的時候,如果為空可以直接返回,也可以等待一段時間然后再次嘗試,也可以阻塞,直到有人喚醒

我覺得分享是一種精神,分享是我的樂趣所在,不是說我覺得我講得一定是對的,我講得可能很多是不對的,但是我希望我講的東西是我人生的體驗和思考,是給很多人反思,也許給你一秒鍾、半秒鍾,哪怕說一句話有點道理,引發自己內心的感觸,這就是我最大的價值。(這是我喜歡的一句話,也是我寫博客的初衷)

作者:jiajun 出處: http://www.cnblogs.com/-new/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】,希望能夠持續的為大家帶來好的技術文章!想跟我一起進步么?那就【關注】我吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM