學習BlockingQueue之LinkedBlockingQueue實現原理


 

一:概念

  LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

與ArrayBlockingQueue的異同:

ArrayBlockingQueue:   必須設置長度容量      底層數組結構           單鎖控制

LinkedBlockingQueue:默認Integer最大值       底層鏈表結構           雙鎖

 

二:LinkedBlockingQueue源碼實現

 

不設置容量,默認為Integer的最大值

 

也支持設置容量

 

 

 

也支持預先將集合設置入隊列

 

 

 

兩把鎖,一個take鎖,控制消費者並發,一個put鎖,控制生產者並發:

 

 

 

 內部維護單向鏈表結構:

 

 

 

來看一下主要方法:offer與poll

offer方法:

如果e為null或者對列已滿,返回false, 然后加鎖,其他的生產者會被阻塞,再次判斷如果對列里面元素數量小於容量,那么入隊,對列的數量也自加,

如果這時對列仍然有空間,會喚醒正在等待的其他生產者,向對列里面放數據。

 public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

  

入隊方法:

 

 

 

如果是第一次放入數據,效果圖:

 

 主要是建立兩個連接,讓最后一個元素last指向新來的元素,然后將last指針指向新來的。

 

再來看一下poll方法:取數據

如果對列為空,返回null ,然后加鎖,其他想取數據的消費者線程會被阻塞, 如果沒有數據釋放鎖,返回null,對列有數據,則出隊,對列自減,

如果出隊后對列中還有數據,那么會喚醒正在等待的其他消費者線程來取數據。

 public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

  

出隊方法:

 

 

返回first的item元素,這個鏈表的頭結點維護的都是空節點,效果圖如下:

出隊前:

 

出隊后:

 

 

 

add 和remove:

 

add方法: 直接使用父類AbstractQueue的方法:

在offer的基礎上進行了保證,成功返回true,false的時候返回異常。

 

 

remove方法:

 

 

兩把鎖同時上鎖,兩把鎖同時解鎖:

 

 

 

來看一下刪除元素的動作:因為數據結構是鏈表,所以只需要把指向該節點的上一個節點的next變量不指向該節點即可,然后

gc的時候就會把該節點回收掉:

trial.next = p.next 的作用就是讓p節點的前一個元素直接指向p的后一個元素,而數組結構就是把該下標置為null  object[takeIndex] == null

 

 

 

put和take方法:

put方法:

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

  

 take方法:

take方法的判斷邏輯與poll基本相同,唯一區別是,如果對列沒有元素,take為阻塞消費者線程,而poll會返回false。

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM