第八章 ArrayBlockingQueue源碼解析


注意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock源碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock源碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結

1、對於ArrayBlockingQueue需要掌握以下幾點

  • 創建
  • 入隊(添加元素)
  • 出隊(刪除元素)

2、創建

  • public ArrayBlockingQueue(int capacity, boolean fair)
  • public ArrayBlockingQueue(int capacity)

使用方法:

  • Queue<String> abq = new ArrayBlockingQueue<String>(2);
  • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

通過使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平鎖模式與非公平鎖模式,對於這兩種模式,查看本文開頭的文章即可。

源代碼如下:

    private final E[] items;//底層數據結構
    private int takeIndex;//用來為下一個take/poll/remove的索引(出隊)
    private int putIndex;//用來為下一個put/offer/add的索引(入隊)
    private int count;//隊列中元素的個數

    /*
     * Concurrency control uses the classic two-condition algorithm found in any
     * textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;//
    /** Condition for waiting takes */
    private final Condition notEmpty;//等待出隊的條件
    /** Condition for waiting puts */
    private final Condition notFull;//等待入隊的條件
View Code
    /**
     * 創造一個隊列,指定隊列容量,指定模式
     * @param fair
     * true:先來的線程先操作
     * false:順序隨機
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];//初始化類變量數組items
        lock = new ReentrantLock(fair);//初始化類變量鎖lock
        notEmpty = lock.newCondition();//初始化類變量notEmpty Condition
        notFull = lock.newCondition();//初始化類變量notFull Condition
    }

    /**
     * 創造一個隊列,指定隊列容量,默認模式為非公平模式
     * @param capacity <1會拋異常
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
View Code

注意:

  • ArrayBlockingQueue的組成:一個對象數組+1把鎖ReentrantLock+2個條件Condition
  • 在查看源碼的過程中,也要模仿帶條件鎖的使用,這個雙條件鎖模式是很經典的模式

3、入隊

3.1、public boolean offer(E e)

原理:

  • 在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false

使用方法:

  • abq.offer("hello1");

源代碼:

    /**
     * 在隊尾插入一個元素,
     * 如果隊列沒滿,立即返回true;
     * 如果隊列滿了,立即返回false
     * 注意:該方法通常優於add(),因為add()失敗直接拋異常
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//數組滿了
                return false;
            else {//數組沒滿
                insert(e);//插入一個元素
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
View Code
    private void insert(E x) {
        items[putIndex] = x;//插入元素
        putIndex = inc(putIndex);//putIndex+1
        ++count;//元素數量+1
        /**
         * 喚醒一個線程
         * 如果有任意一個線程正在等待這個條件,那么選中其中的一個區喚醒。
         * 在從等待狀態被喚醒之前,被選中的線程必須重新獲得鎖
         */
        notEmpty.signal();
    }
View Code
    /**
     * i+1,數組下標+1
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
View Code

代碼非常簡單,流程看注釋即可,只有一點注意點:

  • 在插入元素結束后,喚醒等待notEmpty條件(即獲取元素)的線程,可以發現這類似於生產者-消費者模式

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插入一個元素,
     * 如果數組已滿,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前線程被中斷
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒
        final ReentrantLock lock = this.lock;
        /*
         * lockInterruptibly():
         * 1、 在當前線程沒有被中斷的情況下獲取鎖。
         * 2、如果獲取成功,方法結束。
         * 3、如果鎖無法獲取,當前線程被阻塞,直到下面情況發生:
         * 1)當前線程(被喚醒后)成功獲取鎖
         * 2)當前線程被其他線程中斷
         * 
         * lock()
         * 獲取鎖,如果鎖無法獲取,當前線程被阻塞,直到鎖可以獲取並獲取成功為止。
         */
        lock.lockInterruptibly();//加可中斷的鎖
        try {
            for (;;) {
                if (count != items.length) {//隊列未滿
                    insert(e);
                    return true;
                }
                if (nanos <= 0)//已超時
                    return false;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)循環
                     * 2、超時-->繼續當前這個for(;;)循環
                     * 3、被中斷-->之后直接執行catch部分的代碼
                     */
                    nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,線程也可能被喚醒)
                } catch (InterruptedException ie) {//在等待的過程中線程被中斷
                    notFull.signal(); // 喚醒其他未被中斷的線程
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }
View Code

