【JUC】JDK1.8源碼分析之CountDownLatch(五)


一、前言

  分析完了CyclicBarrier后,下面分析CountDownLatch,CountDownLatch用於同步一個或多個任務,強制他們等待由其他任務執行的一組操作完成。CountDownLatch典型的用法是將一個程序分為n個互相獨立的可解決任務,並創建值為n的CountDownLatch。當每一個任務完成時,都會在這個鎖存器上調用countDown,等待問題被解決的任務調用這個鎖存器的await,將他們自己攔住,直至鎖存器計數結束。下面開始分析源碼。

二、CountDownLatch數據結構

  從源碼可知,其底層是由AQS提供支持,所以其數據結構可以參考AQS的數據結構,而AQS的數據結構核心就是兩個虛擬隊列:同步隊列sync queue 和條件隊列condition queue,不同的條件會有不同的條件隊列。讀者可以參考之前介紹的AQS

三、CountDownLatch源碼分析

  3.1 類的繼承關系 

public class CountDownLatch {}

  說明:可以看到CountDownLatch沒有顯示繼承哪個父類或者實現哪個父接口,根據Java語言規定,可知其父類是Object。

  3.2 類的內部類

  CountDownLatch類存在一個內部類Sync,繼承自AbstractQueuedSynchronizer,其源代碼如下。 

    private static final class Sync extends AbstractQueuedSynchronizer {
        // 版本號
        private static final long serialVersionUID = 4982264981922014374L;
        
        // 構造器
        Sync(int count) {
            setState(count);
        }
        
        // 返回當前計數
        int getCount() {
            return getState();
        }

        // 試圖在共享模式下獲取對象狀態
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 試圖設置狀態來反映共享模式下的一個釋放
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 無限循環
            for (;;) {
                // 獲取狀態
                int c = getState();
                if (c == 0) // 沒有被線程占有
                    return false;
                // 下一個狀態
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) // 比較並且設置成功
                    return nextc == 0;
            }
        }
    }
View Code

  說明:對CountDownLatch方法的調用會轉發到對Sync或AQS的方法的調用,所以,AQS對CountDownLatch提供支持。

  3.3 類的屬性 

public class CountDownLatch {
    // 同步隊列
    private final Sync sync;
}

  說明:可以看到CountDownLatch類的內部只有一個Sync類型的屬性,這個屬性相當重要,后面會進行分析。

  3.4 類的構造函數

  1. CountDownLatch(int) 型構造函數  

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 初始化狀態數
        this.sync = new Sync(count);
    }
View Code

  說明:該構造函數可以構造一個用給定計數初始化的CountDownLatch,並且構造函數內完成了sync的初始化,並設置了狀態數。

  3.5 核心函數分析

  1. await函數

  此函數將會使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。其源碼如下  

    public void await() throws InterruptedException {
        // 轉發到sync對象上
        sync.acquireSharedInterruptibly(1);
    }
View Code

  說明:由源碼可知,對CountDownLatch對象的await的調用會轉發為對Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的調用,acquireSharedInterruptibly源碼如下  

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
View Code

  說明:從源碼中可知,acquireSharedInterruptibly又調用了CountDownLatch的內部類Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函數。tryAcquireShared函數的源碼如下 

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
View Code

  說明:該函數只是簡單的判斷AQS的state是否為0,為0則返回1,不為0則返回-1。doAcquireSharedInterruptibly函數的源碼如下  

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加節點至等待隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) { // 無限循環
                // 獲取node的前驅節點
                final Node p = node.predecessor();
                if (p == head) { // 前驅節點為頭結點
                    // 試圖在共享模式下獲取對象狀態
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 獲取成功
                        // 設置頭結點並進行繁殖
                        setHeadAndPropagate(node, r);
                        // 設置節點next域
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 在獲取失敗后是否需要禁止線程並且進行中斷檢查
                    // 拋出異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
View Code

  說明:在AQS的doAcquireSharedInterruptibly中可能會再次調用CountDownLatch的內部類Sync的tryAcquireShared方法和AQS的setHeadAndPropagate方法。setHeadAndPropagate方法源碼如下。  

    private void setHeadAndPropagate(Node node, int propagate) {
        // 獲取頭結點
        Node h = head; // Record old head for check below
        // 設置頭結點
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        // 進行判斷
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 獲取節點的后繼
            Node s = node.next;
            if (s == null || s.isShared()) // 后繼為空或者為共享模式
                // 以共享模式進行釋放
                doReleaseShared();
        }
    }
View Code

  說明:該方法設置頭結點並且釋放頭結點后面的滿足條件的結點,該方法中可能會調用到AQS的doReleaseShared方法,其源碼如下。

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        // 無限循環
        for (;;) {
            // 保存頭結點
            Node h = head;
            if (h != null && h != tail) { // 頭結點不為空並且頭結點不為尾結點
                // 獲取頭結點的等待狀態
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) { // 狀態為SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續
                        continue;            // loop to recheck cases
                    // 釋放后繼結點
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態為0並且不成功,繼續
                    continue;                // loop on failed CAS
            }
            if (h == head) // 若頭結點改變,繼續循環  
                break;
        }
    }
