Java並發編程筆記之 CountDownLatch閉鎖的源碼分析


JUC 中倒數計數器 CountDownLatch 的使用與原理分析,當需要等待多個線程執行完畢后在做一件事情時候 CountDownLatch 是比調用線程的 join 方法更好的選擇,CountDownLatch 與 線程的 join 方法區別是什么?

日常開發中經常會遇到需要在主線程中開啟多線程去並行執行任務,並且主線程需要等待所有子線程執行完畢后再進行匯總的場景,它的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操作計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減為0時就代表條件已成熟,所有因調用await方法而阻塞的線程都會被喚醒。這就是CountDownLatch的內部機制,看起來很簡單,無非就是阻塞一部分線程讓其在達到某個條件之后再執行。但是CountDownLatch的應用場景卻比較廣泛,只要你腦洞夠大利用它就可以玩出各種花樣。最常見的一個應用場景是開啟多個線程同時執行某個任務,等到所有任務都執行完再統計匯總結果。下圖動態演示了閉鎖阻塞線程的整個過程。

 

 

在CountDownLatch出現之前一般都是使用線程的join()方法來實現,但是join不夠靈活,不能夠滿足不同場景的需求。接下來我們看看CountDownLatch的原理實現。

 

一.CountDownLatch原理探究

  從CountDownLatch的名字可以猜測內部應該有個計數器,並且這個計數器是遞減的,下面就通過源碼看看JDK開發組是何時初始化計數器,何時遞減的,計數器變為 0 的時候做了什么操作,多個線程是如何通過計時器值實現同步的,首先我們先看看CountDownLatch內部結構,類圖如下:

從類圖可以知道CountDownLatch內部還是使用AQS實現的,通過下面構造函數初始化計數器的值,可知實際上是把計數器的值賦值給了AQS的state,也就是這里AQS的狀態值來表示計數器值。

構造函數源碼如下:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

   Sync(int count) {
       setState(count);
   }

接下來主要看一下CountDownLatch中幾個重要的方法內部是如何調用AQS來實現功能的。

  1.void await()方法,當前線程調用了CountDownLatch對象的await方法后,當前線程會被阻塞,直到下面的情況之一才會返回:(1)當所有線程都調用了CountDownLatch對象的countDown方法后,

也就是說計時器值為 0 的時候。(2)其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常后返回。接下來讓我們看看await()方法內部是如何調用

AQS的方法的,源碼如下:

//CountDownLatch的await()方法
public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}
    //AQS的獲取共享資源時候可被中斷的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    //如果線程被中斷則拋異常
    if (Thread.interrupted())
         throw new InterruptedException();
        //嘗試看當前是否計數值為0,為0則直接返回,否者進入AQS的隊列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
}

 //sync類實現的AQS的接口
 protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
 }

  從上面代碼可以看到await()方法委托sync調用了AQS的acquireSharedInterruptibly方法,該方法的特點是線程獲取資源的時候可以被中斷,並且獲取到的資源是共享資源,這里為什么要調用AQS的這個方法,而不是調用獨占鎖的accquireInterruptibly方法呢?這是因為這里狀態值需要的並不是非 0 即 1 的效果,而是和初始化時候指定的計數器值有關系,比如你初始化的時候計數器值為 8 ,那么state的值應該就有 0 到 8 的狀態,而不是只有  0  和  1 的獨占效果。

  這里await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這里計數器值是資源總數,也就是意味着是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷如果當前線程被中斷了則拋出異常,否則調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否為 0  ,是則當前線程的await()方法直接返回,否則調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是不是為 0 ,並沒有調用CAS讓當前狀態值減去 1 。

 

  2.boolean await(long timeout, TimeUnit unit),當線程調用了 CountDownLatch 對象的該方法后,當前線程會被阻塞,直到下面的情況之一發生才會返回: (1)當所有線程都調用了 CountDownLatch 對象的 countDown 方法后,也就是計時器值為 0 的時候,這時候返回 true; (2) 設置的 timeout 時間到了,因為超時而返回 false; (3)其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常后返回。源碼如下:

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

 

  3.void countDown() 當前線程調用了該方法后,會遞減計數器的值,遞減后如果計數器為 0 則會喚醒所有調用await 方法而被阻塞的線程,否則什么都不做,接下來看一下countDown()方法內部是如何調用AQS的方法的,源碼如下:

   //CountDownLatch的countDown()方法
    public void countDown() {
       //委托sync調用AQS的方法
        sync.releaseShared(1);
    }
   //AQS的方法
    public final boolean releaseShared(int arg) {
        //調用sync實現的tryReleaseShared
        if (tryReleaseShared(arg)) {
            //AQS的釋放資源方法
            doReleaseShared();
            return true;
        }
        return false;
    }

如上面代碼可以知道CountDownLatch的countDown()方法是委托sync調用了AQS的releaseShared方法,后者調用了sync 實現的AQS的tryReleaseShared,源碼如下:

//syn的方法
protected boolean tryReleaseShared(int releases) {
  //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一並更新到state
  for (;;) {
      int c = getState();

      //如果當前狀態值為0則直接返回(1)
      if (c == 0)
          return false;

      //CAS設置計數值減一(2)
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
          return nextc == 0;
  }
}

如上代碼可以看到首先獲取當前狀態值(計數器值),代碼(1)如果當前狀態值為 0 則直接返回 false ,則countDown()方法直接返回;否則執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,否則如果當前計數器為 0 則返回 true 。返回 true 后,說明當前線程是最后一個調用countDown()方法的線程,那么該線程除了讓計數器減一外,還需要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這里的代碼(1)貌似是多余的,其實不然,之所以添加代碼 (1) 是為了防止計數器值為 0 后,其他線程又調用了countDown方法,如果沒有代碼(1),狀態值就會變成負數。

 

  4.long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值,一般在 debug 測試時候使用,源碼如下:

public long getCount() {
     return sync.getCount();
}

int getCount() {
     return getState();
}

如上代碼可知內部還是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。

 

到目前為止原理理解的差不多了,接下來用一個例子進行講解CountDownLatch的用法,例子如下:

package com.hjc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by cong on 2018/7/6.
 */
public class CountDownLatchTest {

    private static AtomicInteger id = new AtomicInteger();

    // 創建一個CountDownLatch實例,管理計數為ThreadNum
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();
            }
        });

        Thread threadTwo = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        Thread threadThree = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        // 啟動子線程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
        System.out.println("等待斗地主玩家進場");

        // 等待子線程執行完畢,返回
        countDownLatch.await();

        System.out.println("斗地主玩家已經滿人,開始發牌.....");

    }
}

運行結果如下:

 

 

如上代碼,創建了一個 CountDownLatch 實例,因為有兩個子線程所以構造函數參數傳遞為 3,主線程調用 countDownLatch.await()方法后會被阻塞。子線程執行完畢后調用 countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等所有子線程執行完畢調用 countDown()后計數器會變為 0,這時候主線程的 await()才會返回。

 

如果把上面的代碼中Thread.sleep和countDownLatch.await()的代碼注釋掉,運行幾遍,運行結果就可能會出現如下結果,如下圖:

 可以看到在注釋掉latch.await()這行之后,就不能保證在所有玩家入場后才開始發牌了。

 

總結:CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法后,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器允許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 可以在子線程運行任何時候讓 await 方法返回而不一定必須等到線程結束;另外使用線程池來管理線程時候一般都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓我們對線程同步有更靈活的控制。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM