CountDownLatch是個啥?


文章篇幅較短,對於一些AQS的頂級方法例如releaseShared並沒有做過深的講解,因為這些算是AQS的范疇,關於AQS可以看下另一篇文章——AQS

CountDownLatch一般被稱作"計數器",作用大致就是數量達到了某個點之后計數結束,才能繼續往下走。可以用作流程控制之類的作用,大流程分成多個子流程,然后大流程在子流程全部結束之前不動(子流程最好是相互獨立的,除非能很好的控制兩個流程的關聯關系),子流程全部結束后大流程開始操作。

 很抽象,小問題,下方的兩節或許能讓你理解CountDownLatch的用法和內部的實現。

1.CountDownLatch的使用

  假設現在,我們要起一個3塊錢的集資項目,並且限定每個人一次只能捐1塊錢當募集到3塊錢的時候立馬就把這筆錢捐給我自己,如果湊齊之后你還想捐,那么我會跟你說,項目已經完成了,你這一塊錢我不受理,自己去買雪糕吃吧;如果沒湊齊,那么我這個募集箱就一直掛在這里。這個場景用CountDownLatch可以很契合的模擬出來。

  字數也不湊了,直接看demo例子吧

public static void main(String[] args) throws InterruptedException {
    
    // 集資項目==========>啟動,目標3塊錢
    CountDownLatch countDownLatch = new CountDownLatch(3);
    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    executor.execute(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
            System.err.println("張1准備捐一塊錢");
            countDownLatch.countDown();
            System.err.println("張1捐了一塊錢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
            System.err.println("張2准備捐一塊錢");
            countDownLatch.countDown();
            System.err.println("張2捐了一塊錢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    executor.execute(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
            System.err.println("張3准備捐一塊錢");
            countDownLatch.countDown();
            System.err.println("張3捐了一塊錢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    System.err.println("我項目啟動后,就在這里等人捐錢,不夠3塊我不走了");
    countDownLatch.await();
    System.err.println("3塊錢到手,直接跑路");

    executor.shutdown();
}

結果圖:

pic1
這個結果,em,可以看到countDownLatch使用的幾個注意點:

  1. 調用countDownLatchawait()方法的線程會阻塞,直到湊夠3塊錢為止
  2. CyclicBarrier不同,其計完數之后並不會阻塞,而是直接執行接下來的操作
  3. 每次調用countDown()方法都會捐一塊錢(計數一次),滿了之后調用await()方法的線程不再阻塞

 另外,在上面的代碼中,在countDown方法之后還打印信息是為了驗證countDown方法不會阻塞當前線程,執行結果不一定如上圖那樣有順序的,例如可能出現下方的結果:

pic2
 因為最后一個countDown之后,await所在的線程不再阻塞了,又正好趕上JVM線程調度,所以就會出現上方的結果。

2.CountDownLatch的內部實現

  剛才已經講了CountDownLatch的用法,用起來還是不難的。那來看下內部是怎么實現的,又是怎么做到計數之后不跟CyclicBarrier一樣阻塞的呢?

  首先來看構造函數吧,CountDownLatch只有一個構造函數,如下

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

 所做的事情也就只有初始化內部對象sync一件事情(校驗總不能算一件事吧?),那來看下初始化了個啥玩意

// 變量sync,是不是看起來很眼熟?
private final Sync sync;

// 內部類Sync,又是一個AQS的產物
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

	// 構造方法,就是設置了AQS的state值
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    /*
     * 可以知道countDownLatch使用的是AQS的共享模式
     * 獲取資源方法,正數表示成功,負數表示失敗
     */
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 釋放方法
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            // state的狀態,在countDownLatch中表示剩余計數量,如果為0則表示可以被獲取,即await方法不再阻塞
            int c = getState();
            if (c == 0)
                return false;
            // 本次計數后還剩余的及數量
            int nextc = c-1;
            // CAS設置剩余計數量
            if (compareAndSetState(c, nextc))
                // ==0表示鎖釋放了,之后state的值將一直是0,意思就是之后的await方法都不再阻塞
                return nextc == 0;
        }
    }

 既然涉及到了AQS,那你應該懂我意思了——快去看我寫的AQS文章啊

 開個玩笑,我知道各位都多多少少了解一些,上方代碼的作用應該知道是干嘛的,不懂也沒關系,等下我在下面再講。

 回到正題,來講下從上方代碼能得到什么信息

1.CountDownLatch構造函數count的參數作用就是設置其內部的AQS的狀態state,假設count3,那么每次進行countDownAQSstate就減1,減到0的時候await方法就不再阻塞,注意這時候await方法就不再阻塞了,無論你調多少次。

2.CountDownLatch里邊的Sync實現的AQS的共享模式(從tryReleaseShared方法可以看出)

 到這里對其CountDownLatch的內部有個差不多印象了,接下來看下其最重要的awaitcountDown方法。

2.1 await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

 直接調用了AQS的頂級方法,再進去就是AQS的模塊了

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 獲取資源,成功直接返回,失敗執行下方方法(進入同步隊列)
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

 簡單說明一下,這個方法的意思就是調用tryAcquireShared的方法嘗試獲取資源,方法返回負數表示失敗返回正數則表示成功失敗了則入同步隊列(即阻塞),具體的細節可以看下AQS的詳解。
 也就是說關鍵點是 tryAcquireShared方法,這個方法剛才在上方已經解釋過,這里再放一次。方法邏輯很簡單,如果state=0(即計數完畢)則成功,否則失敗。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

  okay,await方法的整個流程大致就是:嘗試獲取資源,如果失敗則阻塞,成功了繼續當前線程的操作。什么時候會失敗呢,在state!=0的時候,而state這個變量的值我們在構造函數就已經賦予了,需要通過countDown方法來減少。

2.2 countDown

  既然這個方法這么重要,那讓它開始它的表演吧。

public void countDown() {
    sync.releaseShared(1);
}

 同樣的,直接調用AQS的頂級釋放資源的方法。

public final boolean releaseShared(int arg) {
    // 如果資源釋放了,那么喚醒同步隊列中等待的線程
    if (tryReleaseShared(arg)) {
        // 善后操作
        doReleaseShared();
        return true;
    }
    return false;
}

 關鍵的方法還是在資源的控制上——tryReleaseShared,代碼如下(上方也有):

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        /* 
         * state的狀態,在countDownLatch中表示剩余計數量
         * 如果為0則表示可以被獲取,即await方法不再阻塞
         */
        int c = getState();
        // 這里的意思是如果資源已經釋放的情況下,就不能再次釋放了,釋放成功的代碼在最后一行
        if (c == 0)
            return false;
        // 本次計數后還剩余的及數量
        int nextc = c-1;
        // CAS設置剩余計數量
        if (compareAndSetState(c, nextc))
            // ==0表示鎖釋放了,之后state的值將一直是0,意思就是之后的await方法都不再阻塞
            return nextc == 0;
    }
}

  到這里countDown方法的迷霧也看清了,每一次調用countDown方法就相當於調用tryReleaseShared方法,如果當前資源還沒釋放的話,將state-1,判斷是否為0如果為0的話表示資源釋放喚醒await方法的線程,否則的話只是更新state的值。

 整理一下整個CountDownLatch的流程。

1.創建一個CountDownLatch,並賦予一個數值,這個值表示需要計數的次數,每次countDown算一次

2.在主線程調用await方法,表示需要計數器完成之前都不能動await方法的內部實現依賴於內部的AQS,調用await方法的時候會嘗試去獲取資源,成功條件是state=0,也就是說除非countDowncount(構造函數賦予)次之后,才能成功,失敗的話當前線程進行休眠

3.在子線程調用countDown方法,每次調用都會使內部的state-1state0的時候資源釋放await方法不再阻塞(即使再次調用也是)

3. 小結

  如果理解AQS的話,不止CountDownLatch,其他衍生物例如ReentrantLock都能輕易的看懂。如果不了解的話也沒關系,這篇文章應該能讓你對CountDownLatch的內部實現有了大概的輪廓。

  簡單總結一下,CountDownLatch就三個點:構造函數的值、awaitcountDown。構造函數的值表示計數的次數,每次countDown都會使計數減一,減到0的時候await方法所在的線程就不再阻塞。



這篇文章寫得,自己都有點不好意思了...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM