文章篇幅較短,對於一些
AQS
的頂級方法例如releaseShared
並沒有做過深的講解,因為這些算是AQS
的范疇,關於AQS
可以看下另一篇文章——AQS。
CountDownLatch
一般被稱作"計數器",作用大致就是數量達到了某個點之后計數結束,才能繼續往下走。可以用作流程控制之類的作用,大流程分成多個子流程,然后大流程在子流程全部結束之前不動(子流程最好是相互獨立的,除非能很好的控制兩個流程的關聯關系),子流程全部結束后大流程開始操作。
很抽象,小問題,下方的兩節或許能讓你理解CountDownLatch
的用法和內部的實現。
1.CountDownLatch的使用
假設現在,我們要起一個3塊錢的集資項目,並且限定每個人一次只能捐1塊錢當募集到3塊錢的時候立馬就把這筆錢捐給我自己,如果湊齊之后你還想捐,那么我會跟你說,項目已經完成了,你這一塊錢我不受理,自己去買雪糕吃吧;如果沒湊齊,那么我這個募集箱就一直掛在這里。這個場景用CountDownLatch
可以很契合的模擬出來。
字數也不湊了,直接看demo
例子吧
public static void main(String[] args) throws InterruptedException {
// 集資項目==========>啟動,目標3塊錢
CountDownLatch countDownLatch = new CountDownLatch(3);
ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("張1准備捐一塊錢");
countDownLatch.countDown();
System.err.println("張1捐了一塊錢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("張2准備捐一塊錢");
countDownLatch.countDown();
System.err.println("張2捐了一塊錢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("張3准備捐一塊錢");
countDownLatch.countDown();
System.err.println("張3捐了一塊錢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.err.println("我項目啟動后,就在這里等人捐錢,不夠3塊我不走了");
countDownLatch.await();
System.err.println("3塊錢到手,直接跑路");
executor.shutdown();
}
結果圖:
這個結果,em,可以看到countDownLatch
使用的幾個注意點:
- 調用
countDownLatch
的await()
方法的線程會阻塞,直到湊夠3塊錢為止 - 跟CyclicBarrier不同,其計完數之后並不會阻塞,而是直接執行接下來的操作
- 每次調用
countDown()
方法都會捐一塊錢(計數一次),滿了之后調用await()
方法的線程不再阻塞
另外,在上面的代碼中,在countDown
方法之后還打印信息是為了驗證countDown
方法不會阻塞當前線程,執行結果不一定如上圖那樣有順序的,例如可能出現下方的結果:
因為最后一個countDown
之后,await
所在的線程不再阻塞了,又正好趕上JVM
線程調度,所以就會出現上方的結果。
2.CountDownLatch的內部實現
剛才已經講了CountDownLatch
的用法,用起來還是不難的。那來看下內部是怎么實現的,又是怎么做到計數之后不跟CyclicBarrier一樣阻塞的呢?
首先來看構造函數吧,CountDownLatch
只有一個構造函數,如下
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
所做的事情也就只有初始化內部對象sync
一件事情(校驗總不能算一件事吧?),那來看下初始化了個啥玩意
// 變量sync,是不是看起來很眼熟?
private final Sync sync;
// 內部類Sync,又是一個AQS的產物
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 構造方法,就是設置了AQS的state值
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/*
* 可以知道countDownLatch使用的是AQS的共享模式
* 獲取資源方法,正數表示成功,負數表示失敗
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 釋放方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
// state的狀態,在countDownLatch中表示剩余計數量,如果為0則表示可以被獲取,即await方法不再阻塞
int c = getState();
if (c == 0)
return false;
// 本次計數后還剩余的及數量
int nextc = c-1;
// CAS設置剩余計數量
if (compareAndSetState(c, nextc))
// ==0表示鎖釋放了,之后state的值將一直是0,意思就是之后的await方法都不再阻塞
return nextc == 0;
}
}
既然涉及到了AQS
,那你應該懂我意思了——快去看我寫的AQS文章啊。
開個玩笑,我知道各位都多多少少了解一些,上方代碼的作用應該知道是干嘛的,不懂也沒關系,等下我在下面再講。
回到正題,來講下從上方代碼能得到什么信息
1.
CountDownLatch
構造函數count
的參數作用就是設置其內部的AQS
的狀態state
,假設count
為3,那么每次進行countDown
,AQS
的state
就減1,減到0的時候await
方法就不再阻塞,注意這時候await方法就不再阻塞了,無論你調多少次。2.
CountDownLatch
里邊的Sync
實現的AQS
的共享模式(從tryReleaseShared
方法可以看出)
到這里對其CountDownLatch
的內部有個差不多印象了,接下來看下其最重要的await
和countDown
方法。
2.1 await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
直接調用了AQS
的頂級方法,再進去就是AQS
的模塊了
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 獲取資源,成功直接返回,失敗執行下方方法(進入同步隊列)
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
簡單說明一下,這個方法的意思就是調用tryAcquireShared
的方法嘗試獲取資源,方法返回負數表示失敗,返回正數則表示成功;失敗了則入同步隊列(即阻塞),具體的細節可以看下AQS的詳解。
也就是說關鍵點是 tryAcquireShared
方法,這個方法剛才在上方已經解釋過,這里再放一次。方法邏輯很簡單,如果state
=0(即計數完畢)則成功,否則失敗。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
okay,await
方法的整個流程大致就是:嘗試獲取資源,如果失敗則阻塞,成功了繼續當前線程的操作。什么時候會失敗呢,在state
!=0的時候,而state
這個變量的值我們在構造函數就已經賦予了,需要通過countDown
方法來減少。
2.2 countDown
既然這個方法這么重要,那讓它開始它的表演吧。
public void countDown() {
sync.releaseShared(1);
}
同樣的,直接調用AQS
的頂級釋放資源的方法。
public final boolean releaseShared(int arg) {
// 如果資源釋放了,那么喚醒同步隊列中等待的線程
if (tryReleaseShared(arg)) {
// 善后操作
doReleaseShared();
return true;
}
return false;
}
關鍵的方法還是在資源的控制上——tryReleaseShared
,代碼如下(上方也有):
protected boolean tryReleaseShared(int releases) {
for (;;) {
/*
* state的狀態,在countDownLatch中表示剩余計數量
* 如果為0則表示可以被獲取,即await方法不再阻塞
*/
int c = getState();
// 這里的意思是如果資源已經釋放的情況下,就不能再次釋放了,釋放成功的代碼在最后一行
if (c == 0)
return false;
// 本次計數后還剩余的及數量
int nextc = c-1;
// CAS設置剩余計數量
if (compareAndSetState(c, nextc))
// ==0表示鎖釋放了,之后state的值將一直是0,意思就是之后的await方法都不再阻塞
return nextc == 0;
}
}
到這里countDown
方法的迷霧也看清了,每一次調用countDown
方法就相當於調用tryReleaseShared
方法,如果當前資源還沒釋放的話,將state-1,判斷是否為0,如果為0的話表示資源釋放,喚醒await
方法的線程,否則的話只是更新state
的值。
整理一下整個CountDownLatch
的流程。
1.創建一個
CountDownLatch
,並賦予一個數值,這個值表示需要計數的次數,每次countDown
算一次2.在主線程調用
await
方法,表示需要計數器完成之前都不能動。await
方法的內部實現依賴於內部的AQS
,調用await
方法的時候會嘗試去獲取資源,成功條件是state
=0,也就是說除非countDown
了count
(構造函數賦予)次之后,才能成功,失敗的話當前線程進行休眠。
3.在子線程調用
countDown
方法,每次調用都會使內部的state
-1,state
為0的時候資源釋放,await
方法不再阻塞(即使再次調用也是)
3. 小結
如果理解AQS的話,不止CountDownLatch
,其他衍生物例如ReentrantLock
都能輕易的看懂。如果不了解的話也沒關系,這篇文章應該能讓你對CountDownLatch
的內部實現有了大概的輪廓。
簡單總結一下,CountDownLatch
就三個點:構造函數的值、await
、countDown
。構造函數的值表示計數的次數,每次countDown
都會使計數減一,減到0的時候await
方法所在的線程就不再阻塞。
這篇文章寫得,自己都有點不好意思了...