一、前言
最近在研究java.util.concurrent
包下的一些的常用類,之前寫了AQS
、ReentrantLock
、ArrayBlockingQueue
以及LinkedBlockingQueue
的相關博客,今天這篇博客就來寫一寫並發包下的另一個常用類——CountDownLatch
。這里首先要說明一點,CountDownLatch
是基於AQS
實現的,AQS
才是真正實現了線程同步的組件,CountDownLatch
只是它的使用者,所以如果想要學習CountDownLatch,請一定先要弄懂AQS的實現原理。我以下的描述均建立在已經了解AQS
的基礎之上。我之前寫過一篇AQS
實現原理的分析博客,感興趣可以看一看:並發——抽象隊列同步器AQS的實現原理。
二、正文
2.1 抽象隊列同步器AQS
在說CountDownLatch
前,必須要先提一下AQS
。AQS
全稱抽象隊列同步器(AbstractQuenedSynchronizer),它是一個可以用來實現線程同步的基礎框架。當然,它不是我們理解的Spring
這種框架,它是一個類,類名就是AbstractQuenedSynchronizer
,如果我們想要實現一個能夠完成線程同步的鎖或者類似的同步組件,就可以在使用AQS
來實現,因為它封裝了線程同步的方式,我們在自己的類中使用它,就可以很方便的實現一個我們自己的鎖。
AQS
的實現相對復雜,無法通過短短的幾句話將其說清楚,我之前專門寫過一篇分析AQS
實現原理的博客:並發——抽象隊列同步器AQS的實現原理。
在閱讀下面的內容前,請一定要先學習AQS的實現原理,因為CountDownLatch
的實現非常簡單,完全就是依賴於AQS
的,所以我以下的描述均建立在已經理解AQS
的基礎之上。可以閱讀上面推薦博客,也可以自己去查閱相關資料。
2.2 CountDownLatch的實現原理
既然已經開始學習CountDownLatch
的實現原理了,那一定已經知道了它的作用,我這里就不詳細展示了,簡單介紹一下:CountDownLatch
的被稱為門栓,可以將它看成是門上的鎖,它會給門上多把鎖,只有每一把鎖都解開,才能通過。對於線程來說,CountDownLatch
會阻塞線程的運行,只有當CountDownLatc
內部記錄的值減小為0
,線程才能繼續向前執行。
CountDownLatch
底層通過AQS
實現,AQS
的一般使用方式就是以內部類的形式繼承它,CountDownLatch
就是這么使用它的。在CountDownLatch
內部有一個內部類Sync
,繼承自AQS
,並重寫了AQS
加鎖解鎖的方法,並通過Sync
的對象,調用AQS
的方法,阻塞線程的運行。我們知道,創建一個CountDownLatch
對象時,需要傳入一個整數值count
,只有當count
被減小為0
時線程才能通過await
方法,否則將被await
阻塞。這里實際上是這樣的:當線程運行到await方法時,需要去獲取鎖(鎖由AQS實現),若count不為0,則線程就會獲取鎖失敗,被阻塞;若count為0,則就能順利通過。CountDownLatch
是一次性的,因為沒有方法可以增加count
的值,也就是說,一旦count
被減小為0
,則之后就一直是0
了,也就再也不能阻塞線程了。下面我們就從源碼的角度來分析CountDownLatch
。
2.3 CountDownLatch的內部類
前面我們說過,CountDownLatch
內部定義了一個內部類Sync
,繼承自AQS
,通過這個內部類來實現線程阻塞,下面我們就來看一看這個內部類的實現:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
/** 構造方法,接收count值,只有count減小為0時,線程才不會被await方法阻塞 */
Sync(int count) {
// CountDownLatch利用AQS的方式就是直接讓count作為AQS的同步變量state
// 所以直接用state記錄count值
setState(count);
}
/** 獲取當前的count值 */
int getCount() {
return getState();
}
/**
* 這是AQS的模板方法acquireShared、acquireSharedInterruptibly等方法內部將會調用的方法,
* 由子類實現,這個方法的作用是嘗試獲取一次共享鎖,對於AQS來說,
* 此方法返回值大於等於0,表示獲取共享鎖成功,反之則獲取共享鎖失敗,
* 而在這里,實際上就是判斷count是否等於0,線程能否向下運行
*/
protected int tryAcquireShared(int acquires) {
// 此處判斷state的值是否為0,也就是判斷count是否為0,
// 若count為0,返回1,表示獲取鎖成功,此時線程將不會阻塞,正常運行
// 若count不為0,則返回-1,表示獲取鎖失敗,線程將會被阻塞
// 從這里我們已經可以看出CountDownLatch的實現方式了
return (getState() == 0) ? 1 : -1;
}
/**
* 此方法的作用是用來是否AQS的共享鎖,返回true表示釋放成功,反之則失敗
* 此方法將會在AQS的模板方法releaseShared中被調用,
* 在CountDownLatch中,這個方法用來減小count值
*/
protected boolean tryReleaseShared(int releases) {
// 使用死循環不斷嘗試釋放鎖
for (;;) {
// 首先獲取當前state的值,也就是count值
int c = getState();
// 若count值已經等於0,則不能繼續減小了,於是直接返回false
// 為什么返回的是false,因為等於0表示之前等待的那些線程已經被喚醒了,
// 若返回true,AQS會嘗試喚醒線程,若返回false,則直接結束,所以
// 在沒有線程等待的情況下,返回false直接結束是正確的
if (c == 0)
return false;
// 若count不等於0,則將其-1
int nextc = c-1;
// compareAndSetState的作用是將count值從c,修改為新的nextc
// 此方法基於CAS實現,保證了操作的原子性
if (compareAndSetState(c, nextc))
// 若nextc == 0,則返回的是true,表示已經沒有鎖了,線程可以運行了,
// 若nextc > 0,則表示線程還需要繼續阻塞,此處將返回false
return nextc == 0;
}
}
}
可以看到,內部類Sync的實現非常簡單,它只實現了AQS
中的兩個方法,即tryAcquireShared以及tryReleaseShared,這兩個方法是AQS
提供的使用共享鎖的接口。這也就表明,CountDownLatch
實際上是一種共享鎖機制,即鎖可以同時被多個線程獲取,這個不難理解,因為一旦count
被減小為0,則所有線程通過await
方法時,都能夠順利通過,不會因為獲取不到鎖而阻塞。而且從上面的實現中我們可以看到,Sync
直接將count
值作為AQS
的state
的值,只有state
的值為0,線程才能獲取鎖,也就是獲得執行權限。
2.4 CountDownLatch的成員變量和構造方法
下面來看一看CountDownLatch
的屬性和構造方法:
/**
* 只有一個成員變量,就是內部類Sync的一個對象,通過此對象調用AQS的方法,實現線程阻塞和喚醒
*/
private final Sync sync;
/**
* 只有一個構造方法,接收一個count值
*/
public CountDownLatch(int count) {
// count值不能小於0
if (count < 0) throw new IllegalArgumentException("count < 0");
// 直接創建一個Sync對象,並傳入count值,Sync內部將會執行setState(count)
this.sync = new Sync(count);
}
2.5 await方法分析
CountDownLatch
類最最核心的兩個方法就是await
以及ountDown
,我們先來看一看await
方法的實現:
// 此方法用來讓當前線程阻塞,直到count減小為0才恢復執行
public void await() throws InterruptedException {
// 這里直接調用sync的acquireSharedInterruptibly方法,這個方法定義在AQS中
// 方法的作用是嘗試獲取共享鎖,若獲取失敗,則線程將會被加入到AQS的同步隊列中等待
// 直到獲取成功為止。且這個方法是會響應中斷的,線程在阻塞的過程中,若被其他線程中斷,
// 則此方法會通過拋出異常的方式結束等待。
sync.acquireSharedInterruptibly(1);
}
await
的實現異常簡單,只有短短一行代碼,調用了AQS
中已經封裝好的方法。這就是AQS
的好處,AQS
已經實現了線程的阻塞和喚醒機制,將實現的復雜性隱藏,而其他類只需要簡單的使用它即可。為了方便理解,我們還是來看看acquireSharedInterruptibly
方法吧:
/** 此方法是AQS中提供的一個模板方法,用以獲取共享鎖,並且會響應中斷 */
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 首先判斷當前線程釋放被中斷,若被中斷,則直接拋出異常結束
if (Thread.interrupted())
throw new InterruptedException();
// 調用tryAcquireShared方法嘗試獲取鎖,這個方法被Sycn類重寫了,
// 若count == 0,則這個方法會返回1,表示獲取鎖成功,則這里會直接返回,線程不會被阻塞
// 若count < 0,將會執行下面的doAcquireSharedInterruptibly方法,
// 此處請去查看Sync中tryAcquireShared方法的實現
if (tryAcquireShared(arg) < 0)
// 下面這個方法的作用是,線程獲取鎖失敗,將會加入到AQS的同步隊列中阻塞等待,
// 直到成功獲取到鎖,而此處成功獲取到鎖的條件就是count == 0,若當前線程在等待的過程中,
// 成功地獲取了鎖,則它會繼續喚醒在它后面等待的線程,也嘗試獲取鎖,
// 這也就是說,只要count == 0了,則所有被阻塞的線程都能恢復運行
doAcquireSharedInterruptibly(arg);
}
相信看到這里,對CountDownLatch
的實現原理已經有一個比較清晰的理解了。CountDownLatch
的實現完全就是依賴於AQS
的,所有再次提醒,如果以上內容理解不了,請先去學習AQS
。
2.6 countDown方法分析
下面我們來分析CountDownLatch
中另一個核心的方法——countDown
,
/**
* 此方法的作用就是將count的值-1,如果count等於0了,就喚醒等待的線程
*/
public void countDown() {
// 這里直接調用sync的releaseShared方法,這個方法的實現在AQS中,也是AQS提供的模板方法,
// 這個方法的作用是當前線程釋放鎖,若釋放失敗,返回false,若釋放成功,則返回false,
// 若鎖被釋放成功,則當前線程會喚醒AQS同步隊列中第一個被阻塞的線程,讓他嘗試獲取鎖
// 對於CountDownLatch來說,釋放鎖實際上就是讓count - 1,只有當count被減小為0,
// 鎖才是真正被釋放,線程才能繼續向下運行
sync.releaseShared(1);
}
為了方便理解,我們還是來看一看AQS
中releaseShared
方法的實現:
public final boolean releaseShared(int arg) {
// 調用tryReleaseShared嘗試釋放鎖,這個方法已經由Sycn重寫,請回顧上面對此方法的分析
// 若tryReleaseShared返回true,表示count經過這次釋放后,等於0了,於是執行doReleaseShared
if (tryReleaseShared(arg)) {
// 這個方法的作用是喚醒AQS的同步隊列中,正在等待的第一個線程
// 而我們分析acquireSharedInterruptibly方法時已經說過,
// 若一個線程被喚醒,檢測到count == 0,會繼續喚醒下一個等待的線程
// 也就是說,這個方法的作用是,在count == 0時,喚醒所有等待的線程
doReleaseShared();
return true;
}
return false;
}
三、總結
如果直接去看CountDownLatch
的源碼會發現,它的實現真的非常簡單,包括注釋在內,總共300
行代碼,除去注釋,連100
行代碼都不到。因為它所作的工作,除了重寫AQS
的兩個方法外,其余的基本上就是調用AQS
提供的模板方法而已。所以,理解CountDownLatch
的過程,實際上是理解AQS
的過程,只要理解了AQS
,看懂CountDownLatch
的原理,不需要5
分鍾。AQS
真的是Java
並發中非常重要的一個組件,很多類都是基於它實現的,比如還有ReentrantLock
,同時AQS
也是面試中的常考點,所以一定要好好研究。最后再次推薦我之前編寫的有關AQS
的源碼分析博客:並發——抽象隊列同步器AQS的實現原理。
四、參考
- JDK1.8源碼