一:概念
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
與ArrayBlockingQueue的異同:
ArrayBlockingQueue: 必須設置長度容量 底層數組結構 單鎖控制
LinkedBlockingQueue:默認Integer最大值 底層鏈表結構 雙鎖
二:LinkedBlockingQueue源碼實現
不設置容量,默認為Integer的最大值
也支持設置容量
也支持預先將集合設置入隊列
兩把鎖,一個take鎖,控制消費者並發,一個put鎖,控制生產者並發:
內部維護單向鏈表結構:
來看一下主要方法:offer與poll
offer方法:
如果e為null或者對列已滿,返回false, 然后加鎖,其他的生產者會被阻塞,再次判斷如果對列里面元素數量小於容量,那么入隊,對列的數量也自加,
如果這時對列仍然有空間,會喚醒正在等待的其他生產者,向對列里面放數據。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
入隊方法:
如果是第一次放入數據,效果圖:
主要是建立兩個連接,讓最后一個元素last指向新來的元素,然后將last指針指向新來的。
再來看一下poll方法:取數據
如果對列為空,返回null ,然后加鎖,其他想取數據的消費者線程會被阻塞, 如果沒有數據釋放鎖,返回null,對列有數據,則出隊,對列自減,
如果出隊后對列中還有數據,那么會喚醒正在等待的其他消費者線程來取數據。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
出隊方法:
返回first的item元素,這個鏈表的頭結點維護的都是空節點,效果圖如下:
出隊前:
出隊后:
add 和remove:
add方法: 直接使用父類AbstractQueue的方法:
在offer的基礎上進行了保證,成功返回true,false的時候返回異常。
remove方法:
兩把鎖同時上鎖,兩把鎖同時解鎖:
來看一下刪除元素的動作:因為數據結構是鏈表,所以只需要把指向該節點的上一個節點的next變量不指向該節點即可,然后
gc的時候就會把該節點回收掉:
trial.next = p.next 的作用就是讓p節點的前一個元素直接指向p的后一個元素,而數組結構就是把該下標置為null object[takeIndex] == null
put和take方法:
put方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
take方法:
take方法的判斷邏輯與poll基本相同,唯一區別是,如果對列沒有元素,take為阻塞消費者線程,而poll會返回false。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }