多線程當中的阻塞隊列
主要實現類有
- ArrayBlockingQueue是一個基於數組結構的有界阻塞隊列,此隊列按FIFO原則對元素進行排序
- LinkedBlockingQueue是一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue
- SynchronousQueue是一個不存儲元素的阻塞隊列,單個插入操作必須等到另一個線程調用移除操作,否則插 入操作一直處於阻塞狀態
1. 阻塞隊列概念
阻塞隊列通俗來說,是一個隊列,而一個阻塞隊列再數據結構中所起的作用大致如下圖
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素
當阻塞隊列是空時,從隊列中獲取元素的操作會被阻塞
當阻塞隊列是滿時,從隊列中添加元素的操作會被阻塞
- 試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程向空的隊列插入新的元素。
- 試圖向已滿的阻塞隊列中添加新元素的線程同樣會被阻塞,直到其他的線程從列中移除一個或者多個元素或者完全清空隊列后使隊列重新變得空閑起來並后續新增
很像生產者消費者模型!
2. 為什么要用阻塞隊列?好處是什么?
-
在多線程領域:所謂阻塞,在某些情況下會掛起線程,一旦滿足條件,被掛起的線程又會自動被喚醒
-
為什么需要BlockingQueue?
答:好處是我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一 手包辦了
-
在concurrent包發布以前,在多線程環境下,我們每個程序員都必須自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們程序帶來不小的復雜度
-
阻塞隊列圖示:
3. BlockingQueue的核心方法
方法類型 | 拋出異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take | poll(time,unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
可以看到,對於不同的方法類型,內部對應插入移除以及檢查方法對應的api都不同,所以,想要對不同的對列操作時候,需要考慮是否需要拋出異常?線程是否需要阻塞等,這樣對應不同的方法才能達到事半功倍的效果
參照源碼
對上述方法類型做描述:
方法類型 | 描述 |
---|---|
拋出異常 | 當阻塞隊列滿時,再往隊列中add會拋IllegalStateException: Queue full 當阻塞隊列空時,在網隊列里remove會拋 NoSuchElementException (這兩個都是異常) |
特殊值 | 插入方法,成功true失敗false 移除方法,成功返回出隊列的元素,隊列里沒有就返回null |
阻塞 | 當阻塞隊列滿時,生產者線程繼續往隊列里put元素,隊列會一直阻塞線程直到put數據或響應中斷退出 當阻塞隊列空時,消費者線程試圖從隊列take元素,隊列會一直阻塞消費者線程直到隊列可用 |
超時退出 | 當阻塞隊列滿時,隊列會阻塞生產者線程一定時間 超過限時后生產者線程會退出 |
4. 種類分析
4.1 接口查看
我們知道,阻塞隊列是BlockingQueue,我們打開Diagram圖查看類之間的關系可得
Queue是繼承了Queue的接口,同時Queue接口又繼承了Collection接口,那么BlockingQueue作為接口,那么一定會有實現類,不同實現類使用不同數據結構實現,完成的操作也不相同,我們這里列出BlockingQueue的7個實現類,分別為:
- ArrayBlockingQueue:由數組結構組成的有界阻塞隊列
- LinkedBlockingQueue:由鏈表結構組成的有界(但大小默認值為 Integer.MAX_VALUE )阻塞隊列
- PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
- DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列
- SychronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列
- LinkedBlockingDeque:由雙向鏈表結構組成的雙向阻塞隊列。
我們常用的實現類,在上面已經用重點符號表示了,ArrayBlockingQueue類似我們常用的ArrayList,底層數據結構是數組組成的隊列,LinkedBlockingQueue就類似我們常用的LinkedList,底層數據結構是鏈表組成的隊列,這里要注意LinkedBlockingDeque,那么Deque的接口繼承關系如下所示:
對於我們常用的實現類,我們可以發現SychronousQueue我們不常用也不了解,什么是單個元素的隊列?那么我們下面用代碼實現一下:
4.2 SychronousQueue
概念:SynchronousQueue沒有容量,與其他BlockingQueue不同,SychronousQueue是一個不存儲元素的BlockingQueue,每一個put操作必須要等待一個take操作,否則不能繼續添加元素,反之亦然
代碼實例:
package com.yuxue.juc.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new SynchronousQueue<>();
//AAA線程主要用來對阻塞隊列進行put操作
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t" + "put 1");
blockingQueue.put(1);
System.out.println(Thread.currentThread().getName() + "\t" + "put 2");
blockingQueue.put(2);
System.out.println(Thread.currentThread().getName() + "\t" + "put 3");
blockingQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
//BBB線程主要從阻塞隊列當中先操作自己的事務,休息5秒,之后拿值
new Thread(()->{
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"\t take:"+blockingQueue.take());
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"\t take:"+blockingQueue.take());
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"\t take:"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BBB").start();
}
}
代碼結果為:
AAA put 1
//經過5秒
BBB take:1
AAA put 2
//經過5秒
BBB take:2
AAA put 3
//經過5秒
BBB take:3
5. 用在什么地方?
講了這么多BlockingQueue的優點,那么阻塞隊列一般用在哪里?
- 生產者消費者模式
5.1 傳統版生產者消費者模式
package com.yuxue.juc.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//共享資源類
class Share1Data{
//內部共享變量
private volatile int num = 0;
//鎖
private Lock lock = new ReentrantLock();
//condition進行通知
private Condition condition = lock.newCondition();
//內部對num進行自增的方法
public void increment() {
lock.lock();
try {
//循環判斷
while (num != 0) {
condition.await();
}
//操作
num++;
System.out.println(Thread.currentThread().getName() + "\t" + num);
//通知
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//內部對num進行自減的方法
public void decrement() {
lock.lock();
try {
//循環判斷
while (num == 0) {
condition.await();
}
//操作
num--;
System.out.println(Thread.currentThread().getName() + "\t" + num);
//通知
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class TraditionalProCons {
public static void main(String[] args) {
Share1Data dataShare = new Share1Data();
//10個線程進行num增加操作,對應生產者
for (int i = 0; i < 10; i++) {
new Thread(() -> {
dataShare.increment();
}, "Productor" + i).start();
}
//10個線程進行num減少操作,對應消費者
for (int i = 0; i < 10; i++) {
new Thread(() -> {
dataShare.decrement();
}, "Consumer" + i).start();
}
}
}
運行結果為:
Productor0 1
Consumer0 0
Productor1 1
Consumer1 0
Productor3 1
Consumer2 0
Productor5 1
Consumer3 0
Productor7 1
Consumer4 0
Productor8 1
Consumer5 0
Productor2 1
Consumer6 0
Productor9 1
Consumer7 0
Productor4 1
Consumer8 0
Productor6 1
Consumer9 0
可以看到對共享變量進行增加或者減少操作的時候需要進行通知,同時對內部變量進行volatile保證變量的可見性以及禁止指令重排,可以更好地對生產者消費者模型當中操作的保證
5.2 阻塞隊列版生產者消費者模式
package com.yuxue.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class ShareData {
//標志位,true表示默認開啟生產者以及消費者
private volatile boolean flag = true;
//利用AtomicInteger類保證變量的原子性
private AtomicInteger atomicInteger = new AtomicInteger();
//建一個接口
BlockingQueue<String> blockingQueue = null;
//傳遞接口,主方法傳遞實際實現的類,可以實現代碼復用
//編程時盡量傳接口而不是傳類
public ShareData(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
//自己的生產者代碼
public void myIncrement() throws InterruptedException {
String data = null;
boolean retValue = false;
//當開啟時
while (flag) {
data = atomicInteger.getAndIncrement() + "";
//2s內添加成功返回true
retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "阻塞隊列成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "阻塞隊列失敗");
}
Thread.sleep(1000);
}
//當結束時,輸出一下表示已經結束了
System.out.println(Thread.currentThread().getName() + "\t大老板叫停了, flag=false,生產結束");
}
//自己的消費者代碼
public void myDecrement() throws InterruptedException {
String result = null;
//當開啟時
while (flag) {
//2s內移除成功返回true
result = blockingQueue.poll(2, TimeUnit.SECONDS);
//內部判斷,如果移除的是null,或者移除失敗
if (null == result || result.equalsIgnoreCase("")) {
//讓工作停止
flag = false;
System.out.println(Thread.currentThread().getName() + "\t超過 2s沒有取到蛋糕,消費退出");
System.out.println();
//return的作用是不執行下面的成功語句
return;
}
System.out.println(Thread.currentThread().getName() + "\t消費隊列" + result + "成功");
}
}
//停止生產者消費者模型
public void stop() throws Exception {
flag = false;
}
}
public class BlockingProCons {
public static void main(String[] args) {
//具體的實現類
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
//共享資源
ShareData shareData = new ShareData(blockingQueue);
//生產線程
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 生產線程啟動");
try {
shareData.myIncrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Productor").start();
//消費線程
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 消費線程啟動");
try {
shareData.myDecrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Consumer").start();
try {
//主線程
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
//主線程停止生產者消費者模型
System.out.println("5s后main叫停,線程結束");
try {
shareData.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
結果:
//輸出具體的實現類
java.util.concurrent.ArrayBlockingQueue
//生產者消費者模式
Productor 生產線程啟動
Consumer 消費線程啟動
Productor 插入0阻塞隊列成功
Consumer 消費隊列0成功
Productor 插入1阻塞隊列成功
Consumer 消費隊列1成功
Productor 插入2阻塞隊列成功
Consumer 消費隊列2成功
Productor 插入3阻塞隊列成功
Consumer 消費隊列3成功
Productor 插入4阻塞隊列成功
Consumer 消費隊列4成功
//停止操作
5s后main叫停,線程結束
Productor 老板叫停了, flag=false,生產結束
Consumer 超過 2s沒有取到蛋糕,消費退出
可以看到上述並沒有用到Lock以及synchronized,而僅僅用到了阻塞隊列以及原子整型類,就可以實現生產者消費者模型,也就是不用程序員關心具體的加鎖解鎖過程,而是關心具體的業務邏輯
6. synchronized和lock有什么區別?用新的lock有什么好處?請舉例
區別:
-
原始構成
-
synchronized是關鍵字屬於jvm
其中jvm會將其字節碼運行為monitorenter以及monitorexit
monitorenter,底層是通過monitor對象來完成,其實wait/notify等方法也依賴於monitor對象只有在同 步或方法中才能掉wait/notify等方法
monitorexit
-
Lock是具體類,是api層面的鎖(java.util.concurrent.locks.Lock)
-
-
使用方法
- sychronized不需要用戶取手動釋放鎖,當synchronized代碼執行完后系統會自動讓線程釋放對鎖的占用
- ReentrantLock則需要用戶去手動釋放鎖若沒有主動釋放鎖,就有可能導致出現死鎖現象,需要lock()和 unlock()方法配合try/finally語句塊來完成
-
等待是否可中斷
- synchronized不可中斷,除非拋出異常或者正常運行完成
- ReentrantLock可中斷,設置超時方法
tryLock(long timeout, TimeUnit unit)
,或者lockInterruptibly()
放代碼塊中,調用interrupt()方法可中斷
-
加鎖是否公平
- synchronized是非公平鎖
- ReentrantLock兩者都可以,默認也是非公平鎖,構造方法可以傳入boolean值,true為公平鎖,false為非公平鎖
-
鎖綁定多個條件Condition
- synchronized沒有
- ReentrantLock用來實現分組喚醒需要要喚醒的線程們,可以精確喚醒,而不是像synchronized要么隨機喚醒一個線程要么喚醒全部線程
代碼:
package com.yuxue.juc.queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * synchronized和lock區別 * ===lock可綁定多個條件=== * 對線程之間按順序調用,實現A>B>C三個線程啟動,要求如下: * AA打印5次,BB打印10次,CC打印15次 * 緊接着 * AA打印5次,BB打印10次,CC打印15次 * 。。。。 * 來十輪 */ class DataShare { private volatile int num = 0; private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); public void print5() { lock.lock(); try { while (num != 0) { c1.await(); } for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } num = 1; c2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print10() { lock.lock(); try { while (num != 1) { c2.await(); } for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } num = 2; c3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print15() { lock.lock(); try { while (num != 2) { c3.await(); } for (int i = 0; i < 15; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } num = 0; c1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class SyncAndReentrantLockDemo { public static void main(String[] args) { DataShare dataShare = new DataShare(); for (int i = 0; i < 10; i++) { new Thread(() -> { dataShare.print5(); }, "AA").start(); new Thread(() -> { dataShare.print10(); }, "BB").start(); new Thread(() -> { dataShare.print15(); }, "CC").start(); } } }
我們可以清楚地看到Lock可以創建多個Condition,同時對不同的Condition調用await以及signal方法,可以對不同的線程進行操作,這就是Lock比synchronized更方便的原因,synchronized只能對所有線程進行notifyall()
方法,隨機喚醒線程
注意:notifyall()
方法是Object類當中的方法!