注意:

  • awaitNanos(nanos)是AQS中的一個方法,這里就不詳細說了,有興趣的自己去查看AQS的源代碼。
  • lockInterruptibly()與lock()的區別見注釋

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在隊尾插入一個元素,如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷

使用方法:

        try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插入一個元素
     * 如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)//隊列滿了,一直阻塞在這里
                    /*
                     * 一直等待條件notFull,即被其他線程喚醒
                     * (喚醒其實就是,有線程將一個元素出隊了,然后調用notFull.signal()喚醒其他等待這個條件的線程,同時隊列也不慢了)
                     */
                    notFull.await();
            } catch (InterruptedException ie) {//如果被中斷
                notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的線程
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }
View Code

 

4、出隊

4.1、public E poll()

原理:

  • 如果沒有元素,直接返回null;如果有元素,將隊頭元素置null,但是要注意隊頭是隨時變化的,並非一直是items[0]。

使用方法:

abq.poll();

源代碼:

    /**
     * 出隊
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)//如果沒有元素,直接返回null,而非拋出異常
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
View Code
    /**
     * 出隊
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];//獲取出隊元素
        items[takeIndex] = null;//將出隊元素位置置空
        /*
         * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1
         * (注意:這里出隊之后,並沒有將后面的數組元素向前移)
         */
        takeIndex = inc(takeIndex);
        --count;//數組元素個數-1
        notFull.signal();//數組已經不滿了,喚醒其他等待notFull條件的線程
        return x;//返回出隊的元素
    }
View Code

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 從對頭刪除一個元素,如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 從對頭刪除一個元素,
     * 如果數組不空,出隊;
     * 如果數組已空,判斷時間是否超時,如果已經超時,返回null
     * 如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前線程被中斷
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);//將時間轉換為納秒
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {//數組不空
                    E x = extract();//出隊
                    return x;
                }
                if (nanos <= 0)//時間超時
                    return null;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)循環
                     * 2、超時-->繼續當前這個for(;;)循環
                     * 3、被中斷-->之后直接執行catch部分的代碼
                     */
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    }
View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 將隊頭元素出隊,如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷

使用方法:

        try {
            abq.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 將隊頭元素出隊
     * 如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)//如果數組為空,一直阻塞在這里
                    /*
                     * 一直等待條件notEmpty,即被其他線程喚醒
                     * (喚醒其實就是,有線程將一個元素入隊了,然后調用notEmpty.signal()喚醒其他等待這個條件的線程,同時隊列也不空了)
                     */
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
View Code

  

總結:

1、具體入隊與出隊的原理圖:這里只說一種情況,見下圖,途中深色部分表示已有元素,淺色部分沒有元素。

 

上面這種情況是怎么形成的呢?當隊列滿了,這時候,隊頭元素為items[0]出隊了,就形成上邊的這種情況。

假設現在又要出隊了,則現在的隊頭元素是items[1],出隊后就形成下面的情形。

 

出隊后,對頭元素就是items[2]了,假設現在有一個元素將要入隊,根據inc方法,我們可以得知,他要插入到items[0]去,入隊了形成下圖:

以上就是整個入隊出隊的流程,inc方法上邊已經給出,這里再貼一遍:

    /**
     * i+1,數組下標+1
     * 注意:這里這樣寫的原因。
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
View Code

 

2、三種入隊對比:

  • offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
  • put(E e):如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:-->阻塞
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

 

3、三種出對對比:

  • poll():如果沒有元素,直接返回null;如果有元素,出隊
  • take():如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷-->阻塞
  • poll(long timeout, TimeUnit unit):如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM