CountDownLatch原理詳解


介紹

當你看到這篇文章的時候需要先了解AQS的原理,因為本文不涉及到AQS內部原理的講解。

CountDownLatch是一種同步輔助,讓我們多個線程執行任務時,需要等待線程執行完成后,才能執行下面的語句,之前線程操作時是使用Thread.join方法進行等待,CountDownLatch內部使用了AQS鎖,前面已經講述過AQS的內部結構,其實內部有一個state字段,通過該字段來控制鎖的操作,CountDownLatch是如何控制多個線程執行都執行結束?其實CountDownLatch內部是將state作為計數器來使用,比如我們初始化時,state計數器為3,同時開啟三個線程當有一個線程執行成功,每當有一個線程執行完成后就將state值減少1,直到減少到為0時,說明所有線程已經執行完畢。

源碼解析

以一個例子來開始進行源碼解析,后面的內容會針對例子來進行源碼的分解過程,我們開啟三個線程,主線程需要等待三個線程都完成后才能進行后續任務處理,源碼如下所示:

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 計數器3個。
        CountDownLatch countDownLatch = new CountDownLatch(3);

        for (int i = 0; i < 3; ++i) {
            new Thread(new Worker(countDownLatch, i)).start();
        }
        // 等待三個線程都完成
        countDownLatch.await();
        System.out.println("3個線程全部執行完成");
    }

    // 搬運工人工作線程工作類。
    static class Worker implements Runnable {
        private final CountDownLatch countDown;
        private final Integer id;

        Worker(CountDownLatch countDown, Integer id) {
            this.countDown = countDown;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(500);
                doWork();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDown.countDown();
            System.out.println("第" + id + "個線程執行完成工作");
        }

        void doWork() {
            System.out.println("第" + id + "個線程開始工作");
        }
    }
}

通過一個例子來說明一下CountDownLatch的工作原理,上面例子我們開啟了三個線程,每個線程分別執行自己的任務,主線程等待三個線程執行完成,看一下輸出的結果:

等待三個線程完成
第1個線程開始工作
第0個線程開始工作
第0個線程執行完成工作
第1個線程執行完成工作
第2個線程開始工作
第2個線程執行完成工作
3個線程全部執行完成

這里我們將三個線程想象成搬運工人,將貨物搬運到車上,三個人必須將自己手頭分配的任務都搬運完成后才能觸發,也即是貨車司機需要等待三個人都完成才能發車,貨車司機此時手里有個小本本,記錄本次搬運的總人數,線程未啟動時如下所示

1.png
1.png

當搬運工人開始工作時,每個搬運工人各自忙碌自己的任務,假如當工人1完成后,需要跟司機報備一下,說我已經完成任務了,這時候司機會在自己的小本本上記錄,工人1已經完成任務,此時還剩下兩個工人沒有完成任務。

2.png
2.png

每當工人完成自己手頭任務時,都會向司機報備,當所有工人都完成之后,此時工人的小本本記錄的完成人數都已完成,司機這時候就可以發車了,因為三個人已經完成了搬運工作。

3.png
3.png

通過上面的例子,大致了解了CountDownLatch的簡單原理,如何保證司機(state)記錄誰完成了誰沒完成呢?CountDownLatch內部通過AQS的state來完成計數器的功能,接下來通過源碼來進行詳細分析:

