一:概念
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
與ArrayBlockingQueue的異同:
ArrayBlockingQueue: 必須設置長度容量 底層數組結構 單鎖控制
LinkedBlockingQueue:默認Integer最大值 底層鏈表結構 雙鎖
二:LinkedBlockingQueue源碼實現
不設置容量,默認為Integer的最大值

也支持設置容量

也支持預先將集合設置入隊列

兩把鎖,一個take鎖,控制消費者並發,一個put鎖,控制生產者並發:


內部維護單向鏈表結構:

來看一下主要方法:offer與poll
offer方法:
如果e為null或者對列已滿,返回false, 然后加鎖,其他的生產者會被阻塞,再次判斷如果對列里面元素數量小於容量,那么入隊,對列的數量也自加,
如果這時對列仍然有空間,會喚醒正在等待的其他生產者,向對列里面放數據。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
入隊方法:

如果是第一次放入數據,效果圖:

主要是建立兩個連接,讓最后一個元素last指向新來的元素,然后將last指針指向新來的。
再來看一下poll方法:取數據
如果對列為空,返回null ,然后加鎖,其他想取數據的消費者線程會被阻塞, 如果沒有數據釋放鎖,返回null,對列有數據,則出隊,對列自減,
如果出隊后對列中還有數據,那么會喚醒正在等待的其他消費者線程來取數據。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
出隊方法:

返回first的item元素,這個鏈表的頭結點維護的都是空節點,效果圖如下:
出隊前:

出隊后:

add 和remove:
add方法: 直接使用父類AbstractQueue的方法:
在offer的基礎上進行了保證,成功返回true,false的時候返回異常。

remove方法:

兩把鎖同時上鎖,兩把鎖同時解鎖:

來看一下刪除元素的動作:因為數據結構是鏈表,所以只需要把指向該節點的上一個節點的next變量不指向該節點即可,然后
gc的時候就會把該節點回收掉:
trial.next = p.next 的作用就是讓p節點的前一個元素直接指向p的后一個元素,而數組結構就是把該下標置為null object[takeIndex] == null

put和take方法:
put方法:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
take方法:
take方法的判斷邏輯與poll基本相同,唯一區別是,如果對列沒有元素,take為阻塞消費者線程,而poll會返回false。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
