💛原文地址為https://www.cnblogs.com/haixiang/p/12354520.html,轉載請注明出處!
什么是阻塞隊列
原文地址為,轉載請注明出處!
阻塞隊列是一個支持阻塞的插入和移除的隊列。
- 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 支持阻塞的移除方法:意思是隊列為空時,獲取元素(同時移除元素)的線程會被阻塞,等到隊列變為非空。
阻塞隊列用法
阻塞隊列常用於生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是從隊列里獲取元素的線程。
當阻塞隊列不可用時,會有四種相應的處理方式:
處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入操作 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除操作 | remove() | poll() | take() | poll(time, unit) |
獲取操作 | element() | peek() | 不可用 | 不可用 |
- 返回特殊值:插入元素時,會返回是否插入成功,成功返回true。如果是移除方法,則是從隊列中取出一個元素,沒有則返回null。
- 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里面put元素,則生產者線程會被阻塞,知道隊列不滿或者響應中斷退出。當隊列為空時,如果消費者線程從隊列里take元素。
- 超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程一段時間,如果超過了指定時間,生產者線程就會退出。
如果是無界阻塞隊列,隊列則不會出現滿的情況。
阻塞隊列
-
ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列
-
LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列
-
PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列
-
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列
-
SynchronousQueue:一個不存儲元素的阻塞隊列
-
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列
1.ArrayBlockingQueue
此隊列按照先進先出(FIFO)的原則對元素進行排序
默認情況下不保證線程公平地訪問隊列(所謂公平是指當隊列可用時,先被阻塞的線程先訪問隊列)
為了保證公平性通常會降低吞吐量。
公平鎖是利用了可重入鎖的公平鎖來實現。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.ArrayBlockingQueue
此隊列按照先進先出(FIFO)的原則對元素進行排序
默認長度為Integer.MAX_VALUE
3.PriorityBlockingQueue
默認情況下元素采取自然順序升序排列
可以自定義Comparator
或者自定義類實現compareTo()
方法來指定排序規則
不支持同優先級元素排序
4.DelayQueue
隊列使用PriorityQueue
來實現,隊列中的元素必須實現Delayed
接口
只有在延時期滿才能從隊列中提取元素
阻塞隊列原理
如果隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?
使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。
以ArrayBlockingQueue
為例子
/** items 存放隊列中的元素*/
final Object[] items;
/** take 操作的索引 */
int takeIndex;
/** put 操作的索引 */
int putIndex;
/** 隊列中元素個數 */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** 控制生產者 takes 操作的 Condition */
private final Condition notEmpty;
/** 控制消費者 put 操作的 Condition */
private final Condition notFull;
put
操作
public void put(E e) throws InterruptedException {
checkNotNull(e); //判斷 e == null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //獲取鎖,與lock不同,可以嘗試中斷阻塞
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
入隊操作,入隊之后喚醒消費者線程。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
notFull.await();
中其實調用了park
方法,先使用setBlocker
保存一下將要阻塞的線程,然后調用本地方法UNSAFE.park(boolean isAbsolute, long time)
進行阻塞。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}