轉自:https://mp.weixin.qq.com/s?__biz=MzI4Njc5NjM1NQ==&mid=2247487078&idx=2&sn=315f39b6d53862dcb732390729951628&chksm=ebd6314adca1b85c33db1134fbe98bf7526943b02dfc23021781abff265bb231ad9bcd3e1ad1&mpshare=1&scene=23&srcid=0113Gq7QC98Q8Q59Ez6SkUdC#rd
鏈接:https://www.omgleoo.top
BlockingQueues在java.util.concurrent包下,提供了線程安全的隊列訪問方式,當阻塞隊列插入數據時,如果隊列已經滿了,線程則會阻塞,等待隊列中元素被取出后再插入,當從阻塞隊列中取數據時,如果隊列是空的,則線程會阻塞,等待隊列中有新元素。
BlockingQueue的核心方法
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
public interface BlockingQueue<E> extends Queue<E> {
// 添加成功返回true,失敗拋IllegalStateException異常
boolean add(E e);
// 成功返回 true,如果此隊列已滿,則返回 false。
boolean offer(E e);
// 將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞
void put(E e) throws InterruptedException;
//將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,
//則在到達指定的等待時間之前等待可用的空間,該方法可中斷
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 獲取並移除此隊列頭元素,若沒有元素則一直阻塞。
E take() throws InterruptedException;
// 獲取並移除此隊列的頭元素,若隊列為空,則返回 null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
// 移除指定元素,成功返回true,失敗返回false
boolean remove(Object o);
public boolean contains(Object o);
// 從該隊列中移除所有可用元素,並將它們添加到給定集合中
int drainTo(Collection<? super E> c);
// 從該隊列中移除最多給定數量的可用元素,並將它們添加到給定集合中。
int drainTo(Collection<? super E> c, int maxElements);
}
Java並發包中的阻塞隊列一共7個:
-
ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
-
LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
-
PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
-
DealyQueue:一個使用優先級隊列實現的無界阻塞隊列。
-
SynchronousQueue:一個不存儲元素的阻塞隊列。
-
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
-
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
主要關注兩個 ArrayBlockingQueue 和 LinkedBlockingQueue
ArrayBlockingQueu
主要參數:
/** 存儲數據的數組 */
final Object[] items;
/**獲取數據的索引,主要用於take,poll,peek,remove方法 */
int takeIndex;
/**添加數據的索引,主要用於 put, offer, or add 方法*/
int putIndex;
/** 隊列元素的個數 */
int count;
/** 控制並非訪問的鎖 */
final ReentrantLock lock;
/**notEmpty條件對象,用於通知take方法隊列已有元素,可執行獲取操作 */
private final Condition notEmpty;
/**notFull條件對象,用於通知put方法隊列未滿,可執行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);//默認構造非公平鎖的阻塞隊列
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);//初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖
notEmpty = lock.newCondition;//初始化非空等待隊列
notFull = lock.newCondition;//初始化非滿等待隊列
}
public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 這個鎖的操作並不是為了互斥操作,而是保證其可見性。
// 假如線程1是實例化ArrayBlockingQueue對象,線程2是對實例化的ArrayBlockingQueue對象做入隊操作
// (當然要保證線程1和線程2的執行順序),如果不對它進行加鎖操作(加鎖會保證其可見性,也就是寫回主存)
// 線程1的集合有可能只存在線程1維護的緩存中,並沒有寫回主存
// 線程2中實例化的ArrayBlockingQueue維護的緩存以及主存中並沒有集合存在
// 此時就因為可見性造成數據不一致的情況,引發線程安全問題。
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
item[i++] = e;//將集合添加進數組構成的隊列中
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;//隊列中的實際數據數量
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue內部通過數組對象items來存儲所有的數據並通過一個ReentrantLock來同時控制添加線程與移除線程的並發訪問。
notEmpty條件對象用於存放等待或喚醒調用take方法的線程,告訴他們隊列已有元素,可以執行獲取操作。
notFull條件對象是用於等待或喚醒調用put方法的線程,告訴它們,隊列未滿,可以執行添加元素的操作。
takeIndex代表的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引。
putIndex則代表下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。
添加元素時阻塞
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);//檢查元素是否為null
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
if (count == items.length)//判斷隊列是否滿
return false;
else {
enqueue(e);//添加元素到隊列
return true;
}
} finally {
lock.unlock(); // 釋放鎖
}
}
//入隊操作
private void enqueue(E x) {
//獲取當前數組
final Object[] items = this.items;
//通過putIndex索引對數組進行賦值
items[putIndex] = x;
//索引自增,如果已是最后一個位置,重新設置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//隊列中元素數量加1
//喚醒調用take()方法的線程,執行元素獲取操作。
notEmpty.signal();
}
enqueue方法內部通過putIndex索引直接將元素添加到數組items中。
這里需要注意的是當putIndex索引大小等於數組長度時,需要將putIndex重新設置為0。這是因為當前隊列執行元素獲取時總是從隊列頭部獲取,而添加元素是從隊列尾部添加。所以當隊列索引(從0開始)與數組長度相等時,需要從數組頭部開始添加。
//put方法,阻塞時可中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//該方法可中斷
try {
//當隊列元素個數與數組長度相等時,無法添加元素
while (count == items.length)
//將當前調用線程掛起,添加到notFull條件隊列中等待喚醒
notFull.await();
enqueue(e);//如果隊列沒有滿直接添加。。
} finally {
lock.unlock();
}
}
put方法是一個阻塞的方法,如果隊列元素已滿,那么當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。
但如果隊列沒有滿,那么就直接調用enqueue(e)方法將元素加入到數組隊列中。
移除元素時阻塞
移除元素有這些方法:poll、remove、take
poll方法獲取並移除此隊列的頭元素,若隊列為空,則返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判斷隊列是否為null,不為null執行dequeue()方法,否則返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//刪除隊列頭元素並返回
private E dequeue() {
//拿到當前數組的數據
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取要刪除的對象
E x = (E) items[takeIndex];
將數組中takeIndex索引位置設置為null
items[takeIndex] = null;
//takeIndex索引加1並判斷是否與數組長度相等,
//如果相等說明已到盡頭,恢復為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//隊列個數減1
if (itrs != null)
itrs.elementDequeued();//同時更新迭代器中的元素數據
//刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作
notFull.signal();
return x;
}
remove(Object o)方法的刪除過程相對復雜些,因為該方法並不是直接從隊列頭部刪除元素。
首先線程先獲取鎖,接着判斷隊列count>0,這點是保證並發情況下刪除操作安全執行。
接下來獲取下一個要添加源的索引putIndex以及takeIndex索引 ,作為后續循環的結束判斷,因為只要putIndex與takeIndex不相等就說明隊列沒有結束。
然后通過while循環找到要刪除的元素索引,執行removeAt(i)方法刪除。
在removeAt(i)方法中實際上做了兩件事:
判斷隊列頭部元素是否為刪除元素,如果是直接刪除,並喚醒添加線程。
如果要刪除的元素並不是隊列頭元素,那么執行循環操作,從要刪除元素的索引removeIndex之后的元素都往前移動一個位置,那么要刪除的元素就被removeIndex之后的元素替換,從而也就完成了刪除操作。
public boolean remove(Object o) {
if (o == null) return false;
//獲取數組數據
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
//如果此時隊列不為null,這里是為了防止並發情況
if (count > 0) {
//獲取下一個要添加元素時的索引
final int putIndex = this.putIndex;
//獲取當前要被刪除元素的索引
int i = takeIndex;
//執行循環查找要刪除的元素
do {
//找到要刪除的元素
if (o.equals(items[i])) {
removeAt(i);//執行刪除
return true;//刪除成功返回true
}
//當前刪除索引執行加1后判斷是否與數組長度相等
//若為true,說明索引已到數組盡頭,將i設置為0
if (++i == items.length)
i = 0;
} while (i != putIndex);//繼承查找
}
return false;
} finally {
lock.unlock();
}
}
//根據索引刪除元素,實際上是把刪除索引之后的元素往前移動一個位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//先判斷要刪除的元素是否為當前隊列頭元素
if (removeIndex == takeIndex) {
//如果是直接刪除
items[takeIndex] = null;
//當前隊列頭元素加1並判斷是否與數組長度相等,若為true設置為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//隊列元素減1
if (itrs != null)
itrs.elementDequeued();//更新迭代器中的數據
} else {
//如果要刪除的元素不在隊列頭部,
//那么只需循環迭代把刪除元素后面的所有元素往前移動一個位置
//獲取下一個要被添加的元素的索引,作為循環判斷結束條件
final int putIndex = this.putIndex;
//執行循環
for (int i = removeIndex;;) {
//獲取要刪除節點索引的下一個索引
int next = i + 1;
//判斷是否已為數組長度,如果是從數組頭部(索引為0)開始找
if (next == items.length)
next = 0;
//如果查找的索引不等於要添加元素的索引,說明元素可以再移動
if (next != putIndex) {
items[i] = items[next];//把后一個元素前移覆蓋要刪除的元
i = next;
} else {
//在removeIndex索引之后的元素都往前移動完畢后清空最后一個元素
items[i] = null;
this.putIndex = i;
break;//結束循環
}
}
count--;//隊列元素減1
if (itrs != null)
itrs.removedAt(removeIndex);//更新迭代器數據
}
notFull.signal();//喚醒添加線程
}
take方法其實很簡單,有就刪除沒有就阻塞,只不過這個阻塞是可以中斷的,如果隊列沒有數據那么就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),如果有新的put線程添加了數據,那么put操作將會喚醒take線程,執行take操作。
//從隊列頭部刪除,隊列沒有元素就阻塞,可中斷
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中斷
try {
//如果隊列沒有元素
while (count == 0)
//執行阻塞操作
notEmpty.await();
return dequeue();//如果隊列有元素執行刪除操作
} finally {
lock.unlock();
}
}
peek方法直接返回當前隊列的頭元素但不刪除任何元素。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接返回當前隊列的頭元素,但不刪除
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
LinkedBlockingQueue
LinkedBlockingQueue是一個基於鏈表的阻塞隊列,其內部維持一個基於鏈表的數據隊列,使用兩把鎖(takeLock,putLock)允許讀寫並行,remove和iterator時需要同時獲取兩把鎖。
LinkedBlockingQueue默認為無界隊列,即大小為Integer.MAX_VALUE,如果消費者速度慢於生產者速度,可能造成內存空間不足,建議手動設置隊列大小。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 節點類,用於存儲數據
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞隊列的大小,默認為Integer.MAX_VALUE */
private final int capacity;
/** 當前阻塞隊列中的元素個數 */
private final AtomicInteger count = new AtomicInteger();
/** 阻塞隊列的頭結點 */
transient Node<E> head;
/** 阻塞隊列的尾節點 */
private transient Node<E> last;
/** 獲取並移除元素時使用的鎖,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素時使用的鎖如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 */
private final Condition notFull = putLock.newCondition();
}
每次插入操作都將動態構造Linked nodes。
每個添加到LinkedBlockingQueue隊列中的數據都將被封裝成Node節點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結點和尾結點。
與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對並發進行控制,也就是說,添加和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。
構造函數
1、LinkedBlockingQueue():初始化容量為Integer.MAX_VALUE的隊列;
2、LinkedBlockingQueue(int capacity):指定隊列容量並初始化頭尾節點
3、LinkedBlockingQueue(Collection c):初始化一個容量為Integer.MAX_VALUE且包含集合c所有元素的隊列,且阻塞隊列的迭代順序同集合c。若集合c元素包含null,將throwNullPointerException;若集合c元素個數達到Integer.MAX_VALUE,將throwIllegalStateException("Queue full")。
// 將node鏈接到隊列尾部
private void enqueue(Node<E> node) { // 入隊
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node; // 等價於last.next = node;last = last.next(即node)
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended(競爭), but necessary for visibility(可見性)
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e)); // 執行last = last.next = node;
++n;
}
count.set(n); // 設置隊列元素個數
} finally {
putLock.unlock();
}
}
添加元素阻塞
添加元素主要有這幾個方法add、offer、put
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
//添加元素為null直接拋出異常
if (e == null) throw new NullPointerException();
//獲取隊列的個數
final AtomicInteger count = this.count;
//判斷隊列是否已滿
if (count.get() == capacity)
return false;
int c = -1;
//構建節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//再次判斷隊列是否已滿,考慮並發情況
if (count.get() < capacity) {
enqueue(node);//添加元素
c = count.getAndIncrement();//拿到當前未添加新元素時的隊列長度
//如果容量還沒滿
if (c + 1 < capacity)
notFull.signal();//喚醒下一個添加線程,執行添加操作
}
} finally {
putLock.unlock();
}
// 由於存在添加鎖和消費鎖,而消費鎖和添加鎖都會持續喚醒等到線程,因此count肯定會變化。
//這里的if條件表示如果隊列中還有1條數據
if (c == 0)
signalNotEmpty();//如果還存在數據那么就喚醒消費鎖
return c >= 0; // 添加成功返回true,否則返回false
}
//入隊操作
private void enqueue(Node<E> node) {
//隊列尾節點指向新的node節點
last = last.next = node;
}
//signalNotEmpty方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
//喚醒獲取並刪除元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
這里的Offer()方法做了兩件事:
判斷隊列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,然后再次判斷隊列添加完成后是否已滿,不滿就繼續喚醒等到在條件對象notFull上的添加線程。
判斷是否需要喚醒等到在notEmpty條件對象上的消費線程。
為什么添加完成后是繼續喚醒在條件對象notFull上的添加線程而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件對象上的消費線程?為什么要當if (c == 0)時才去喚醒消費線程呢?
第一個疑問:在添加新元素完成后,會判斷隊列是否已滿,不滿就繼續喚醒在條件對象notFull上的添加線程,這點與前面分析的ArrayBlockingQueue很不相同,在ArrayBlockingQueue內部完成添加操作后,會直接喚醒消費線程對元素進行獲取,這是因為ArrayBlockingQueue只用了一個ReenterLock同時對添加線程和消費線程進行控制,這樣如果在添加完成后再次喚醒添加線程的話,消費線程可能永遠無法執行。
而對於LinkedBlockingQueue來說就不一樣了,其內部對添加線程和消費線程分別使用了各自的ReenterLock鎖對並發進行控制,也就是說添加線程和消費線程是不會互斥的,所以添加鎖只要管好自己的添加線程即可,添加線程自己直接喚醒自己的其他添加線程,如果沒有等待的添加線程,直接結束了。
如果有就直到隊列元素已滿才結束掛起,當然offer方法並不會掛起,而是直接結束,只有put方法才會當隊列滿時才執行掛起操作。注意消費線程的執行過程也是如此。這也是為什么LinkedBlockingQueue的吞吐量要相對大些的原因。
第二個疑問:消費線程一旦被喚醒是一直在消費的(前提是有數據),所以c值是一直在變化的,c值是添加完元素前隊列的大小,此時c只可能是0或c>0,如果是c=0,那么說明之前消費線程已停止,條件對象上可能存在等待的消費線程,添加完數據后應該是c+1,那么有數據就直接喚醒等待消費線程,如果沒有就結束啦,等待下一次的消費操作。如果c>0那么消費線程就不會被喚醒,只能等待下一個消費操作(poll、take、remove)的調用,那為什么不是條件c>0才去喚醒呢?我們要明白的是消費線程一旦被喚醒會和添加線程一樣,一直不斷喚醒其他消費線程,如果添加前c>0,那么很可能上一次調用的消費線程后,數據並沒有被消費完,條件隊列上也就不存在等待的消費線程了,所以c>0喚醒消費線程得意義不是很大,當然如果添加線程一直添加元素,那么一直c>0,消費線程執行的換就要等待下一次調用消費操作了(poll、take、remove)。
根據時間阻塞
//在指定時間內阻塞添加的方法,超時就結束
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//將時間轉換成納秒
long nanos = unit.toNanos(timeout);
int c = -1;
//獲取鎖
final ReentrantLock putLock = this.putLock;
//獲取當前隊列大小
final AtomicInteger count = this.count;
//鎖中斷(如果需要)
putLock.lockInterruptibly();
try {
//判斷隊列是否滿
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//如果隊列滿根據阻塞的等待
nanos = notFull.awaitNanos(nanos);
}
//隊列沒滿直接入隊
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//喚醒條件對象上等待的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//喚醒消費線程
if (c == 0)
signalNotEmpty();
return true;
}
//CoditionObject(Codition的實現類)中的awaitNanos方法
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//這里是將當前添加線程封裝成NODE節點加入Condition的等待隊列中
//注意這里的NODE是AQS的內部類Node
Node node = addConditionWaiter();
//加入等待,那么就釋放當前線程持有的鎖
int savedState = fullyRelease(node);
//計算過期時間
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//主要看這里!!由於是while 循環,這里會不斷判斷等待時間
//nanosTimeout 是否超時
//static final long spinForTimeoutThreshold = 1000L;
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//掛起線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//重新計算剩余等待時間,while循環中繼續判斷下列公式
//nanosTimeout >= spinForTimeoutThreshold
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
據傳遞進來的時間計算超時阻塞nanosTimeout,然后通過while循環中判斷nanosTimeout >= spinForTimeoutThreshold 該公式是否成立,當其為true時則說明超時時間nanosTimeout 還未到期,再次計算nanosTimeout = deadline - System.nanoTime();即nanosTimeout ,持續判斷,直到nanosTimeout 小於spinForTimeoutThreshold結束超時阻塞操作,方法也就結束。
這里的spinForTimeoutThreshold其實更像一個經驗值,因為非常短的超時等待無法做到十分精確,因此采用了spinForTimeoutThreshold這樣一個臨界值。offer(E e, long timeout, TimeUnit unit)方法內部正是利用這樣的Codition的超時等待awaitNanos方法實現添加方法的超時阻塞操作。同樣對於poll(long timeout, TimeUnit unit)方法也是一樣的道理。
移除元素阻塞
移除的方法主要有remove、poll、take
remove方法刪除指定的對象。由於remove方法刪除的數據的位置不確定,為了避免造成並非安全問題,所以需要同時對putLock和takeLock加鎖。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();//同時對putLock和takeLock加鎖
try {
//循環查找要刪除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {//找到要刪除的節點
unlink(p, trail);//直接刪除
return true;
}
}
return false;
} finally {
fullyUnlock();//解鎖
}
}
//兩個同時加鎖
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
poll方法比較簡單,如果隊列沒有數據就返回null,如果隊列有數據,那么就取出來,如果隊列還有數據那么喚醒等待在條件對象notEmpty上的消費線程。然后判斷if (c == capacity)為true就喚醒添加線程,這點與前面分析if(c==0)是一樣的道理。因為只有可能隊列滿了,notFull條件對象上才可能存在等待的添加線程。
public E poll() {
//獲取當前隊列的大小
final AtomicInteger count = this.count;
if (count.get() == 0)//如果沒有元素直接返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//判斷隊列是否有數據
if (count.get() > 0) {
//如果有,直接刪除並獲取該元素值
x = dequeue();
//當前隊列大小減一
c = count.getAndDecrement();
//如果隊列未空,繼續喚醒等待在條件對象notEmpty上的消費線程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//判斷c是否等於capacity,這是因為如果滿說明NotFull條件對象上
//可能存在等待的添加線程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;//獲取頭結點
Node<E> first = h.next; 獲取頭結的下一個節點(要刪除的節點)
h.next = h; // help GC//自己next指向自己,即被刪除
head = first;//更新頭結點
E x = first.item;//獲取刪除節點的值
first.item = null;//清空數據,因為first變成頭結點是不能帶數據的,這樣也就刪除隊列的帶數據的第一個節點
return x;
}
take方法是一個可阻塞可中斷的移除方法主要做了兩件事
如果隊列沒有數據就掛起當前線程到 notEmpty條件對象的等待隊列中一直等待,如果有數據就刪除節點並返回數據項,同時喚醒后續消費線程,嘗試喚醒條件對象notFull上等待隊列中的添加線程。
移除方法中只有take方法具備阻塞功能。remove方法是成功返回true失敗返回false,poll方法成功返回被移除的值,失敗或沒數據返回null。
public E take() throws InterruptedException {
E x;
int c = -1;
//獲取當前隊列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中斷
try {
//如果隊列沒有數據,掛機當前線程到條件對象的等待隊列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在數據直接刪除並返回該數據
x = dequeue();
c = count.getAndDecrement();//隊列大小減1
if (c > 1)
notEmpty.signal();//還有數據就喚醒后續的消費線程
} finally {
takeLock.unlock();
}
//滿足條件,喚醒條件對象上等待隊列中的添加線程
if (c == capacity)
signalNotFull();
return x;
}
lock 與 lockInterruptibly的區別
lock優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。
lockInterruptibly 優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。
詳細區別:
ReentrantLock.lockInterruptibly允許在等待時由其它線程調用等待線程的Thread.interrupt方法來中斷等待線程的等待而直接返回,這時不用獲取鎖,而會拋出一個InterruptedException。
ReentrantLock.lock方法不允許Thread.interrupt中斷,即使檢測到Thread.isInterrupted,一樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最后獲取鎖成功后再把當前線程置為interrupted狀態,然后再中斷線程。
線程喚醒
對notEmpty和notFull的喚醒操作均使用的是signal()而不是signalAll()。
signalAll() 雖然能喚醒Condition上所有等待的線程,但卻並不見得會節省資源,相反,喚醒操作會帶來上下文切換,且會有鎖的競爭。此外,由於此處獲取的鎖均是同一個(putLock或takeLock),同一時刻被鎖的線程只有一個,也就無從談起喚醒多個線程了。
LinkedBlockingQueue與ArrayBlockingQueue比較
ArrayBlockingQueue底層基於數組,創建時必須指定隊列大小,“有界”LinkedBlockingQueue“無界”,節點動態創建,節點出隊后可被GC,故伸縮性較好;
ArrayBlockingQueue入隊和出隊使用同一個lock(但數據讀寫操作已非常簡潔),讀取和寫入操作無法並行,LinkedBlockingQueue使用雙鎖可並行讀寫,其吞吐量更高。
ArrayBlockingQueue在插入或刪除元素時直接放入數組指定位置(putIndex、takeIndex),不會產生或銷毀任何額外的對象實例;而LinkedBlockingQueue則會生成一個額外的Node對象,在高效並發處理大量數據時,對GC的影響存在一定的區別。
