CountDownLatch 是一個同步工具類,允許一個線程或者多個線程等待其他線程完成操作,再執行。
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。 // 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。 void await() // 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。 void countDown() // 返回當前計數。 long getCount() // 返回標識此鎖存器及其狀態的字符串。 String toString()
CountDownLatch和CyclicBarrier的區別:
(1).CountDownLatch 的作用是允許1或者多個線程,等待另外N個線程完成某件事情之后,這1個或者多個線程才能執行。CyclicBarrier 是N個線程相互等待,任何一個線程完成任務之前,所有的線程必須等待。
(2).CountDownLatch 計數器是一次性的,無法被重置的,而CyclicBarrier的計數器在調用reset方法之后,還可以重新使用,因此被稱為循環的barrier。
CountDownLatch 底層實現:
1.構造方法:創建一個Sync對象,而Sync繼承AQS。
/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
2.Sync 是CountDownLatch的內部私有類,組合到CountDownLatch里:
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync;
在AQS中state是一個private volatile int類型的對象。CountDownLatch使用state來計數,CountDownLatch的getCount最終調用的是AQS的getState()
,返回state進行計數。
3.await()方法:調用AQS的acquireSharedInterruptibly方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
//1.獲取共享鎖 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//判斷線程是否為中斷狀態,如果是拋出interruptedException if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取共享鎖,嘗試成功就返回,否則調用doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
//2.嘗試獲取共享鎖,重寫AQS里面的方法 protected int tryAcquireShared(int acquires) { //鎖狀態 == 0,表示所沒有被任何線程所獲取,即是可獲取的狀態,否則鎖是不可獲取的狀態 return (getState() == 0) ? 1 : -1; }
//3.doAcquireSharedInterruptibly方法會使得當前線程一直等待,直到當前線程獲取到鎖(或被中斷)才返回 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //創建“當前線程”的Node節點,且node中記錄的鎖是“共享鎖”類型,並將節點添加到CLH隊列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲取前繼節點,如果前繼節點是等待鎖隊列的表頭,則嘗試獲取共享鎖 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前繼節點不是表頭,當前線程一直等待,直到獲取到鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/*說明:4.shouldParkAfterFailedAcquire 返回當前線程是否應該阻塞 (01) 關於waitStatus請參考下表(中擴號內為waitStatus的值),更多關於waitStatus的內容,可以參考前面的Node類的介紹。 CANCELLED[1] -- 當前線程已被取消 SIGNAL[-1] -- “當前線程的后繼線程需要被unpark(喚醒)”。一般發生情況是:當前線程的后繼線程處於阻塞狀態,而當前線程被release或cancel掉,因此需要喚醒當前線程的后繼線程。 CONDITION[-2] -- 當前線程(處在Condition休眠狀態)在等待Condition喚醒 PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖” [0] -- 當前線程不屬於上面的任何一種狀態。 (02) shouldParkAfterFailedAcquire()通過以下規則,判斷“當前線程”是否需要被阻塞。 規則1:如果前繼節點狀態為SIGNAL,表明當前節點需要被unpark(喚醒),此時則返回true。 規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前繼節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false。 規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,並返回false。 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的狀態 int ws = pred.waitStatus; // 如果前驅節點是SIGNAL狀態,則意味着當前線程需要unpark喚醒,此時返回true if (ws == Node.SIGNAL) return true; // 如果前繼節點是取消的狀態,則設置當前節點的“當前前繼節點為”原節點的前繼節點 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure //it cannot acquire before parking. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
4. countDown()源碼 :
//1.該方法其實調用AQS中的releaseShared(1)釋放共享鎖方法。
public void countDown() { sync.releaseShared(1); }
//2.目的是讓當前線程釋放它所持有的共享鎖,它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
//3.tryReleaseShared()在CountDownLatch.java中被重寫,釋放共享鎖,將鎖計數器-1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 獲取“鎖計數器”的狀態 int c = getState(); if (c == 0) return false; // “鎖計數器”-1 int nextc = c-1; // 通過CAS函數進行賦值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }
實例:
public class CountDownLatchTest1 { private static int SPORTSMAN_COUNT = 10; private static final Random random = new Random(); // 用於判斷發令之前運動員是否已經進入准備狀態,需要等待10個運動員准備就緒,占有鎖,等待10個運動員完成,釋放鎖。 private static CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT); // 用於判斷裁判是否已經發令,占有鎖,等待裁判發令完成,釋放鎖 private static CountDownLatch startLatch = new CountDownLatch(1); public static void main(String[] args) { // 用於判斷發令之前運動員是否已經進入准備狀態,需要等待10個運動員准備就緒,占有鎖,等待10個運動員完成,釋放鎖。 // CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT); // 用於判斷裁判是否已經發令,占有鎖,等待裁判發令完成,釋放鎖 // CountDownLatch startLatch = new CountDownLatch(1); // 啟動10個線程,也就是10個運動員,做准備工作 for (int i = 0; i < SPORTSMAN_COUNT; i++) { Thread t = new Thread(new MyTask((i + 1) + "號運動員", readyLatch, startLatch)); t.start(); } // 當前運動員在其他運動員准備就緒前一直等待,也就是說等readyLatch倒數計數器為0之前一直等待 try { readyLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 裁判發令,釋放鎖 startLatch.countDown(); System.out.println("裁判:所有運動員准備完畢,開始..."); } static class MyTask implements Runnable { private Lock lock = new ReentrantLock(); private CountDownLatch ready; private CountDownLatch start; private String name; /** * * (構造方法) * * @param ready * @param start * @param name 運動員名稱 */ public MyTask(String name, CountDownLatch ready, CountDownLatch start) { this.ready = ready; this.start = start; this.name = name; } @Override public void run() { lock.lock(); try { // 1. 寫運動員准備就緒的邏輯,准備readyTime秒 int readyTime = random.nextInt(1000); System.out.println(name + ":我需要" + readyTime + "秒的時間准備。"); try { Thread.sleep(readyTime); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + "我已經准備完畢!"); // 釋放鎖readyLatch-1,表示一個運動員已經就緒 ready.countDown(); try { // 等待裁判發開始命令 start.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + ":開跑..."); } catch (Exception e) { // TODO: handle exception } finally { lock.unlock(); } } } }
運行結果:
1號運動員:我需要757秒的時間准備。
2號運動員:我需要9秒的時間准備。
3號運動員:我需要602秒的時間准備。
4號運動員:我需要232秒的時間准備。
5號運動員:我需要454秒的時間准備。
6號運動員:我需要440秒的時間准備。
7號運動員:我需要333秒的時間准備。
8號運動員:我需要406秒的時間准備。
9號運動員:我需要613秒的時間准備。
10號運動員:我需要121秒的時間准備。
2號運動員我已經准備完畢!
10號運動員我已經准備完畢!
4號運動員我已經准備完畢!
7號運動員我已經准備完畢!
8號運動員我已經准備完畢!
6號運動員我已經准備完畢!
5號運動員我已經准備完畢!
3號運動員我已經准備完畢!
9號運動員我已經准備完畢!
1號運動員我已經准備完畢!
裁判:所有運動員准備完畢,開始...
10號運動員:開跑...
8號運動員:開跑...
3號運動員:開跑...
1號運動員:開跑...
2號運動員:開跑...
9號運動員:開跑...
5號運動員:開跑...
6號運動員:開跑...
7號運動員:開跑...
4號運動員:開跑...
總結:CountDownLatch通過AQS里面的共享鎖來實現的,在創建CountDownLatch時候,會傳遞一個參數count,該參數是鎖計數器的初始狀態,表示該共享鎖能夠被count個線程同時獲取。當某個線程調用CountDownLatch對象的await方法時候,該線程會等待共享鎖可獲取時,才能獲取共享鎖繼續運行,而共享鎖可獲取的的條件是state == 0,而鎖倒數計數器的初始值為count,每當一個線程調用該CountDownLatch對象的countDown()方法時候,計數器才-1,所以必須有count個線程調用該countDown()方法后,鎖計數器才為0,這個時候等待的線程才能繼續運行。