本文主要介紹和對比我們常用的幾種並發工具類,主要涉及 CountDownLatch
、 CyclicBarrier
、 Semaphore
、 Exchanger
相關的內容,如果對多線程相關內容不熟悉,可以看筆者之前的一些文章:
- 《Java並發編程-線程基礎》
- 《總算把線程六種狀態的轉換說清楚了!》
- 《[高頻面試]解釋線程池的各個參數含義》
- 《知道線程池的四種拒絕策略嗎?》
- 《java中常見的六種線程池詳解》
- 《基於synchronized的鎖的深度解析》💡推薦
- 《JAVA中常見的阻塞隊列詳解》
- 《優雅關閉線程池的方案》
- 介紹
CountDownLatch
、CyclicBarrier
兩者的使用與區別,他們都是等待多線程完成,是一種並發流程的控制手段, - 介紹
Semaphore
、Exchanger
的使用,semaphore
是信號量,可以用來控制允許的線程數,而Exchanger
可以用來交換兩個線程間的數據。
CountDownLatch
CountDownLatch
是JDK5
之后加入的一種並發流程控制工具,它在java.util.concurrent
包下CountDownLatch
允許一個或多個線程等待其他線程完成操作,這里需要注意,是可以是一個等待也可以是多個來等待CountDownLatch
的構造函數如下,它接受一個int
類型的參數作為計數器,即如果你想等待N
個線程完成,那么這里就傳入N
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
- 其中有兩個核心的方法
countDown
與await
,其中 當我們調用countDown
方法時相應的N
的值減 1,而await
方法則會阻塞當前線程,直到N
的值變為零。 - 說起來比較抽象,下面我們通過實際案例來說明。
多個線程等待一個線程
- 在我們生活中最典型的案例就是體育中的跑步,假設現在我們要進行一場賽跑,那么所有的選手都需要等待裁判員的起跑命令,這時候,我們將其抽象化每個選手對應的是一個線程,而裁判員也是一個線程,那么就是多個選手的線程再等待裁判員線程的命令來執行
- 我們通過
CountDownLatch
來實現這一案例,那么等待的個數N
就是上面的裁判線程的個數,即為 1,
/**
* @url i-code.onlien
* 雲棲簡碼
*/
public static void main(String[] args) throws InterruptedException {
//模擬跑步比賽,裁判說開始,所有選手開始跑,我們可以使用countDownlatch來實現
//這里需要等待裁判說開始,所以時等着一個線程
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() +"已准備");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"開始跑~~");
},"選手1").start();
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() +"已准備");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"開始跑~~");
},"選手2").start();
TimeUnit.SECONDS.sleep(1);
System.out.println("裁判:預備~~~");
countDownLatch.countDown();
System.out.println("裁判:跑~~~");
}
- 運行結果如下:
在上述代碼中,我們首先創建了一個計數為1 的
CountDownLatch
對象,這代表我們需要等待的線程數,之后再創建了兩個線程,用來代表選手線程,同時在選手的線程中我們都調用了await
方法,讓線程進入阻塞狀態,直到CountDownLatch的計數為零后再執行后面的內容,在主線程main
方法中我們等待 1秒后執行countDown
方法,這個方法就是減一,此時的N
則為零了,那么選手線程則開始執行后面的內容,整體的輸出如上圖所示
一個/多個線程等待多個線程
- 同樣從我們生活中的場景來抽象,假設公司要組織出游,大巴車接送,當湊夠五個人大巴車則發車出發,這里就是大巴車需要等待這五個人全部到齊才能繼續執行,我們抽象之后用
CountDownLatch
來實現,那么的計數個數N
則為5,因為要等待這五個,通過代碼實現如下:
public static void main(String[] args) throws InterruptedException {
/**
* i-code.online
* 雲棲簡碼
*/
//等待的個數
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "從住所出發...");
try {
TimeUnit.SECONDS.sleep((long) (Math.random()*10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 到達目的地-----");
countDownLatch.countDown();
},"人員-"+i).start();
}
System.out.println("大巴正在等待人員中.....");
countDownLatch.await();
System.out.println("-----所有人到齊,出發-----");
}
- 上面代碼執行結果如下:
從上述代碼中我們可以看到,定義了一個計數為5的
countDownLatch
,之后通過循環創建五個線程,模擬五個人員,當他們到達指定地點后執行countDown
方法,對計數減一。主線程相當於是大巴車的線程,執行await
方法進行阻塞,只有當N
的值減到0后則執行后面的輸出
CountDownLatch 主要方法介紹
- 構造函數:
public CountDownLatch(int count) { };
它的構造函數是傳入一個參數,該參數
count
是需要倒數的數值。
await()
:調用await()
方法的線程開始等待,直到倒數結束,也就是count
值為0
的時候才會繼續執行。await(long timeout, TimeUnit unit)
:await()
有一個重載的方法,里面會傳入超時參數,這個方法的作用和await()
類似,但是這里可以設置超時時間,如果超時就不再等待了。countDown()
:把數值倒數1
,也就是將count
值減1
,直到減為0
時,之前等待的線程會被喚起。
上面的案例介紹了
CountDownLatch
的使用,但是CountDownLatch
有個特點,那就是不能夠重用,比如已經完成了倒數,那可不可以在下一次繼續去重新倒數呢?是可以的,一旦倒數到0 則結束了,無法再次設置循環執行,但是我們實際需求中有很多場景中需要循環來處理,這時候我們可以使用CyclicBarrier
來實現
CyclicBarrier
CyclicBarrier
與CountDownLatch
比較相似,當等待到一定數量的線程后開始執行某個任務CyclicBarrier
的字面意思是可以循環使用的屏障,它的功能就是讓一組線程到達一個屏障(同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開會,此時所有被屏障阻塞的線程都將繼續執行。如下演示
- 上圖中可以看到,到線程到達屏障后阻塞,直到最后一個也到達后,則全部放行
- 首先我們來看下它的構造函數,如下:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
CyclicBarrier(int parties)
構造函數提供了int
類型的參數,代表的是需要攔截的線程數量,而每個線程通過調用await
方法來告訴CyclicBarrier
我到達屏障點了,然后阻塞CyclicBarrier(int parties, Runnable barrierAction)
構造函數是為我們提供的一個高級方法,加了一個barrierAction
的參數,這是一個Runnable
類型的,也就是一個線程,它表示當所有線程到達屏障后,悠閑觸發barrierAction
線程執行,再執行各個線程之后的內容
案例
- 假設你要和你女朋友約會,約定了一個時間地點,那么不管你們誰先到都會等待另一個到才會出發取約會~ 那么這時候我們通過
CyclicBarrier
的來實現,這里我們需要來攔截的線程就是兩個。具體實現 如下:
/*
CyclicBarrier 與countDownLatch 比較相似,也是等待線程完成,
不過countDownLatch 是await等待其他的線程通過countDown的數量,達到一定數則執行,
而 CyclicBarrier 則是直接看await的數量,達到一定數量直接全部執行,
*/
public static void main(String[] args) {
//好比情侶約會,不管誰先到都的等另一個,這里就是兩個線程,
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(() ->{
System.out.println("快速收拾,出門~~~");
try {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("到了約會地點等待女朋友前來~~");
cyclicBarrier.await();
System.out.println("女朋友到來嗨皮出發~~約會");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"男朋友").start();
new Thread(() ->{
System.out.println("慢慢收拾,出門~~~");
try {
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println("到了約會地點等待男朋友前來~~");
cyclicBarrier.await();
System.out.println("男朋友到來嗨皮出發~~約會");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"女朋友").start();
}
- 代碼執行結果如下:
上面代碼,相對簡單,創建一個攔截數為2的屏障,之后創建兩個線程,調用await方法,只有當調用兩次才會觸發后面的流程。
- 我們再寫一個案例sh,使用含有
Runnable
參數的構造函數;和之前CountDownLatch
的案例相似,公司組織出游,這時候肯定有很多大巴在等待接送,大巴不會等所有的 人都到才出發,而是每坐滿一輛車就出發一輛,這種場景我們就可以使用CyclicBarrier
來實現,實現如下:
/*
CyclicBarrier是可重復使用到,也就是每當幾個滿足是不再等待執行,
比如公司組織出游,安排了好多輛大把,每坐滿一輛就發車,不再等待,類似這種場景,實現如下:
*/
public static void main(String[] args) {
//公司人數
int peopleNum = 2000;
//每二十五個人一輛車,湊夠二十五則發車~
CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
//達到25人出發
System.out.println("------------25人數湊齊出發------------");
});
for (int j = 1; j <= peopleNum; j++) {
new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
}
}
static class PeopleTask implements Runnable{
private String name;
private CyclicBarrier cyclicBarrier;
public PeopleTask(String name,CyclicBarrier cyclicBarrier){
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(name+"從家里出發,正在前往聚合地....");
try {
TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+"到達集合地點,等待其他人..");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CyclicBarrier 和 CountDownLatch 的異同
相同點:
- 都能阻塞一個或一組線程,直到某個預設的條件達成發生,再統一出發
不同點:
- 可重復性:
CountDownLatch
的計數器只能使用一次,到到達0后就不能再次使用了,除非新建實例;而CyclicBarrier
的計數器是可以復用循環的,所以CyclicBarrier
可以用在更復雜的場景,可以隨時調用reset
方法來重制攔截數,如計算發生錯誤時可以直接充值計數器,讓線程重新執行一次。 - 作用對象:
CyclicBarrier
要等固定數量的線程都到達了屏障位置才能繼續執行,而CountDownLatch
只需等待數字倒數到0
,也就是說CountDownLatch
作用於事件,但CyclicBarrier
作用於線程;CountDownLatch
是在調用了countDown
方法之后把數字倒數減1
,而CyclicBarrier
是在某線程開始等待后把計數減1
。 - 執行動作:
CyclicBarrier
有執行動作barrierAction
,而CountDownLatch
沒這個功能。
Semaphore
Semaphore
(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源,
- 從圖中可以看出,信號量的一個最主要的作用就是,來控制那些需要限制並發訪問量的資源。具體來講,信號量會維護“許可證”的計數,而線程去訪問共享資源前,必須先拿到許可證(
acquire
方法)。線程可以從信號量中去“獲取”一個許可證,一旦線程獲取之后,信號量持有的許可證就轉移過去了,所以信號量手中剩余的許可證要減一。 - 同理,線程也可以“釋放”一個許可證,如果線程釋放了許可證(
release
方法),這個許可證相當於被歸還給信號量了,於是信號量中的許可證的可用數量加一。當信號量擁有的許可證數量減到 0 時,如果下個線程還想要獲得許可證,那么這個線程就必須等待,直到之前得到許可證的線程釋放,它才能獲取。由於線程在沒有獲取到許可證之前不能進一步去訪問被保護的共享資源,所以這就控制了資源的並發訪問量,這就是整體思路。
案例
- 如我們平時開發中典型的數據庫操作,這是一個密集
IO
操作,我們可以啟動很多線程但是數據庫的連接池是有限制的,假設我們設置允許五個鏈接,如果我們開啟太多線程直接操作則會出現異常,這時候我們可以通過信號量來控制,讓一直最多只有五個線程來獲取連接。代碼如下:
/*
Semaphore 是信號量, 可以用來控制線程的並發數,可以協調各個線程,以達到合理的使用公共資源
*/
public static void main(String[] args) {
//創建10個容量的線程池
final ExecutorService service = Executors.newFixedThreadPool(100);
//設置信號量的值5 ,也就是允許五個線程來執行
Semaphore s = new Semaphore(5);
for (int i = 0; i < 100; i++) {
service.submit(() ->{
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("數據庫耗時操作"+Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在執行....");
s.release();
});
}
}
如上代碼,創建了一個容量100的線程池,模擬我們程序中大量的線程,添加一百個任務,讓線程池執行。創建了一個容量為5的信號量,在線程中我們調用
acquire
來獲得信號量的許可,只有獲得了才能只能下面的內容不然阻塞。當執行完后釋放該許可,通過release
方法,
- 通過上面的演示,有沒有覺得非常眼熟,對,就是和我們之前接觸過的鎖很相似,只是鎖是只允許一個線程訪問,那我們能不能將信號量的容量設置為1呢? 這當然是可以的,當我們設置為1時其實就和我們的鎖的功能是一致的,如下代碼:
private static int count = 0;
/*
Semaphore 中如果我們允許的的許可證數量為1 ,那么它的效果與鎖相似。
*/
public static void main(String[] args) throws InterruptedException {
final ExecutorService service = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(1);
for (int i = 0; i < 10000; i++) {
service.submit(() ->{
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "執行了");
count ++;
semaphore.release();
});
}
service.shutdown();
TimeUnit.SECONDS.sleep(5);
System.out.println(count);
}
其他主要方法介紹
public boolean tryAcquire()
:tryAcquire
和鎖的trylock
思維是一致的,是嘗試獲取許可證,相當於看看現在有沒有空閑的許可證,如果有就獲取,如果現在獲取不到也沒關系,不必陷入阻塞,可以去做別的事。public boolean tryAcquire(long timeout, TimeUnit unit)
:是一個重載的方法,它里面傳入了超時時間。比如傳入了 3 秒鍾,則意味着最多等待 3 秒鍾,如果等待期間獲取到了許可證,則往下繼續執行;如果超時時間到,依然獲取不到許可證,它就認為獲取失敗,且返回 false。int availablePermits()
:返回此信號量中當前可用的許可證數int getQueueLength()
:返回正在等待許可證的線程數boolean hasQueuedThreads()
:判斷是否有線程正在等待獲取許可證void reducePermits(int reduction)
:減少reduction
個許可證,是個protected
方法Collection<Thread> getQueuedThreads()
:返回正在等待獲取許可證的線程集合,是個protected
方法
Exchanger
Exchanger
(交換者)是一個用於線程間協作的工具類,它主要用於進行線程間數據的交換,它有一個同步點,當兩個線程到達同步點時可以將各自的數據傳給對方,如果一個線程先到達同步點則會等待另一個到達同步點,到達同步點后調用exchange
方法可以傳遞自己的數據並且獲得對方的數據。- 我們假設現在需要錄入一些重要的賬單信息,為了保證准備,讓兩個人分別錄入,之后再進行對比后是否一致,防止錯誤繁盛。下面通過代碼來演示:
public class ExchangerTest {
/*
Exchanger 交換, 用於線程間協作的工具類,可以交換線程間的數據,
其提供一個同步點,當線程到達這個同步點后進行數據間的交互,遺傳算法可以如此來實現,
以及校對工作也可以如此來實現
*/
public static void main(String[] args) {
/*
模擬 兩個工作人員錄入記錄,為了防止錯誤,兩者錄的相同內容,程序僅從校對,看是否有錯誤不一致的
*/
//開辟兩個容量的線程池
final ExecutorService service = Executors.newFixedThreadPool(2);
Exchanger<InfoMsg> exchanger = new Exchanger<>();
service.submit(() ->{
//模擬數據 線程 A的
InfoMsg infoMsg = new InfoMsg();
infoMsg.content="這是線程A";
infoMsg.id ="10001";
infoMsg.desc = "1";
infoMsg.message = "message";
System.out.println("正在執行其他...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
final InfoMsg exchange = exchanger.exchange(infoMsg);
System.out.println("線程A 交換數據====== 得到"+ exchange);
if (!exchange.equals(infoMsg)){
System.out.println("數據不一致~~請稽核");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.submit(() ->{
//模擬數據 線程 B的
InfoMsg infoMsg = new InfoMsg();
infoMsg.content="這是線程B";
infoMsg.id ="10001";
infoMsg.desc = "1";
infoMsg.message = "message";
System.out.println("正在執行其他...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
final InfoMsg exchange = exchanger.exchange(infoMsg);
System.out.println("線程B 交換數據====== 得到"+ exchange);
if (!exchange.equals(infoMsg)){
System.out.println("數據不一致~~請稽核");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.shutdown();
}
static class InfoMsg{
String id;
String name;
String message;
String content;
String desc;
@Override
public String toString() {
return "InfoMsg{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", message='" + message + '\'' +
", content='" + content + '\'' +
", desc='" + desc + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InfoMsg infoMsg = (InfoMsg) o;
return Objects.equals(id, infoMsg.id) &&
Objects.equals(name, infoMsg.name) &&
Objects.equals(message, infoMsg.message) &&
Objects.equals(content, infoMsg.content) &&
Objects.equals(desc, infoMsg.desc);
}
@Override
public int hashCode() {
return Objects.hash(id, name, message, content, desc);
}
}
}
- 運行結果如下:
上面代碼運行可以看到,當我們線程
A/B
到達同步點即調用exchange
后進行數據的交換,拿到對方的數據再與自己的數據對比可以做到稽核 的效果
Exchanger
同樣可以用於遺傳算法中,選出兩個對象進行交互兩個的數據通過交叉規則得到兩個混淆的結果。Exchanger
中嗨提供了一個方法public V exchange(V x, long timeout, TimeUnit unit)
主要是用來防止兩個程序中一個一直沒有執行exchange
而導致另一個一直陷入等待狀態,這是可以用這個方法,設置超時時間,超過這個時間則不再等待。
本文由AnonyStar 發布,可轉載但需聲明原文出處。
歡迎關注微信公賬號 :雲棲簡碼 獲取更多優質文章
更多文章關注筆者博客 :雲棲簡碼 i-code.online