JUC 中倒數計數器 CountDownLatch 的使用與原理分析,當需要等待多個線程執行完畢后在做一件事情時候 CountDownLatch 是比調用線程的 join 方法更好的選擇,CountDownLatch 與 線程的 join 方法區別是什么?
日常開發中經常會遇到需要在主線程中開啟多線程去並行執行任務,並且主線程需要等待所有子線程執行完畢后再進行匯總的場景,它的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操作計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減為0時就代表條件已成熟,所有因調用await方法而阻塞的線程都會被喚醒。這就是CountDownLatch的內部機制,看起來很簡單,無非就是阻塞一部分線程讓其在達到某個條件之后再執行。但是CountDownLatch的應用場景卻比較廣泛,只要你腦洞夠大利用它就可以玩出各種花樣。最常見的一個應用場景是開啟多個線程同時執行某個任務,等到所有任務都執行完再統計匯總結果。下圖動態演示了閉鎖阻塞線程的整個過程。
在CountDownLatch出現之前一般都是使用線程的join()方法來實現,但是join不夠靈活,不能夠滿足不同場景的需求。接下來我們看看CountDownLatch的原理實現。
一.CountDownLatch原理探究
從CountDownLatch的名字可以猜測內部應該有個計數器,並且這個計數器是遞減的,下面就通過源碼看看JDK開發組是何時初始化計數器,何時遞減的,計數器變為 0 的時候做了什么操作,多個線程是如何通過計時器值實現同步的,首先我們先看看CountDownLatch內部結構,類圖如下:
從類圖可以知道CountDownLatch內部還是使用AQS實現的,通過下面構造函數初始化計數器的值,可知實際上是把計數器的值賦值給了AQS的state,也就是這里AQS的狀態值來表示計數器值。
構造函數源碼如下:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
接下來主要看一下CountDownLatch中幾個重要的方法內部是如何調用AQS來實現功能的。
1.void await()方法,當前線程調用了CountDownLatch對象的await方法后,當前線程會被阻塞,直到下面的情況之一才會返回:(1)當所有線程都調用了CountDownLatch對象的countDown方法后,
也就是說計時器值為 0 的時候。(2)其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常后返回。接下來讓我們看看await()方法內部是如何調用
AQS的方法的,源碼如下:
//CountDownLatch的await()方法 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //AQS的獲取共享資源時候可被中斷的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException { //如果線程被中斷則拋異常 if (Thread.interrupted()) throw new InterruptedException(); //嘗試看當前是否計數值為0,為0則直接返回,否者進入AQS的隊列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //sync類實現的AQS的接口 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
從上面代碼可以看到await()方法委托sync調用了AQS的acquireSharedInterruptibly方法,該方法的特點是線程獲取資源的時候可以被中斷,並且獲取到的資源是共享資源,這里為什么要調用AQS的這個方法,而不是調用獨占鎖的accquireInterruptibly方法呢?這是因為這里狀態值需要的並不是非 0 即 1 的效果,而是和初始化時候指定的計數器值有關系,比如你初始化的時候計數器值為 8 ,那么state的值應該就有 0 到 8 的狀態,而不是只有 0 和 1 的獨占效果。
這里await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這里計數器值是資源總數,也就是意味着是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷如果當前線程被中斷了則拋出異常,否則調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否為 0 ,是則當前線程的await()方法直接返回,否則調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是不是為 0 ,並沒有調用CAS讓當前狀態值減去 1 。
2.boolean await(long timeout, TimeUnit unit),當線程調用了 CountDownLatch 對象的該方法后,當前線程會被阻塞,直到下面的情況之一發生才會返回: (1)當所有線程都調用了 CountDownLatch 對象的 countDown 方法后,也就是計時器值為 0 的時候,這時候返回 true; (2) 設置的 timeout 時間到了,因為超時而返回 false; (3)其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常后返回。源碼如下:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
3.void countDown() 當前線程調用了該方法后,會遞減計數器的值,遞減后如果計數器為 0 則會喚醒所有調用await 方法而被阻塞的線程,否則什么都不做,接下來看一下countDown()方法內部是如何調用AQS的方法的,源碼如下:
//CountDownLatch的countDown()方法 public void countDown() { //委托sync調用AQS的方法 sync.releaseShared(1); } //AQS的方法 public final boolean releaseShared(int arg) { //調用sync實現的tryReleaseShared if (tryReleaseShared(arg)) { //AQS的釋放資源方法 doReleaseShared(); return true; } return false; }
如上面代碼可以知道CountDownLatch的countDown()方法是委托sync調用了AQS的releaseShared方法,后者調用了sync 實現的AQS的tryReleaseShared,源碼如下:
//syn的方法 protected boolean tryReleaseShared(int releases) { //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一並更新到state for (;;) { int c = getState(); //如果當前狀態值為0則直接返回(1) if (c == 0) return false; //CAS設置計數值減一(2) int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
如上代碼可以看到首先獲取當前狀態值(計數器值),代碼(1)如果當前狀態值為 0 則直接返回 false ,則countDown()方法直接返回;否則執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,否則如果當前計數器為 0 則返回 true 。返回 true 后,說明當前線程是最后一個調用countDown()方法的線程,那么該線程除了讓計數器減一外,還需要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這里的代碼(1)貌似是多余的,其實不然,之所以添加代碼 (1) 是為了防止計數器值為 0 后,其他線程又調用了countDown方法,如果沒有代碼(1),狀態值就會變成負數。
4.long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值,一般在 debug 測試時候使用,源碼如下:
public long getCount() { return sync.getCount(); } int getCount() { return getState(); }
如上代碼可知內部還是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。
到目前為止原理理解的差不多了,接下來用一個例子進行講解CountDownLatch的用法,例子如下:
package com.hjc; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * Created by cong on 2018/7/6. */ public class CountDownLatchTest { private static AtomicInteger id = new AtomicInteger(); // 創建一個CountDownLatch實例,管理計數為ThreadNum private static volatile CountDownLatch countDownLatch = new CountDownLatch(3); public static void main(String[] args) throws InterruptedException { Thread threadOne = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); Thread threadTwo = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); Thread threadThree = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); // 啟動子線程 threadOne.start(); threadTwo.start(); threadThree.start(); System.out.println("等待斗地主玩家進場"); // 等待子線程執行完畢,返回 countDownLatch.await(); System.out.println("斗地主玩家已經滿人,開始發牌....."); } }
運行結果如下:
如上代碼,創建了一個 CountDownLatch 實例,因為有兩個子線程所以構造函數參數傳遞為 3,主線程調用 countDownLatch.await()方法后會被阻塞。子線程執行完畢后調用 countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等所有子線程執行完畢調用 countDown()后計數器會變為 0,這時候主線程的 await()才會返回。
如果把上面的代碼中Thread.sleep和countDownLatch.await()的代碼注釋掉,運行幾遍,運行結果就可能會出現如下結果,如下圖:
可以看到在注釋掉latch.await()這行之后,就不能保證在所有玩家入場后才開始發牌了。
總結:CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法后,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器允許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 可以在子線程運行任何時候讓 await 方法返回而不一定必須等到線程結束;另外使用線程池來管理線程時候一般都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓我們對線程同步有更靈活的控制。