public class CountDownLatch {
    /**
     * 同步控制,
     * 使用 AQS的state來表示計數。
     */

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 初始化state值(也就是需要等待幾個線程完成任務)
        Sync(int count) {
            setState(count);
        }
        // 獲取state值。
        int getCount() {
            return getState();
        }
        // 獲得鎖。
        protected int tryAcquireShared(int acquires) {
            // 這里判斷如果state=0的時候才能獲得鎖,反之獲取不到將當前線程放入到隊列中阻塞。
            // 這里是關鍵點。
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // state進行減少,當state減少為0時,阻塞線程才能進行處理。
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    // 鎖對象。
    private final Sync sync;

    /**
     * 初始化同步鎖對象。
     */

    public CountDownLatch(int count) 
        if (count < 0throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * 導致當前線程等待直到閂鎖倒計時到零,除非線程是被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,
     * 則當前線程將被禁用以進行線程調度並處於休眠狀態,直到發生以下兩種情況:
     * 1.計數達到零。
     * 2.如果當前線程被中斷。
     */

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 等待計數器清零或被中斷,等待一段時間后如果還是沒有
     */

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException 
{
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 使當前線程等待直到閂鎖倒計時到零,除非線程被中斷或指定的等待時間已過。
     */

    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * 返回state值。
     */

    public long getCount() {
        return sync.getCount();
    }
}

CountDownLatch源碼看上去很少,通過CountDownLatch源碼看到內部是基於AQS來進行實現的,內部類Sync類繼承自AbstractQueuedSynchronizer並且實現了tryAcquireSharedtryReleaseShared,通過構造函數看到,會創建一個AQS同步對象,並且將state值進行初始化,如果初始化count小於0則拋出異常。

public CountDownLatch(int count) {
    if (count < 0throw new IllegalArgumentException("count < 0");
    // 初始化AQS的state值。
    this.sync = new Sync(count);
}

根據上面的例子我們來看一下初始化情況下的AQS內部情況:

5.png
5.png

awit方法

當調用awit方法時,其實內部調用的AQS的acquireSharedInterruptibly方法,這個方法會調用Sync中tryAcquireShared的方法,通過上面例子,我們初始化時將state值初始化2,但是Sync中判斷(getState() == 0) ? 1 : -1;此時state值為2,判定為false,則返回-1,當返回負數時,內部會將當前線程掛起,並且放入AQS的隊列中,直到AQS的state值減少到0會喚醒當前線程,或者是當前線程被中斷,線程會拋出InterruptedException異常,然后返回。

/**
 * 導致當前線程等待直到閂鎖倒計時到零,除非線程是被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,
 * 則當前線程將被禁用以進行線程調度並處於休眠狀態,直到發生以下兩種情況:
 * 1.計數達到零。
 * 2.如果當前線程被中斷。
 */

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

當線程調用await方法時,其實內部調用的是AQS的acquireSharedInterruptibly,我們來看一下AQS內部acquireSharedInterruptibly的方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
{
        // 響應中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        // 調用tryAcquireShared 方法。
        if (tryAcquireShared(arg) < 0)
            // 阻塞線程,將線程加入到阻塞隊列等到其他線程恢復線程。
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException 
{
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null// help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireSharedInterruptibly內部調用的是CountDownLatch內部類Sync實現的tryAcquireShared方法,tryAcquireShared判斷state是否已經清空,也就是計數器是否已經清零了,清零時才能進行執行,此時並沒有進行清空,則會將當前線程掛起,並且將掛起的線程放入到AQS的阻塞隊列,等待其他線程喚醒動作。

6.png
6.png

coutDown方法

當線程執行完成后,會調用CountDownLatchcountDowncountDown方法內部調用的AQS的releaseSharedreleaseShared方法實現在Sync類中,該方法主要作用是將state計數器中的值,進行減1操作,先進行判斷是否已經將state值修改為0,如果修改為則不進行下面的操作,防止狀態已經修改為0時,其他線程還調用了countDown操作導致state值變為負數,當state值減少1時,會通知阻塞隊列中的等待線程,假設上面的例子其中一個線程先執行了countDown方法,則此時state=1,並且喚醒阻塞隊列中的線程,線程還是會去調用tryAcquireShared方法,發現還是返回-1,則還會將當前線程進行掛起阻塞並且加入到阻塞隊列中。此時隊列狀態如下所示:

7.png
7.png

當另外一個線程也執行完成,調用countDown時,state減少1則變為state=0,當這時候喚醒等待的線程時, tryAcquireShared返回的結果是1,則會直接返回成功。

總結

CountDownLatch是利用AQS的state來做計數器功能,當初始化CountDownLatch時,會將state值進行初始化,讓調用CountDownLatch的awit時,會判斷state計數器是否已經變為0,如果沒有變為0則掛起當前線程,並加入到AQS的阻塞隊列中,如果有線程調用了CountDownLatch的countDown時,這時的操作是將state計數器進行減少1,每當減少操作時都會喚醒阻塞隊列中的線程,線程會判斷此時state計數器是否已經都執行完了,如果還沒有執行完則繼續掛起當前線程,直到state計數器清零或線程被中斷為止。

喜歡的朋友可以關注我的微信公眾號,不定時推送文章

123.png
123.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM