JUC系列回顧之-CountDownLatch底層原理和示例


 

CountDownLatch 是一個同步工具類,允許一個線程或者多個線程等待其他線程完成操作,再執行。

CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。

// 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。
void await()
// 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。
void countDown()
// 返回當前計數。
long getCount()
// 返回標識此鎖存器及其狀態的字符串。
String toString()

CountDownLatch和CyclicBarrier的區別:

(1).CountDownLatch 的作用是允許1或者多個線程,等待另外N個線程完成某件事情之后,這1個或者多個線程才能執行。CyclicBarrier 是N個線程相互等待,任何一個線程完成任務之前,所有的線程必須等待。

(2).CountDownLatch 計數器是一次性的,無法被重置的,而CyclicBarrier的計數器在調用reset方法之后,還可以重新使用,因此被稱為循環的barrier。

 

CountDownLatch 底層實現:

 

1.構造方法:創建一個Sync對象,而Sync繼承AQS。

 /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

2.Sync 是CountDownLatch的內部私有類,組合到CountDownLatch里:

 /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

在AQS中state是一個private volatile int類型的對象。CountDownLatch使用state來計數,CountDownLatch的getCount最終調用的是AQS的getState()

,返回state進行計數。

 

3.await()方法:調用AQS的acquireSharedInterruptibly方法

  public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 

 //1.獲取共享鎖
 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
//判斷線程是否為中斷狀態,如果是拋出interruptedException
if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取共享鎖,嘗試成功就返回,否則調用doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

 

//2.嘗試獲取共享鎖,重寫AQS里面的方法
protected int tryAcquireShared(int acquires) {
    //鎖狀態 == 0,表示所沒有被任何線程所獲取,即是可獲取的狀態,否則鎖是不可獲取的狀態
    return (getState() == 0) ? 1 : -1;
}

 

//3.doAcquireSharedInterruptibly方法會使得當前線程一直等待,直到當前線程獲取到鎖(或被中斷)才返回
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //創建“當前線程”的Node節點,且node中記錄的鎖是“共享鎖”類型,並將節點添加到CLH隊列末尾。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //獲取前繼節點,如果前繼節點是等待鎖隊列的表頭,則嘗試獲取共享鎖
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //前繼節點不是表頭,當前線程一直等待,直到獲取到鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

 /*說明:4.shouldParkAfterFailedAcquire 返回當前線程是否應該阻塞
    (01) 關於waitStatus請參考下表(中擴號內為waitStatus的值),更多關於waitStatus的內容,可以參考前面的Node類的介紹。

    CANCELLED[1]  -- 當前線程已被取消
    SIGNAL[-1]    -- “當前線程的后繼線程需要被unpark(喚醒)”。一般發生情況是:當前線程的后繼線程處於阻塞狀態,而當前線程被release或cancel掉,因此需要喚醒當前線程的后繼線程。
    CONDITION[-2] -- 當前線程(處在Condition休眠狀態)在等待Condition喚醒
    PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖”
    [0]           -- 當前線程不屬於上面的任何一種狀態。
    (02) shouldParkAfterFailedAcquire()通過以下規則,判斷“當前線程”是否需要被阻塞。

    規則1:如果前繼節點狀態為SIGNAL,表明當前節點需要被unpark(喚醒),此時則返回true。
    規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前繼節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false。
    規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,並返回false。
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驅節點的狀態
        int ws = pred.waitStatus;
        // 如果前驅節點是SIGNAL狀態,則意味着當前線程需要unpark喚醒,此時返回true
        if (ws == Node.SIGNAL)
            
            return true;
        // 如果前繼節點是取消的狀態,則設置當前節點的“當前前繼節點為”原節點的前繼節點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             // waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure
             //it cannot acquire before parking.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

4. countDown()源碼 :

 

//1.該方法其實調用AQS中的releaseShared(1)釋放共享鎖方法。
public
void countDown() { sync.releaseShared(1); }
//2.目的是讓當前線程釋放它所持有的共享鎖,它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//3.tryReleaseShared()在CountDownLatch.java中被重寫,釋放共享鎖,將鎖計數器-1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 獲取“鎖計數器”的狀態
        int c = getState();
        if (c == 0)
            return false;
        // “鎖計數器”-1
        int nextc = c-1;
        // 通過CAS函數進行賦值。
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

 

實例:

public class CountDownLatchTest1 {
    private static int SPORTSMAN_COUNT = 10;
    private static final Random random = new Random();
    // 用於判斷發令之前運動員是否已經進入准備狀態,需要等待10個運動員准備就緒,占有鎖,等待10個運動員完成,釋放鎖。
    private static CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT);
    // 用於判斷裁判是否已經發令,占有鎖,等待裁判發令完成,釋放鎖
    private static CountDownLatch startLatch = new CountDownLatch(1);

    public static void main(String[] args) {

        // 用於判斷發令之前運動員是否已經進入准備狀態,需要等待10個運動員准備就緒,占有鎖,等待10個運動員完成,釋放鎖。
        // CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT);
        // 用於判斷裁判是否已經發令,占有鎖,等待裁判發令完成,釋放鎖
        // CountDownLatch startLatch = new CountDownLatch(1);

        // 啟動10個線程,也就是10個運動員,做准備工作
        for (int i = 0; i < SPORTSMAN_COUNT; i++) {
            Thread t = new Thread(new MyTask((i + 1) + "號運動員", readyLatch, startLatch));
            t.start();
        }
        // 當前運動員在其他運動員准備就緒前一直等待,也就是說等readyLatch倒數計數器為0之前一直等待
        try {
            readyLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 裁判發令,釋放鎖
        startLatch.countDown();

        System.out.println("裁判:所有運動員准備完畢,開始...");

    }

    static class MyTask implements Runnable {

        private Lock lock = new ReentrantLock();

        private CountDownLatch ready;
        private CountDownLatch start;
        private String name;

        /**
         * 
         * (構造方法)  
         *   
         * @param ready
         * @param start
         * @param name 運動員名稱
         */
        public MyTask(String name, CountDownLatch ready, CountDownLatch start) {
            this.ready = ready;
            this.start = start;
            this.name = name;
        }

        @Override
        public void run() {
            lock.lock();
            try {

                // 1. 寫運動員准備就緒的邏輯,准備readyTime秒
                int readyTime = random.nextInt(1000);
                System.out.println(name + ":我需要" + readyTime + "秒的時間准備。");
                try {
                    Thread.sleep(readyTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(name + "我已經准備完畢!");
                // 釋放鎖readyLatch-1,表示一個運動員已經就緒
                ready.countDown();
                try {
                    // 等待裁判發開始命令
                    start.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(name + ":開跑...");
            } catch (Exception e) {
                // TODO: handle exception
            } finally {
                lock.unlock();
            }

        }

    }

}

運行結果:

1號運動員:我需要757秒的時間准備。
2號運動員:我需要9秒的時間准備。
3號運動員:我需要602秒的時間准備。
4號運動員:我需要232秒的時間准備。
5號運動員:我需要454秒的時間准備。
6號運動員:我需要440秒的時間准備。
7號運動員:我需要333秒的時間准備。
8號運動員:我需要406秒的時間准備。
9號運動員:我需要613秒的時間准備。
10號運動員:我需要121秒的時間准備。
2號運動員我已經准備完畢!
10號運動員我已經准備完畢!
4號運動員我已經准備完畢!
7號運動員我已經准備完畢!
8號運動員我已經准備完畢!
6號運動員我已經准備完畢!
5號運動員我已經准備完畢!
3號運動員我已經准備完畢!
9號運動員我已經准備完畢!
1號運動員我已經准備完畢!
裁判:所有運動員准備完畢,開始...
10號運動員:開跑...
8號運動員:開跑...
3號運動員:開跑...
1號運動員:開跑...
2號運動員:開跑...
9號運動員:開跑...
5號運動員:開跑...
6號運動員:開跑...
7號運動員:開跑...
4號運動員:開跑...

 

總結:CountDownLatch通過AQS里面的共享鎖來實現的,在創建CountDownLatch時候,會傳遞一個參數count,該參數是鎖計數器的初始狀態,表示該共享鎖能夠被count個線程同時獲取。當某個線程調用CountDownLatch對象的await方法時候,該線程會等待共享鎖可獲取時,才能獲取共享鎖繼續運行,而共享鎖可獲取的的條件是state == 0,而鎖倒數計數器的初始值為count,每當一個線程調用該CountDownLatch對象的countDown()方法時候,計數器才-1,所以必須有count個線程調用該countDown()方法后,鎖計數器才為0,這個時候等待的線程才能繼續運行。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM