阻塞隊列
概念
隊列
隊列就可以想成是一個數組,從一頭進入,一頭出去,排隊買飯
阻塞隊列
BlockingQueue 阻塞隊列,排隊擁堵,首先它是一個隊列,而一個阻塞隊列在數據結構中所起的作用大致如下圖所示:
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素
-
當阻塞隊列是空時,從隊列中獲取元素的操作將會被阻塞
- 當蛋糕店的櫃子空的時候,無法從櫃子里面獲取蛋糕
-
當阻塞隊列是滿時,從隊列中添加元素的操作將會被阻塞
- 當蛋糕店的櫃子滿的時候,無法繼續向櫃子里面添加蛋糕了
也就是說 試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其它線程往空的隊列插入新的元素
同理,試圖往已經滿的阻塞隊列中添加新元素的線程,直到其它線程往滿的隊列中移除一個或多個元素,或者完全清空隊列后,使隊列重新變得空閑起來,並后續新增
為什么要用?
去海底撈吃飯,大廳滿了,需要進候廳等待,但是這些等待的客戶能夠對商家帶來利潤,因此我們非常歡迎他們阻塞
在多線程領域:所謂的阻塞,在某些清空下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動喚醒
為什么需要BlockingQueue
好處是我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都幫你一手包辦了
在concurrent包發布以前,在多線程環境下,我們每個程序員都必須自己取控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。
架構
// 你用過List集合類
// ArrayList集合類熟悉么?
// 還用過 CopyOnWriteList 和 BlockingQueue
BlockingQueue阻塞隊列是屬於一個接口,底下有七個實現類
- ArrayBlockQueue:由數組結構組成的有界阻塞隊列
- LinkedBlockingQueue:由鏈表結構組成的有界(但是默認大小 Integer.MAX_VALUE)的阻塞隊列
- 有界,但是界限非常大,相當於無界,可以當成無界
- PriorityBlockQueue:支持優先級排序的無界阻塞隊列
- DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列
- SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列
- 生產一個,消費一個,不存儲元素,不消費不生產
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列
- LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列
這里需要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
BlockingQueue核心方法
拋出異常 | 當阻塞隊列滿時:在往隊列中add插入元素會拋出 IIIegalStateException:Queue full 當阻塞隊列空時:再往隊列中remove移除元素,會拋出NoSuchException |
---|---|
特殊性 | 插入方法,成功true,失敗false 移除方法:成功返回出隊列元素,隊列沒有就返回空 |
一直阻塞 | 當阻塞隊列滿時,生產者繼續往隊列里put元素,隊列會一直阻塞生產線程直到put數據or響應中斷退出, 當阻塞隊列空時,消費者線程試圖從隊列里take元素,隊列會一直阻塞消費者線程直到隊列可用。 |
超時退出 | 當阻塞隊列滿時,隊里會阻塞生產者線程一定時間,超過限時后生產者線程會退出 |
拋出異常組
但執行add方法,向已經滿的ArrayBlockingQueue中添加元素時候,會拋出異常
// 阻塞隊列,需要填入默認值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("XXX"));
運行后:
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
同時如果我們多取出元素的時候,也會拋出異常,我們假設只存儲了3個值,但是取的時候,取了四次
// 阻塞隊列,需要填入默認值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
那么出現異常
true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
at java.util.AbstractQueue.remove(AbstractQueue.java:117)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
布爾類型組
我們使用 offer的方法,添加元素時候,如果阻塞隊列滿了后,會返回false,否者返回true
同時在取的時候,如果隊列已空,那么會返回null
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
運行結果
true
true
true
false
a
b
c
null
阻塞隊列組
我們使用 put的方法,添加元素時候,如果阻塞隊列滿了后,添加消息的線程,會一直阻塞,直到隊列元素減少,會被清空,才會喚醒
一般在消息中間件,比如RabbitMQ中會使用到,因為需要保證消息百分百不丟失,因此只有讓它阻塞
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("================");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
同時使用take取消息的時候,如果內容不存在的時候,也會被阻塞
不見不散組
offer( ) , poll 加時間
使用offer插入的時候,需要指定時間,如果2秒還沒有插入,那么就放棄插入
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
同時取的時候也進行判斷
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
如果2秒內取不出來,那么就返回null
SynchronousQueue
SynchronousQueue沒有容量,與其他BlockingQueue不同,SynchronousQueue是一個不存儲的BlockingQueue,每一個put操作必須等待一個take操作,否者不能繼續添加元素
下面我們測試SynchronousQueue添加元素的過程
首先我們創建了兩個線程,一個線程用於生產,一個線程用於消費
生產的線程分別put了 A、B、C這三個字段
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
消費線程使用take,消費阻塞隊列中的內容,並且每次消費前,都等待5秒
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
最后結果輸出為:
t1 put A
t2 take A
5秒后...
t1 put B
t2 take B
5秒后...
t1 put C
t2 take C
我們從最后的運行結果可以看出,每次t1線程向隊列中添加阻塞隊列添加元素后,t1輸入線程就會等待 t2消費線程,t2消費后,t2處於掛起狀態,等待t1在存入,從而周而復始,形成 一存一取的狀態
阻塞隊列的用處
生產者消費者模式
一個初始值為0的變量,兩個線程對其交替操作,一個加1,一個減1,來5輪
關於多線程的操作,我們需要記住下面幾句
- 線程 操作 資源類
- 判斷 干活 通知
- 防止虛假喚醒機制
我們下面實現一個簡單的生產者消費者模式,首先有資源類ShareData
/**
* 資源類
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代碼塊,加鎖
lock.lock();
try {
// 判斷
while(number != 0) {
// 等待不能生產
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 喚醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代碼塊,加鎖
lock.lock();
try {
// 判斷
while(number == 0) {
// 等待不能消費
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 喚醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
里面有一個number變量,同時提供了increment 和 decrement的方法,分別讓number 加1和減1
但是我們在進行判斷的時候,為了防止出現虛假喚醒機制,不能使用if來進行判斷,而應該使用while
// 判斷
while(number != 0) {
// 等待不能生產
condition.await();
}
不能使用 if判斷
// 判斷
if(number != 0) {
// 等待不能生產
condition.await();
}
完整代碼
/**
* 生產者消費者 傳統版
* 題目:一個初始值為0的變量,兩個線程對其交替操作,一個加1,一個減1,來5輪
*/
/**
* 線程 操作 資源類
* 判斷 干活 通知
* 防止虛假喚醒機制
*/
/**
* 資源類
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代碼塊,加鎖
lock.lock();
try {
// 判斷
while(number != 0) {
// 等待不能生產
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 喚醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代碼塊,加鎖
lock.lock();
try {
// 判斷
while(number == 0) {
// 等待不能消費
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 喚醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// 高內聚,低耦合 內聚指的是,一個空調,自身帶有調節溫度高低的方法
ShareData shareData = new ShareData();
// t1線程,生產
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1").start();
// t2線程,消費
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t2").start();
}
}
最后運行成功后,我們一個進行生產,一個進行消費
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
生成者和消費者3.0
在concurrent包發布以前,在多線程環境下,我們每個程序員都必須自己去控制這些細節,尤其還要兼顧效率和線程安全,則這會給我們的程序帶來不小的時間復雜度
現在我們使用新版的阻塞隊列版生產者和消費者,使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用
/**
* 生產者消費者 阻塞隊列版
* 使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用
*
*/
class MyResource {
// 默認開啟,進行生產消費
// 這里用到了volatile是為了保持數據的可見性,也就是當TLAG修改時,要馬上通知其它線程進行修改
private volatile boolean FLAG = true;
// 使用原子包裝類,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 這里不能為了滿足條件,而實例化一個具體的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而應該采用依賴注入里面的,構造注入方法傳入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查詢出傳入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
/**
* 生產
* @throws Exception
*/
public void myProd() throws Exception{
String data = null;
boolean retValue;
// 多線程環境的判斷,一定要使用while進行,防止出現虛假喚醒
// 當FLAG為true的時候,開始生產
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 2秒存入1個data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data + "成功" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data + "失敗" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t 停止生產,表示FLAG=false,生產介紹");
}
/**
* 消費
* @throws Exception
*/
public void myConsumer() throws Exception{
String retValue;
// 多線程環境的判斷,一定要使用while進行,防止出現虛假喚醒
// 當FLAG為true的時候,開始生產
while(FLAG) {
// 2秒存入1個data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消費隊列:" + retValue + "成功" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消費失敗,隊列中已為空,退出" );
// 退出消費隊列
return;
}
}
}
/**
* 停止生產的判斷
*/
public void stop() {
this.FLAG = false;
}
}
public class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) {
// 傳入具體的實現類, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生產線程啟動");
System.out.println("");
System.out.println("");
try {
myResource.myProd();
System.out.println("");
System.out.println("");
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消費線程啟動");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
// 5秒后,停止生產和消費
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("");
System.out.println("");
System.out.println("5秒中后,生產和消費線程停止,線程結束");
myResource.stop();
}
}
最后運行結果
java.util.concurrent.ArrayBlockingQueue
prod 生產線程啟動
consumer 消費線程啟動
prod 插入隊列:1成功
consumer 消費隊列:1成功
prod 插入隊列:2成功
consumer 消費隊列:2成功
prod 插入隊列:3成功
consumer 消費隊列:3成功
prod 插入隊列:4成功
consumer 消費隊列:4成功
prod 插入隊列:5成功
consumer 消費隊列:5成功
5秒中后,生產和消費線程停止,線程結束
prod 停止生產,表示FLAG=false,生產介紹