在實際開發中,碰上CPU密集且執行時間非常耗時的任務,通常我們會選擇將該任務進行分割,以多線程方式同時執行若干個子任務,等這些子任務都執行完后再將所得的結果進行合並。這正是著名的map-reduce思想,不過map-reduce通常被用在分布式計算的語境下,這里舉這個例子只是為了說明對多線程並發執行流程進行控制的重要性,比如某些線程必須等其他線程執行完后才能開始它的工作。使用jdk中的內置鎖或者重入鎖配合等待通知機制可以實現這個需求,但是會比較麻煩。因為不管是內置還是重入鎖,它們關注的重點在於如何協調多線程對共享資源的訪問,而不是協調特定線程的執行次序,完成復雜的並發流程控制。好在JDK在並發包下提供了CountDownLatch,CyclicBarrier,Semaphore等並發工具,可以讓我們站在更高的角度思考並解決這個問題。
2. 閉鎖CountDownLatch
2.1 CountDownLatch功能簡介
CountDownLatch通常稱之為閉鎖。它可以使一個或一批線程在閉鎖上等待,等到其他線程執行完相應操作后,閉鎖打開,這些等待的線程才可以繼續執行。確切的說,閉鎖在內部維護了一個倒計數器。通過該計數器的值來決定閉鎖的狀態,從而決定是否允許等待的線程繼續執行。該計數器的初始值由用戶在創建閉鎖對象時通過傳入的構造參數決定,如下所示
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
默認計數器初始值不能小於0,否則將拋出異常。
當計數器的值大於0時,該閉鎖處於關閉狀態,調用閉鎖的await()方法將導致當前線程在閉鎖上等待。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
但是我們可以通過調用閉鎖的countDown()方法來使閉鎖的計數值減少
public void countDown() {
sync.releaseShared(1);
}
每調用一次countDown()方法都會使閉鎖的計數值減少1,所以閉鎖的計數器准確來說是個倒計數器。當計數值減少到0時,阻塞在閉鎖上的線程將被喚醒從而繼續執行。下面以一個類似map-reduce的例子來對CountDownLatch的用法做講解。
2.2 使用CountDownLatch
為了計算一個CPU密集型的大任務,將該任務分割成10個子任務,交由開啟的10個子線程去執行。當所有子任務執行完畢后,主線程再執行后續的工作。任務的執行時間以線程休眠進行模擬,整個流程以日志方式進行記錄。完整代碼如下
/**
* @author: takumiCX
* @create: 2018-09-17
**/
class CountDownLatchTest {
static CountDownLatch countDownLatch;
public static void main(String[] args) throws InterruptedException {
int count=10;
//初始化計數器值為10
countDownLatch=new CountDownLatch(count);
//開啟10個子線程執行子任務
for(int i=0;i<count;i++){
Thread thread = new Thread(new CountDownThread(countDownLatch,i));
thread.start();
}
//主線程等待,直到所有子任務完成
countDownLatch.await();
//模擬主線程執行后續工作
TimeUnit.SECONDS.sleep(1);
System.out.println("任務執行完畢!");
}
private static class CountDownThread implements Runnable{
CountDownLatch countDownLatch;
//子任務序號
int taskNum;
public CountDownThread(CountDownLatch countDownLatch, int taskNum) {
this.countDownLatch = countDownLatch;
this.taskNum = taskNum;
}
@Override
public void run() {
try {
//模擬子任務的執行
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任務執行完畢,則調用countDown方法使計數器值減少1
countDownLatch.countDown();
System.out.println("子任務:"+taskNum+" 執行完畢!");
}
}
}
結果如下所示

可以看到主線程在所有子任務執行完前必須在閉鎖上等待。當最后一個子任務完成后,它將被喚醒,從而可以繼續之后的工作。
2.3 CountDownLatch原理淺析
CountDownLatch底層也是通過AQS實現的。和ReentrentLock以獨占的方式獲取和釋放同步狀態不同,CountDownLatch是以共享的方式獲取和釋放同步狀態的。獨占式和共享式的區別主要有以下幾點:
- 1.獨占式一次只允許一個線程獲取同步狀態,而共享式一次允許多個線程同時獲取同步狀態。
- 2.當在同步隊列等待的線程被喚醒然后成功獲取同步狀態時,它還必須喚醒后續結點中的線程,並將這個過程傳遞下去,使得多個線程可以同時獲取到同步狀態。
同步狀態依舊使用AQS中的state值進行表示,在CountDownLatch的語境下表示計數器的值,且只有在state=0時線程才能成功獲取到同步狀態,盡管有些奇怪,不過考慮到CountDownLatch中的計數器是個倒計數器,這么設定也並非不可理解。為了更好的理解CountDownLatch的源碼,從釋放同步狀態的方法countDown()開始講起
public void countDown() {
sync.releaseShared(1);
}
正確找到sync的實現類后跟進源碼
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //嘗試在共享模式下釋放同步狀態
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()嘗試在共享模式下釋放同步狀態,該方法是在AQS中定義的鈎子方法,必須由AQS的實現類自己實現,方法內容如下
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); //獲取同步狀態值
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) //以CAS方式更新同步狀態值
return nextc == 0;
}
}
使用死循環+CAS方式將計數值state減少1。僅當更新操作成功且state值被更新為0時返回true,表示在共享模式下釋放同步狀態成功,接着便會繼續執行doReleaseShared()方法,方法內容如下
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒后繼結點中的線程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
該方法主要完成的工作是喚醒頭結點之后的結點中的線程。那么其他在同步隊列中等待的線程使如何被喚醒的?別急,我們可以在await()方法中找到答案。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
找到sync正確的實現類后跟進源碼
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared()是在共享模式下嘗試獲取同步狀態,
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
當同步狀態值state=0時返回1,表示獲取同步狀態成功,否則返回-1表示獲取同步狀態失敗。獲取同步狀態失敗的線程顯然應該加入同步等待隊列並在隊列中等待,這部分邏輯我們在解讀ReentrentLock的源碼時應該已經看過了,不過在共享模式下細節方面有些不同
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//構造結點並加入同步隊列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//當前驅結點是頭結點時獲取同步狀態
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第一步自然是構造結點並加入同步隊列尾部,這部分邏輯在addWaiter()方法中,注意結點類型為共享類型。之后的邏輯和獨占模式類似,檢查前驅結點是否是隊列的頭結點,是則嘗試獲取同步狀態,成功則將當前結點設置為隊列頭結點,失敗則阻塞當前線程並等待喚醒並重新執行以上流程。不過在共享模式下,當前線程在成功獲取同步狀態並設置自身為頭結點后,還必須做些額外的工作:當后繼結點為共享類型時,喚醒后繼結點中的線程。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //設置當前結點為隊列頭結點
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); //喚醒后繼結點的線程
}
}
至此,CountDownLatch的原理就搞明白了,它是以AQS的共享模式來實現復雜的並發流程控制的。當其內部的計數器不為0時,調用其await方法將導致線程加入同步隊列並阻塞。當調用countDown方法使計數器的值為0時,會喚醒隊列中第一個等待的線程,之后由該線程喚醒后面的線程,以此類推,直到阻塞在閉鎖上的線程都被成功喚醒。
3.循環屏障CyclicBarrier
3.1 CyclicBarrier功能簡介
CyclicBarrier通常稱為循環屏障。它和CountDownLatch很相似,都可以使線程先等待然后再執行。不過CountDownLatch是使一批線程等待另一批線程執行完后再執行;而CyclicBarrier只是使等待的線程達到一定數目后再讓它們繼續執行。故而CyclicBarrier內部也有一個計數器,計數器的初始值在創建對象時通過構造參數指定,如下所示
public CyclicBarrier(int parties) {
this(parties, null);
}
每調用一次await()方法都將使阻塞的線程數+1,只有阻塞的線程數達到設定值時屏障才會打開,允許阻塞的所有線程繼續執行。除此之外,CyclicBarrier還有幾點需要注意的地方:
-
1.CyclicBarrier的計數器可以重置而CountDownLatch不行,這意味着CyclicBarrier實例可以被重復使用而CountDownLatch只能被使用一次。而這也是循環屏障循環二字的語義所在。
-
2.CyclicBarrier允許用戶自定義barrierAction操作,這是個可選操作,可以在創建CyclicBarrier對象時指定
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
一旦用戶在創建CyclicBarrier對象時設置了barrierAction參數,則在阻塞線程數達到設定值屏障打開前,會調用barrierAction的run()方法完成用戶自定義的操作。
3.2 使用CyclicBarrier
還是以多線程分割大任務並發執行的例子來進行講解,不過這次情況要稍微復雜些。線程在執行完分配給它的子任務后不能立即退出,必須等待所有任務都完成后再執行釋放資源的操作。而主線程在所有子任務都執行完畢后也要執行特定的操作,且該操作在線程釋放資源前。所有操作都以打印日志的方式進行模擬。代碼如下:
/**
* @author: takumiCX
* @create: 2018-09-18
**/
public class CyclicBarrierTest {
static CyclicBarrier cyclicBarrier;
public static void main(String[] args) {
int count = 10;
//當所有子任務都執行完畢時,barrierAction的run方法會被調用
cyclicBarrier = new CyclicBarrier(count, () ->
System.out.println("執行barrierAction操作!"));
//開啟多個線程執行子任務
for(int i=0;i<count;i++){
new Thread(new CyclicBarrierThread(cyclicBarrier,i)).start();
}
}
private static class CyclicBarrierThread implements Runnable {
public CyclicBarrier cyclicBarrier;
//任務序號
public int taskNum;
public CyclicBarrierThread(CyclicBarrier cyclicBarrier, int taskNum) {
this.cyclicBarrier = cyclicBarrier;
this.taskNum = taskNum;
}
@Override
public void run() {
//執行子任務
System.out.println("子任務:"+taskNum+" 執行完畢!");
try {
//等待所有子任務執行完成
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
//釋放資源
System.out.println("線程:"+taskNum+" 釋放資源!");
}
}
}
開啟10個線程執行子任務,每個線程執行完子任務后在CyclicBarrier上等待。等到所有子任務完成后,用戶設置自定義的barrierAction操作即被執行,之后屏障正式打開,阻塞的所有線程將完成釋放資源的操作。
結果如下圖所示

3.3 CyclicBarrier原理淺析
CyclicBarrier內部使用ReentrentLock來實現線程同步,而通過Condition來實現線程的阻塞和喚醒。當計數器值為0時,首先會執行用戶自定義的barrierAction操作。
int index = --count; //計數器值
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand; //用戶自定義的barrierAction
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
之后再進行阻塞線程的喚醒,以及將計數器重置為初始值。這部分代碼在nextGeneration()中
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //喚醒所有的阻塞線程
// set up next generation
count = parties; //計數器重置為初始值
generation = new Generation();
}
4. 信號量Semaphore
4.1 Semaphore功能簡介
如果學過操作系統的話,對信號量Semaphore應該不陌生。操作系統中的信號量是這么一個機構:它維護了一定數目的資源,進程向其請求資源將導致Semaphore中資源數量減少,當資源數量小於0時將會導致當前線程阻塞;而進程釋放資源將導致Semaphore中資源數量增加,當資源數量大於0時會喚醒阻塞的進程。操作系統中使用信號量可以輕松實現進程間的互斥和同步。java在語言層面也支持信號量機制,其工作原理和操作系統中的信號量類似,可以通過調用
public void acquire(int permits)
或者```
public boolean tryAcquire(int permits)
請求信號量中的許可(資源)。不過后者在信號量中許可數量不夠時不會阻塞而是立即返回一個失敗結果。當然,也可以通過```
public void release()
向信號量歸還資源。
信號量在創建時必須為其指定可以用的許可總數,如下所示
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
當創建信號量時指定許可總數為1,則可以起到獨占鎖的作用,不過它是不允許線程重入的。同時,它還有公平和非公平模式之分,通過在創建對象時傳入參數進行指定
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
和ReentrentLock一樣默認是非公平模式。
4.2 使用Semaphore進行最大並發數的控制
假設服務器上有一種資源可以同時供多個用戶進行訪問,出於系統穩定性考慮需要限制同時訪問的用戶的數量,整個過程可以模擬如下
/**
* @author: takumiCX
* @create: 2018-09-24
**/
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
//信號量控制並發數最多為3
Semaphore semaphore = new Semaphore(3);
//同時開啟10個線程
for(int i=1;i<=10;i++){
new Thread(new ReaderThread(semaphore,i)).start();
}
}
static class ReaderThread implements Runnable{
Semaphore semaphore;
//用戶序號
int userIndex;
public ReaderThread(Semaphore semaphore, int userIndex) {
this.semaphore = semaphore;
this.userIndex = userIndex;
}
@Override
public void run() {
try {
//獲取許可
semaphore.acquire(1);
//模擬訪問資源所用的時間
TimeUnit.SECONDS.sleep(1);
System.out.println("用戶 "+userIndex+" 訪問資源,時間:"+System.currentTimeMillis());
//釋放許可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用信號量限制同時並發訪問的線程數為3,然后開啟10個線程模擬用戶訪問。得到的結果如下

從結果上可以清晰的看到,每次最多允許3個用戶同時訪問資源,信號量很好的起到了限流作用。
4.3 Semaphore原理淺析
和CountDownLatch類似,Semaphore底層也是通過AQS的共享模式實現的。它和CountDownLatch的區別只是對於AQS共享模式的鈎子方法```
tryAcquireShared()
和```
tryReleaseShared()
的實現不同。
以Semaphore的非公平模式為例,其嘗試釋放同步狀態的邏輯如下
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //獲取可用許可數
int remaining = available - acquires; //計算被消耗后剩余的許可數
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
首先會獲取當前可用的許可值(state),根據請求數量計算出剩余的許可值,若剩余許可數小於0則直接返回剩余值表示該操作失敗;否則以CAS方式將state值更新為計算后的剩余值,並返回一個大於等於0的數表示成功。通過該方法的返回值可以知道嘗試獲取同步狀態的操作是否成功,返回值小於0表示沒有足夠的許可,線程將會加入同步隊列並等待;返回值大於等於0則表示許可足夠,則整個獲取許可的流程就結束了。
tryReleaseShared()的實現也很簡單,
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); //獲取當前許可數
int next = current + releases; //計算釋放后的許可總數
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //cas更新許可值
return true;
}
}
計算釋放后的許可總數並以CAS方式對state值進行更新。之后將返回上層繼續執行
doReleaseShared()
喚醒頭結點后面結點中的線程,被喚醒的線程將執行tryAcquireShared()重新嘗試獲取同步狀態,獲取失敗則繼續阻塞,獲取成功將設置當前結點為隊列頭結點並繼續喚醒后續結點中的線程。