View Code

  說明:該方法在共享模式下釋放,具體的流程再之后會通過一個示例給出。

  所以,對CountDownLatch的await調用大致會有如下的調用鏈。

  說明:上圖給出了可能會調用到的主要方法,並非一定會調用到,之后,會通過一個示例給出詳細的分析。

  2. countDown函數

  此函數將遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程  

    public void countDown() {
        sync.releaseShared(1);
    }
View Code

  說明:對countDown的調用轉換為對Sync對象的releaseShared(從AQS繼承而來)方法的調用。releaseShared源碼如下 

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
View Code

  說明:此函數會以共享模式釋放對象,並且在函數中會調用到CountDownLatch的tryReleaseShared函數,並且可能會調用AQS的doReleaseShared函數,其中,tryReleaseShared源碼如下  

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 無限循環
            for (;;) {
                // 獲取狀態
                int c = getState();
                if (c == 0) // 沒有被線程占有
                    return false;
                // 下一個狀態
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) // 比較並且設置成功
                    return nextc == 0;
            }
        }
View Code

  說明:此函數會試圖設置狀態來反映共享模式下的一個釋放。具體的流程在下面的示例中會進行分析。AQS的doReleaseShared的源碼如下 

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        // 無限循環
        for (;;) {
            // 保存頭結點
            Node h = head;
            if (h != null && h != tail) { // 頭結點不為空並且頭結點不為尾結點
                // 獲取頭結點的等待狀態
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) { // 狀態為SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續
                        continue;            // loop to recheck cases
                    // 釋放后繼結點
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態為0並且不成功,繼續
                    continue;                // loop on failed CAS
            }
            if (h == head) // 若頭結點改變,繼續循環  
                break;
        }
    }
View Code

  說明:此函數在共享模式下釋放資源。

  所以,對CountDownLatch的countDown調用大致會有如下的調用鏈。

  說明:上圖給出了可能會調用到的主要方法,並非一定會調用到,之后,會通過一個示例給出詳細的分析。

四、示例

  下面給出了一個使用CountDownLatch的示例。 

package com.hust.grid.leesf.cyclicbarrier;

import java.util.concurrent.CountDownLatch;

class MyThread extends Thread {
    private CountDownLatch countDownLatch;
    
    public MyThread(String name, CountDownLatch countDownLatch) {
        super(name);
        this.countDownLatch = countDownLatch;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " doing something");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " finish");
        countDownLatch.countDown();
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyThread t1 = new MyThread("t1", countDownLatch);
        MyThread t2 = new MyThread("t2", countDownLatch);
        t1.start();
        t2.start();
        System.out.println("Waiting for t1 thread and t2 thread to finish");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }            
        System.out.println(Thread.currentThread().getName() + " continue");        
    }
}

  運行結果(某一次):  

Waiting for t1 thread and t2 thread to finish
t1 doing something
t2 doing something
t1 finish
t2 finish
main continue

  說明:本程序首先計數器初始化為2。根據結果,可能會存在如下的一種時序圖。

  說明:首先main線程會調用await操作,此時main線程會被阻塞,等待被喚醒,之后t1線程執行了countDown操作,最后,t2線程執行了countDown操作,此時main線程就被喚醒了,可以繼續運行。下面,進行詳細分析。

  ① main線程執行countDownLatch.await操作,主要調用的函數如下。

  說明:在最后,main線程就被park了,即禁止運行了。此時Sync queue(同步隊列)中有兩個節點,AQS的state為2,包含main線程的結點的nextWaiter指向SHARED結點。

  ② t1線程執行countDownLatch.countDown操作,主要調用的函數如下。

  說明:此時,Sync queue隊列里的結點個數未發生變化,但是此時,AQS的state已經變為1了。

  ③ t2線程執行countDownLatch.countDown操作,主要調用的函數如下。

  說明:經過調用后,AQS的state為0,並且此時,main線程會被unpark,可以繼續運行。當main線程獲取cpu資源后,繼續運行。

  ④ main線程獲取cpu資源,繼續運行,由於main線程是在parkAndCheckInterrupt函數中被禁止的,所以此時,繼續在parkAndCheckInterrupt函數運行。

  說明:main線程恢復,繼續在parkAndCheckInterrupt函數中運行,之后又會回到最終達到的狀態為AQS的state為0,並且head與tail指向同一個結點,該節點的額nextWaiter域還是指向SHARED結點。

五、總結

  經過分析CountDownLatch的源碼可知,其底層結構仍然是AQS,對其線程所封裝的結點是采用共享模式,而ReentrantLock是采用獨占模式。由於采用的共享模式,所以會導致后面的操作會有所差異,通過閱讀源碼就會很容易掌握CountDownLatch實現機制。謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM