本文基於 jdk 1.8 。
CountDownLatch 的使用
前面的文章中說到了 volatile 以及用 volatile 來實現自旋鎖,例如 java.util.concurrent.atomic 包下的工具類。但是 volatile 的使用場景畢竟有限,很多的情況下並不是適用,這個時候就需要 synchronized 或者各種鎖實現了。今天就來說一下幾種鎖的實現原理。
先來看一個最簡單的 CountDownLatch 使用方法,例子很簡單,可以運行看一下效果。CountDownLatch 的作用是:當一個線程需要另外一個或多個線程完成后,再開始執行。比如主線程要等待一個子線程完成環境相關配置的加載工作,主線程才繼續執行,就可以利用 CountDownLatch 來實現。
例如下面這個例子,首先實例化一個 CountDownLatch ,參數可以理解為一個計數器,這里為 1,然后主線程執行,調用 worker 子線程,接着調用 CountDownLatch 的 await() 方法,表示阻塞主線程。當子線程執行完成后,在 finnaly 塊調用 countDown() 方法,表示一個等待已經完成,把計數器減一,直到減為 0,主線程又開始執行。
private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException{
System.out.println("主線程開始......");
Thread thread = new Thread(new Worker());
thread.start();
System.out.println("主線程等待......");
System.out.println(latch.toString());
latch.await();
System.out.println(latch.toString());
System.out.println("主線程繼續.......");
}
public static class Worker implements Runnable {
@Override
public void run() {
System.out.println("子線程任務正在執行");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
}finally {
latch.countDown();
}
}
}
執行結果如下:
主線程開始......
子線程任務正在執行
主線程等待......
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 1]
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 0]
主線程繼續.......
AQS 的原理
這么好用的功能是怎么實現的呢,下面就來說一說實現它的核心技術原理 AQS。 AQS 全稱 AbstractQueuedSynchronizer,是 java.util.concurrent 中提供的一種高效且可擴展的同步機制。它可以用來實現可以依賴 int 狀態的同步器,獲取和釋放參數以及一個內部FIFO等待隊列,除了CountDownLatch,ReentrantLock、Semaphore 等功能實現都使用了它。
接下來用 CountDownLatch 來分析一下 AQS 的實現。建議看文章的時候先大致看一下源碼,有助於理解下面所說的內容。
在我們的方法中調用 awit()和countDown()的時候,發生了幾個關鍵的調用關系,我畫了一個方法調用圖。

首先在 CountDownLatch 類內部定義了一個 Sync 內部類,這個內部類就是繼承自 AbstractQueuedSynchronizer 的。並且重寫了方法 tryAcquireShared和tryReleaseShared。例如當調用 awit()方法時,CountDownLatch 會調用內部類Sync 的 acquireSharedInterruptibly() 方法,然后在這個方法中會調用 tryAcquireShared 方法,這個方法就是 CountDownLatch 的內部類 Sync 里重寫的 AbstractQueuedSynchronizer 的方法。調用 countDown() 方法同理。
這種方式是使用 AbstractQueuedSynchronizer 的標准化方式,大致分為兩步:
1、內部持有繼承自 AbstractQueuedSynchronizer 的對象 Sync;
2、並在 Sync 內重寫 AbstractQueuedSynchronizer protected 的部分或全部方法,這些方法包括如下幾個:

之所以要求子類重寫這些方法,是為了讓使用者(這里的使用者指 CountDownLatch 等)可以在其中加入自己的判斷邏輯,例如 CountDownLatch 在 tryAcquireShared中加入了判斷,判斷 state 是否不為0,如果不為0,才符合調用條件。
tryAcquire和tryRelease是對應的,前者是獨占模式獲取,后者是獨占模式釋放。
tryAcquireShared和tryReleaseShared是對應的,前者是共享模式獲取,后者是共享模式釋放。
我們看到 CountDownLatch 重寫的方法 tryAcquireShared 實現如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
判斷 state 值是否為0,為0 返回1,否則返回 -1。state 值是 AbstractQueuedSynchronizer 類中的一個 volatile 變量。
private volatile int state;
在 CountDownLatch 中這個 state 值就是計數器,在調用 await 方法的時候,將值賦給 state 。
等待線程入隊
根據上面的邏輯,調用 await() 方法時,先去獲取 state 的值,當計數器不為0的時候,說明還有需要等待的線程在運行,則調用 doAcquireSharedInterruptibly 方法,進來執行的第一個動作就是嘗試加入等待隊列 ,即調用 addWaiter()方法, 源碼如下:
到這里就走到了 AQS 的核心部分,AQS 用內部的一個 Node 類維護一個 CHL Node FIFO 隊列。將當前線程加入等待隊列,並通過 parkAndCheckInterrupt()方法實現當前線程的阻塞。下面一大部分都是在說明 CHL 隊列的實現,里面用 CAS 實現隊列出入不會發生阻塞。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入等待隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// 進入 CAS 循環
try {
for (;;) {
//當一個節點(關聯一個線程)進入等待隊列后, 獲取此節點的 prev 節點
final Node p = node.predecessor();
// 如果獲取到的 prev 是 head,也就是隊列中第一個等待線程
if (p == head) {
// 再次嘗試申請 反應到 CountDownLatch 就是查看是否還有線程需要等待(state是否為0)
int r = tryAcquireShared(arg);
// 如果 r >=0 說明 沒有線程需要等待了 state==0
if (r >= 0) {
//嘗試將第一個線程關聯的節點設置為 head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//經過自旋tryAcquireShared后,state還不為0,就會到這里,第一次的時候,waitStatus是0,那么node的waitStatus就會被置為SIGNAL,第二次再走到這里,就會用LockSupport的park方法把當前線程阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我看看到上面先執行了 addWaiter() 方法,就是將當前線程加入等待隊列,源碼如下:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 嘗試快速入隊操作,因為大多數時候尾節點不為 null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾節點為空(也就是隊列為空) 或者嘗試CAS入隊失敗(由於並發原因),進入enq方法
enq(node);
return node;
}
上面是向等待隊列中添加等待者(waiter)的方法。首先構造一個 Node 實體,參數為當前線程和一個mode,這個mode有兩種形式,一個是 SHARED ,一個是 EXCLUSIVE,請看上面的代碼。然后執行下面的入隊操作 addWaiter,和 enq() 方法的 else 分支操作是一樣的,這里的操作如果成功了,就不用再進到 enq() 方法的循環中去了,可以提高性能。如果沒有成功,再調用 enq() 方法。
private Node enq(final Node node) {
// 死循環+CAS保證所有節點都入隊
for (;;) {
Node t = tail;
// 如果隊列為空 設置一個空節點作為 head
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//加入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
說明:循環加 CAS 操作是實現樂觀鎖的標准方式,CAS 是為了實現原子操作而出現的,所謂的原子操作指操作執行期間,不會受其他線程的干擾。Java 實現的 CAS 是調用 unsafe 類提供的方法,底層是調用 c++ 方法,直接操作內存,在 cpu 層面加鎖,直接對內存進行操作。
上面是 AQS 等待隊列入隊方法,操作在無限循環中進行,如果入隊成功則返回新的隊尾節點,否則一直自旋,直到入隊成功。假設入隊的節點為 node ,上來直接進入循環,在循環中,先拿到尾節點。
1、if 分支,如果尾節點為 null,說明現在隊列中還沒有等待線程,則嘗試 CAS 操作將頭節點初始化,然后將尾節點也設置為頭節點,因為初始化的時候頭尾是同一個,這和 AQS 的設計實現有關, AQS 默認要有一個虛擬節點。此時,尾節點不在為空,循環繼續,進入 else 分支;
2、else 分支,如果尾節點不為 null, node.prev = t ,也就是將當前尾節點設置為待入隊節點的前置節點。然后又是利用 CAS 操作,將待入隊的節點設置為隊列的尾節點,如果 CAS 返回 false,表示未設置成功,繼續循環設置,直到設置成功,接着將之前的尾節點(也就是倒數第二個節點)的 next 屬性設置為當前尾節點,對應 t.next = node 語句,然后返回當前尾節點,退出循環。
setHeadAndPropagate 方法負責將自旋等待或被 LockSupport 阻塞的線程喚醒。
private void setHeadAndPropagate(Node node, int propagate) {
//備份現在的 head
Node h = head;
//搶到鎖的線程被喚醒 將這個節點設置為head
setHead(node)
// propagate 一般都會大於0 或者存在可被喚醒的線程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 只有一個節點 或者是共享模式 釋放所有等待線程 各自嘗試搶占鎖
if (s == null || s.isShared())
doReleaseShared();
}
}
Node 對象中有一個屬性是 waitStatus ,它有四種狀態,分別是:
//線程已被 cancelled ,這種狀態的節點將會被忽略,並移出隊列
static final int CANCELLED = 1;
// 表示當前線程已被掛起,並且后繼節點可以嘗試搶占鎖
static final int SIGNAL = -1;
//線程正在等待某些條件
static final int CONDITION = -2;
//共享模式下 無條件所有等待線程嘗試搶占鎖
static final int PROPAGATE = -3;
等待線程被喚醒
當執行 CountDownLatch 的 countDown()方法,將計數器減一,也就是state減一,當減到0的時候,等待隊列中的線程被釋放。是調用 AQS 的 releaseShared 方法來實現的,下面代碼中的方法是按順序調用的,摘到了一起,方便查看:
// AQS類
public final boolean releaseShared(int arg) {
// arg 為固定值 1
// 如果計數器state 為0 返回true,前提是調用 countDown() 之前不能已經為0
if (tryReleaseShared(arg)) {
// 喚醒等待隊列的線程
doReleaseShared();
return true;
}
return false;
}
// CountDownLatch 重寫的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 依然是循環+CAS配合 實現計數器減1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/// AQS類
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果節點狀態為SIGNAL,則他的next節點也可以嘗試被喚醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 將節點狀態設置為PROPAGATE,表示要向下傳播,依次喚醒
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
因為這是共享型的,當計數器為 0 后,會喚醒等待隊列里的所有線程,所有調用了 await() 方法的線程都被喚醒,並發執行。這種情況對應到的場景是,有多個線程需要等待一些動作完成,比如一個線程完成初始化動作,其他5個線程都需要用到初始化的結果,那么在初始化線程調用 countDown 之前,其他5個線程都處在等待狀態。一旦初始化線程調用了 countDown ,其他5個線程都被喚醒,開始執行。
總結
1、AQS 分為獨占模式和共享模式,CountDownLatch 使用了它的共享模式。
2、AQS 當第一個等待線程(被包裝為 Node)要入隊的時候,要保證存在一個 head 節點,這個 head 節點不關聯線程,也就是一個虛節點。
3、當隊列中的等待節點(關聯線程的,非 head 節點)搶到鎖,將這個節點設置為 head 節點。
4、第一次自旋搶鎖失敗后,waitStatus 會被設置為 -1(SIGNAL),第二次再失敗,就會被 LockSupport 阻塞掛起。
5、如果一個節點的前置節點為 SIGNAL 狀態,則這個節點可以嘗試搶占鎖。
不妨到我的公眾號里互動一下 :古時的風箏

