一、背景
大家平時應該也遇到過這樣的場景,使用多線程執行一段操作,然后依賴這一段操作的結果再執行其他邏輯。這個時候我們就要控制線程之間的順序,必須保證該多線程操作執行完之后才開始執行后面的邏輯。
那么今天這篇文章將介紹CountDownLatch和CyclicBarrier的用法以及如何使用它們分別來實現以上場景。
二、CountDownLatch用法
概念:
CountDownLatch:具有計數器的功能,等待其他線程執行完畢,主線程在繼續執行,用於監聽某些初始化操作,並且線程進行阻塞,等初始化執行完畢后,通知主線程繼續工作執行。
值得注意的是CountDownLatch計數的次數一定要與構造器傳入的數字一致,比如構造器傳入的是3,則countDown()一定要執行3次,否則線程將一直阻塞。CountDownLatch通常用來控制線程等待,它可以讓線程等待倒計時結束,再開始執行。
特點:
只能一次性使用(不能reset);主線程阻塞;某個線程中斷將永遠到不了屏障點,所有線程都會一直等待。
CountDownLatch類只提供了一個構造器:
public CountDownLatch(int count) { }; //參數count為計數值
下面這3個方法是CountDownLatch類中最重要的方法:
public void await() throws InterruptedException { }; //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行 public void countDown() { }; //將count值減1
構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量。這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值。與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。所以當N個線程都調用了這個方法,count的值等於0,然后主線程就能通過await()方法,恢復執行自己的任務。
用法:
我們描述這樣一個場景:三位運動員比賽跑步,當三位運動員都准備好之后比賽才開始。
代碼實現如下
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * <p> * * </p> * * @className ThreadTest * @author Sue * @create 2021/8/27 **/ public class ThreadTestA { //創建初始化3個線程的線程池 private final ExecutorService threadPool = Executors.newFixedThreadPool(3); private final CountDownLatch countDownLatch = new CountDownLatch(3); private void ready() { for (int i = 0; i < 3; i++) { threadPool.execute(() -> { try { //讓該線程等待,假設為[0,5000]的隨機數 long times = Math.round(Math.random() * 5000); System.out.println("運動員" + Thread.currentThread().getName() + "需要准備" + times + "ms"); Thread.sleep(times); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "運動員准備完畢"); countDownLatch.countDown(); }); } threadPool.shutdown(); } public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有運動員准備完畢!比賽開始"); } public static void main(String[] args) { long now = System.currentTimeMillis(); ThreadTestA threadTestA = new ThreadTestA(); threadTestA.ready(); threadTestA.run(); long end = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + (end - now)); } }
執行后輸出結果,從結果可以看出確實是在三個線程都執行完成之后,才開始執行主線程的run方法。
三、CyclicBarrier用法
概念:
CyclicBarrier翻譯過來就是循環屏障的意思,其作用就是讓一組線程到達一個同步點后再一起繼續運行,在其中任意一個線程未達到同步點,其他到達的線程均會被阻塞。這個屏障之所以用循環修飾,是因為在所有的線程釋放彼此之后,這個屏障是可以重新使用的,這一點與CountDownLatch不同。假設有一個場景:每個線程代表一個跑步運動員,當運動員都准備好后,才一起出發,只要有一個人沒有准備好,大家都等待。
CyclicBarrier是一種同步機制允許一組線程相互等待,等到所有線程都到達一個屏障點才退出await方法,它沒有直接實現AQS而是借助ReentrantLock來實現的同步機制。它是可循環使用的,而CountDownLatch是一次性的,另外它體現的語義也跟CountDownLatch不同,CountDownLatch減少計數到達條件采用的是release方式,而CyclicBarrier走向屏障點(await)采用的是Acquire方式,Acquire是會阻塞的,這也實現了CyclicBarrier的另外一個特點,只要有一個線程中斷那么屏障點就被打破,所有線程都將被喚醒(CyclicBarrier自己負責這部分實現,不是由AQS調度的),這樣也避免了因為一個線程中斷引起永遠不能到達屏障點而導致其他線程一直等待。屏障點被打破的CyclicBarrier將不可再使用(會拋出BrokenBarrierException)除非執行reset操作。
CyclicBarrier類位於java.util.concurrent包下,CyclicBarrier提供2個構造器:
public CyclicBarrier(int parties, Runnable barrierAction) {}
public CyclicBarrier(int parties) {}
參數parties指讓多少個線程或者任務等待至barrier狀態;參數barrierAction為當這些線程都達到barrier狀態時會執行的內容。
CyclicBarrier中最重要的方法就是await方法,它有2個重載版本:
public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
第一個版本比較常用,用來掛起當前線程,直至所有線程都到達barrier狀態再同時執行后續任務;
第二個版本是讓這些線程等待至一定的時間,如果還有線程沒有到達barrier狀態就直接讓到達barrier的線程執行后續任務。
CyclicBarrier常用方法說明
getParties()
獲取CyclicBarrier打開屏障的線程數量。
getNumberWaiting()
獲取正在CyclicBarrier上等待的線程數量。
await()
在CyclicBarrier上進行阻塞等待,直到發生以下情形之一:
- 在CyclicBarrier上等待的線程數量達到parties,則所有線程被釋放,繼續執行。
- 當前線程被中斷,則拋出InterruptedException異常,並停止等待,繼續執行。
- 其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
- 其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
- 其他線程調用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
await(timeout,TimeUnit)
在CyclicBarrier上進行限時的阻塞等待,直到發生以下情形之一:
- 在CyclicBarrier上等待的線程數量達到parties,則所有線程被釋放,繼續執行。
- 當前線程被中斷,則拋出InterruptedException異常,並停止等待,繼續執行。
- 當前線程等待超時,則拋出TimeoutException異常,並停止等待,繼續執行。
- 其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
- 其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
- 其他線程調用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,並停止等待,繼續執行。
isBroken()
獲取是否破損標志位broken的值,此值有以下幾種情況:
- CyclicBarrier初始化時,broken=false,表示屏障未破損。
- 如果正在等待的線程被中斷,則broken=true,表示屏障破損。
- 如果正在等待的線程超時,則broken=true,表示屏障破損。
- 如果有線程調用CyclicBarrier.reset()方法,則broken=false,表示屏障回到未破損狀態。
reset()
使得CyclicBarrier回歸初始狀態,直觀來看它做了兩件事:
- 如果有正在等待的線程,則會拋出BrokenBarrierException異常,且這些線程停止等待,繼續執行。
- 將是否破損標志位broken置為false。
用法:我們繼續使用上面的例子,但是使用CyclicBarrier來實現
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * <p> * * </p> * * @className Test03 * @author Sue * @create 2021/8/27 **/ public class ThreadTestB implements Runnable { private static final int THREAD_COUNT_NUM = 3; //創建初始化3個線程的線程池 private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT_NUM); //創建3個CyclicBarrier對象,執行完后執行當前類的run方法 private CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT_NUM, this); private void ready() { for (int i = 0; i < THREAD_COUNT_NUM; i++) { threadPool.execute(() -> { //讓該線程等待,假設為[0,5000]的隨機數 long times = Math.round(Math.random() * 5000); try { Thread.sleep(times); System.out.println("運動員" + Thread.currentThread().getName() + "正在准備,用時" + times + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } try { //執行完運行await(),等待所有運動員准備完畢 cb.await(); System.out.println("運動員" + Thread.currentThread().getName() + "已出發!"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } } @Override public void run() { System.out.println(Thread.currentThread().getName() + "所有運動員准備完畢!比賽開始"); threadPool.shutdown(); } public static void main(String[] args) { long now = System.currentTimeMillis(); ThreadTestB cb = new ThreadTestB(); cb.ready(); long end = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + (end - now)); } }
執行后輸出結果
同樣可以看出,只有在最后一個線程達到屏障之后,才會從三個線程中選擇一個線程去執行Runnable,且不會阻塞主線程。
一個屏障可以多次使用的,代碼如下。

import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * <p> * * </p> * * @className Test03 * @author Sue * @create 2021/8/27 **/ public class ThreadTestC { //線程數量 private static final int THREAD_COUNT_NUM = 3; //創建初始化3個線程的線程池 private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT_NUM); //創建3個CyclicBarrier對象 private final CyclicBarrier cb1 = new CyclicBarrier(THREAD_COUNT_NUM, () -> { System.out.println("所有運動員已入場!開始准備比賽"); threadPool.shutdown(); }); private void entrance() { for (int i = 0; i < THREAD_COUNT_NUM; i++) { threadPool.execute(() -> { try { //讓該線程等待,假設為[0,1000]的隨機數 long times = Math.round(Math.random() * 5000); Thread.sleep(times); System.out.println("運動員" + Thread.currentThread().getName() + "已入場,用時" + times + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } try { //執行完運行await(),等待所有運動員入場 cb1.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } } private void ready() { for (int i = 0; i < THREAD_COUNT_NUM; i++) { threadPool.execute(() -> { try { //讓該線程等待,假設為[0,5000]的隨機數 long times = Math.round(Math.random() * 5000); Thread.sleep(times); System.out.println("運動員" + Thread.currentThread().getName() + "准備完畢,用時" + times + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } try { //執行完運行await(),等待所有運動員准備完畢 cb1.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } } public static void main(String[] args) { long now = System.currentTimeMillis(); ThreadTestC cb = new ThreadTestC(); //入場 cb.entrance(); //准備 cb.ready(); long end = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + (end - now)); } }
從執行結果可以看出,在初次的4個線程越過barrier狀態后,又可以用來進行新一輪的使用。而CountDownLatch無法進行重復使用。
總結
通過上面的幾個例子,想必應該對CountDownLatch和CyclicBarrier有一些了解了。我們再來總結一下兩者的區別。
- CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:
- CountDownLatch一般用於某個線程A等待若干個其他線程執行完任務之后,它才執行;
- 而CyclicBarrier一般用於一組線程互相等待至某個狀態,然后這一組線程再同時執行;
- 另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。
- CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
- CyclicBarrier可以使用reset()方法重置屏障點
如果想了解更多的用法,可以參考以下鏈接
Java並發編程:CountDownLatch、CyclicBarrier和Semaphore