本文介紹LinkedBlockingQueue,這個隊列在線程池中常用到。(請結合源碼,看本文)
1. 介紹
LinkedBlockingQueue, 不支持null,基於單向鏈表的可選有界阻塞隊列。隊列的順序是FIFO。基於鏈表的隊列通常比基於數組的隊列有更高的吞吐量, 但在大多數的並發應用中具有更低的可預測性能較差(這句話,在最后解釋一下)
如果不選擇隊列的容量,默認值是Integer.MAX_VALUE,為了防止隊列的過度擴張.
還實現了Collection接口與Iterator接口中所有的可選方法。
1.1 結構
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
LinkedBlockingQueue類圖
LinkedBlockingQueue的構造器
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// head與last指向哨兵節點
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c)
...
1.2 保證線程安全
LinkedBlockingQueue的底層使用ReetrantLock保證線程安全,其實就是一個"消費-生產"模型,通過本文我們還可以學到ReetrantLock的實際使用場景。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
2. 源碼分析
在講LinkedBlockingQueue前,我們先看看需要它實現的接口BlockingQueue
2.1 BlockingQueue
實現了BlockingQueue的類,必須額外支持在查找元素時,等待隊列直到非空為止的操作;在儲存元素的時候,要等待隊列的空間可用為止。
它的方法有四種形式,處理不能立即滿足但是未來可能滿足的操作的方式各有不同。
- 直接拋出異常
- 返回一個特殊值(null 或者 false)
- 一直等待,直到操作成功
- 超時設定,超過時間就放棄
我們知道了不同方法,不能立即滿足的不同的處理方式,這樣我們下面就更好理解LinkedBlockingQueue的源碼了。
下面我們從
- offer(e)
- offer(e, time, unit)
- put(e)
- poll()
去分析一下LinkedBlockingQueue
2.2 offer(e)
添加成功就返回true; 插入值為null,報錯;或隊列已滿,直接返回false,不會等待隊列空閑
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 隊列的容量滿了,就直接返回了
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
代碼較簡單,就不細講了。
2.3 offer(e, time, unit)
若隊列已滿,還沒超過設定的時間,就等待,等待時,會對中斷作出反應;若超過了設定的時間,操作就跟offer(E e)一樣了
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 隊列已滿,超過了特定的時間才會返回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
2.4 put(e)
一直等待直到成功或者被中斷
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 防止虛假喚醒
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
我沒有想到,上面發生虛假喚醒的場景(如果知道的同學,請告訴我一下,謝謝了)。
it is recommended that applications programmers always assume that they can occur and so always wait in a loop. --Condition
反正使用Condition在循環里等待就對了
2.5 poll()
隊列為空時,直接返回null,不會await;非阻塞方法
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
其余的方法其實都是類似的,直接看上面BlockingQueue的四種方式。
最后再講一個方法remove(Object o), 將會提及一個知識點。
2.6 remove
刪除o, 若成功就返回true,反之.
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
// 算法題,刪除某一個鏈表的結點,可以看一下源碼,記錄兩個結點,一個在前,一個在后。
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
上面的代碼是簡單的對鏈表的操作,我們主要是看fullyLock() 與 fullyUnlock()的代碼
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
我們都知道解鎖順序應該與獲取鎖順序相反,那么是為什么啦
其實我並不覺得,上面的fullyUnlock解鎖順序與獲取鎖的順序如果是相同的會出什么問題,也並不會出現死鎖(如果釋放鎖與獲取鎖,中間還存在其他操作就另當別論了)。那它僅僅是為了代碼的好看?
假如,我們有下面這段代碼(解鎖與獲取鎖中間有其他操作)
A.lock();
B.lock();
Foo();
A.unlock();
Bar();
B.unlock();
假設Bar()是要去重新獲取A鎖的。
時刻一: 線程X運行到了Bar(),A鎖沒有被其他線程獲取,此時線程X持有B鎖,要去獲取A鎖
時刻二: 線程Y運行到代碼的最前面,A.lock(),獲取到了A鎖,此時線程Y持有A鎖,要去獲取B鎖,此時才會造成死鎖。
解鎖順序與獲取鎖順序相反,為的是
- 避免上面那種情況造成死鎖
- 為了美觀
3. 總結
LinkedBlockingQueue線程安全的隊列
- 使用putLock、takeLock分別對增加和刪除操作保證其線程安全性
- 是一個有界(默認值為Integer.MAX_VALUE)的底層基於單向鏈表的隊列
解釋一下,相對於基於數組的隊列,鏈表隊列的可預測性能較差(less predictable performance in most concurrent applications)這句話
我認為是,數組是在初始化會分配"一塊連續的內存",而鏈表是在隊列添加元素時動態的分配內存地址的,是不連續的;還有就是它將會處理更多的內存結構,每個元素存在一個鏈表的節點。
This means that it has to flush more dirty memory pages between processors when synchronizing.