介紹
阻塞隊列(BlockingQueue)是指當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿;當隊列空時,隊列會阻塞獲得元素的線程,直到隊列變非空。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。
當線程 插入/獲取 動作由於隊列 滿/空 阻塞后,隊列也提供了一些機制去處理,或拋出異常,或返回特殊值,或者線程一直等待...
方法/處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除方法 | remove(o) | poll() | take() | poll(timeout, unit) |
檢查方法 | element() | peek() — 不移除元素 | 不可用 | 不可用 |
tips: 如果是無界阻塞隊列,則 put 方法永遠不會被阻塞;offer 方法始終返回 true。
Java 中的阻塞隊列:
ArrayBlockingQueue
ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序,默認情況下不保證線程公平的訪問。
通過可重入的獨占鎖 ReentrantLock 來控制並發,Condition 來實現阻塞。
public class ArrayBlockingQueueTest {
/**
* 1. 由於是有界阻塞隊列,需要設置初始大小
* 2. 默認不保證阻塞線程的公平訪問,可設置公平性
*/
private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2, true);
public static void main(String[] args) throws InterruptedException {
Thread put = new Thread(() -> {
// 3. 嘗試插入元素
try {
QUEUE.put("java");
QUEUE.put("javaScript");
// 4. 元素已滿,會阻塞線程
QUEUE.put("c++");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
put.start();
Thread take = new Thread(() -> {
try {
// 5. 獲取一個元素
System.out.println(QUEUE.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
take.start();
// 6 javaScript、c++
System.out.println(QUEUE.take());
System.out.println(QUEUE.take());
}
}
LinkedBlockingQueue
LinkedBlockingQueue 是一個用單向鏈表實現的有界阻塞隊列。此隊列的默認最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序,吞吐量通常要高於ArrayBlockingQueue。Executors.newFixedThreadPool() 就使用了這個隊列。
和 ArrayBlockingQueue 一樣,采用 ReentrantLock 來控制並發,不同的是它使用了兩個獨占鎖來控制消費和生產,通過 takeLock 和 putLock 兩個鎖來控制生產和消費,互不干擾,只要隊列未滿,生產線程可以一直生產;只要隊列不空,消費線程可以一直消費,不會相互因為獨占鎖而阻塞。
tips:因為使用了雙鎖,避免並發計算不准確,使用了一個 AtomicInteger 變量統計元素總量。
LinkedBlockingDeque
LinkedBlockingDeque 是一個由雙向鏈表結構組成的有界阻塞隊列,可以從隊列的兩端插入和移出元素。它實現了BlockingDeque接口,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以 First 單詞結尾的方法,表示插入、獲取或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入、獲取或移除雙端隊列的最后一個元素。
LinkedBlockingDeque 的 Node 實現多了指向前一個節點的變量 prev,以此實現雙向隊列。並發控制上和 ArrayBlockingQueue 類似,采用單個 ReentrantLock 來控制並發。因為雙端隊列頭尾都可以消費和生產,所以使用了一個共享鎖。
雙向阻塞隊列可以運用在“工作竊取”模式中。
public class LinkedBlockingDequeTest {
private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2);
public static void main(String[] args) {
DEQUE.addFirst("java");
DEQUE.addFirst("c++");
// java
System.out.println(DEQUE.peekLast());
// java
System.out.println(DEQUE.pollLast());
DEQUE.addLast("php");
// c++
System.out.println(DEQUE.pollFirst());
}
}
tips: take() 方法調用的是 takeFirst(),使用時候需注意。
PriorityBlockingQueue
PriorityBlockingQueue 是一個底層由數組實現的無界阻塞隊列,並帶有排序功能。由於是無界隊列,所以插入永遠不會被阻塞。默認情況下元素采取自然順序升序排列。也可以自定義類實現 compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。
底層同樣采用 ReentrantLock 來控制並發,由於只有獲取會阻塞,所以只采用一個Condition(只通知消費)來實現。
public class PriorityBlockingQueueTest {
private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>();
public static void main(String[] args) {
QUEUE.add("java");
QUEUE.add("javaScript");
QUEUE.add("c++");
QUEUE.add("python");
QUEUE.add("php");
Iterator<String> it = QUEUE.iterator();
while (it.hasNext()) {
// c++ javaScript java python php
// 同優先級不保證排序順序
System.out.print(it.next() + " ");
}
}
}
DelayQueue
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口(Delayed 接口的設計可以參考 ScheduledFutureTask 類),元素按延遲優先級排序,延遲時間短的排在前面,只有在延遲期滿時才能從隊列中提取元素。
DelayQueue 中的 PriorityQueue 會對隊列中的任務進行排序。排序時,time 小的排在前面(時間早的任務將被先執行)。如果兩個任務的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面(也就是說,如果兩個任務的執行時間相同,那么先提交的任務將被先執行)。
和 PriorityBlockingQueue 相似,底層也是數組,采用一個 ReentrantLock 來控制並發。
應用場景:
- 緩存系統的設計:可以用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
- 定時任務調度:使用 DelayQueue 保存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,比如 TimerQueue 就是使用 DelayQueue 實現的。
public class DelayElement implements Delayed, Runnable {
private static final AtomicLong SEQUENCER = new AtomicLong();
/**
* 標識元素先后順序
*/
private final long sequenceNumber;
/**
* 延遲時間,單位納秒
*/
private long time;
public DelayElement(long time) {
this.time = System.nanoTime() + time;
this.sequenceNumber = SEQUENCER.getAndIncrement();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
// compare zero if same object
if (other == this) {
return 0;
}
if (other instanceof DelayElement) {
DelayElement x = (DelayElement) other;
long diff = time - x.time;
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
} else if (sequenceNumber < x.sequenceNumber) {
return -1;
} else {
return 1;
}
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
@Override
public void run() {
System.out.println("sequenceNumber" + sequenceNumber);
}
@Override
public String toString() {
return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}';
}
}
public class DelayQueueTest {
private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>();
public static void main(String[] args) {
// 1. 添加 10 個參數
for (int i = 1; i < 10; i++) {
// 2. 5 秒內隨機延遲
int nextInt = new Random().nextInt(5);
long convert = TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS);
QUEUE.offer(new DelayElement(convert));
}
// 3. 查詢元素排序 —— 延遲短的排在前面
Iterator<DelayElement> iterator = QUEUE.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
// 4. 可觀察到元素延遲輸出
while (!QUEUE.isEmpty()) {
Thread thread = new Thread(QUEUE.poll());
thread.start();
}
}
}
LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。
並發控制上采用了大量的 CAS 操作,沒有使用鎖。
相對於其他阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
- transfer : Transfers the element to a consumer, waiting if necessary to do so. 存入的元素必須等到有消費者消費才返回。
- tryTransfer:Transfers the element to a waiting consumer immediately, if possible. 如果有消費者正在等待消費元素,則把傳入的元素傳給消費者。否則立即返回 false,不用等到消費。
SynchronousQueue
SynchronousQueue 是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則繼續 put 操作會被阻塞。Executors.newCachedThreadPool 就使用了這個隊列。
SynchronousQueue 默認情況下線程采用非公平性策略訪問隊列,未使用鎖,全部通過 CAS 操作來實現並發,吞吐量非常高,高於 LinkedBlockingQueue 和 ArrayBlockingQueue,非常適合用來處理一些高效的傳遞性場景。Executors.newCachedThreadPool() 就使用了 SynchronousQueue 進行任務傳遞。
public class SynchronousQueueTest {
private static class SynchronousQueueProducer implements Runnable {
private BlockingQueue<String> blockingQueue;
private SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName() + " Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class SynchronousQueueConsumer implements Runnable {
private BlockingQueue<String> blockingQueue;
private SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
new Thread(queueProducer, "producer - 1").start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer1, "consumer — 1").start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer2, "consumer — 2").start();
}
}
- 參考書籍:《Java 並發編程的藝術》
- 參考博文:https://www.cnblogs.com/konck/p/9473677.html