- 1 AQS 簡單介紹
- 2 AQS 原理
- 3 Semaphore(信號量)-允許多個線程同時訪問
- 4 CountDownLatch (倒計時器)
- 5 CyclicBarrier(循環柵欄)
- 6 ReentrantLock 和 ReentrantReadWriteLock
- 參考
常見問題:AQS 原理?;CountDownLatch 和 CyclicBarrier 了解嗎,兩者的區別是什么?用過 Semaphore 嗎?
1 AQS 簡單介紹
AQS 的全稱為(AbstractQueuedSynchronizer),這個類在 java.util.concurrent.locks 包下面。
AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的 ReentrantLock,Semaphore,其他的諸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基於 AQS 的。當然,我們自己也能利用 AQS 非常輕松容易地構造出符合我們自己需求的同步器。
2 AQS 原理
在面試中被問到並發知識的時候,大多都會被問到“請你說一下自己對於 AQS 原理的理解”。下面給大家一個示例供大家參考,面試不是背題,大家一定要加入自己的思想,即使加入不了自己的思想也要保證自己能夠通俗的講出來而不是背出來。
下面大部分內容其實在 AQS 類注釋上已經給出了,不過是英語看着比較吃力一點,感興趣的話可以看看源碼。
2.1 AQS 原理概覽
AQS 核心思想是,如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,並且將共享資源設置為鎖定狀態。如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制 AQS 是用 CLH 隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關系)。AQS 是將每條請求共享資源的線程封裝成一個 CLH 鎖隊列的一個結點(Node)來實現鎖的分配。
看個 AQS(AbstractQueuedSynchronizer)原理圖:
AQS 使用一個 int 成員變量來表示同步狀態,通過內置的 FIFO 隊列來完成獲取資源線程的排隊工作。AQS 使用 CAS 對該同步狀態進行原子操作實現對其值的修改。
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態信息通過 protected 類型的getState
,setState
,compareAndSetState
進行操作
//返回同步狀態的當前值
protected final int getState() {
return state;
}
// 設置同步狀態的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)將同步狀態值設置為給定值update如果當前同步狀態的值等於expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2 AQS 對資源的共享方式
AQS 定義兩種資源共享方式
1)Exclusive(獨占)
只有一個線程能執行,如 ReentrantLock。又可分為公平鎖和非公平鎖,ReentrantLock 同時支持兩種鎖,下面以 ReentrantLock 對這兩種鎖的定義做介紹:
- 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
- 非公平鎖:當線程要獲取鎖時,先通過兩次 CAS 操作去搶鎖,如果沒搶到,當前線程再加入到隊列中等待喚醒。
說明:下面這部分關於
ReentrantLock
源代碼內容節選自:https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,這是一篇很不錯文章,推薦閱讀。
下面來看 ReentrantLock 中相關的源代碼:
ReentrantLock 默認采用非公平鎖,因為考慮獲得更好的性能,通過 boolean 來決定是否用公平鎖(傳入 true 用公平鎖)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默認非公平鎖
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 中公平鎖的 lock
方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平鎖相比,這里多了一個判斷:是否有線程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平鎖的 lock 方法:
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平鎖相比,這里會直接先進行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 這里沒有對阻塞隊列進行判斷
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
總結:公平鎖和非公平鎖只有兩處不同:
- 非公平鎖在調用 lock 后,首先就會調用 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被占用,那么直接就獲取到鎖返回了。
- 非公平鎖在 CAS 失敗后,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷等待隊列是否有線程處於等待狀態,如果有則不去搶鎖,乖乖排到后面。
公平鎖和非公平鎖就這兩點區別,如果這兩次 CAS 都不成功,那么后面非公平鎖和公平鎖是一樣的,都要進入到阻塞隊列等待喚醒。
相對來說,非公平鎖會有更好的性能,因為它的吞吐量比較大。當然,非公平鎖讓獲取鎖的時間變得更加不確定,可能會導致在阻塞隊列中的線程長期處於飢餓狀態。
2)Share(共享)
多個線程可同時執行,如 Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我們都會在后面講到。
ReentrantReadWriteLock 可以看成是組合式,因為 ReentrantReadWriteLock 也就是讀寫鎖允許多個線程同時對某一資源進行讀。
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源 state 的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS 已經在上層已經幫我們實現好了。
2.3 AQS 底層使用了模板方法模式
同步器的設計是基於模板方法模式的,如果需要自定義同步器一般的方式是這樣(模板方法模式很經典的一個應用):
- 使用者繼承 AbstractQueuedSynchronizer 並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源 state 的獲取和釋放)
- 將 AQS 組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。
這和我們以往通過實現接口的方式有很大區別,這是模板方法模式很經典的一個運用,下面簡單的給大家介紹一下模板方法模式,模板方法模式是一個很容易理解的設計模式之一。
模板方法模式是基於”繼承“的,主要是為了在不改變模板結構的前提下在子類中重新定義模板中的內容以實現復用代碼。舉個很簡單的例子假如我們要去一個地方的步驟是:購票
buyTicket()
->安檢securityCheck()
->乘坐某某工具回家ride()
->到達目的地arrive()
。我們可能乘坐不同的交通工具回家比如飛機或者火車,所以除了ride()
方法,其他方法的實現幾乎相同。我們可以定義一個包含了這些方法的抽象類,然后用戶根據自己的需要繼承該抽象類然后修改ride()
方法。
AQS 使用了模板方法模式,自定義同步器時需要重寫下面幾個 AQS 提供的模板方法:
isHeldExclusively()//該線程是否正在獨占資源。只有用到condition才需要去實現它。
tryAcquire(int)//獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int)//獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
默認情況下,每個方法都拋出 UnsupportedOperationException
。 這些方法的實現必須是內部線程安全的,並且通常應該簡短而不是阻塞。AQS 類中的其他方法都是 final ,所以無法被其他類使用,只有這幾個方法可以被其他類使用。
以 ReentrantLock 為例,state 初始化為 0,表示未鎖定狀態。A 線程 lock()時,會調用 tryAcquire()獨占該鎖並將 state+1。此后,其他線程再 tryAcquire()時就會失敗,直到 A 線程 unlock()到 state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A 線程自己是可以重復獲取此鎖的(state 會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證 state 是能回到零態的。
再以 CountDownLatch 以例,任務分為 N 個子線程去執行,state 也初始化為 N(注意 N 要與線程個數一致)。這 N 個子線程是並行執行的,每個子線程執行完后 countDown()一次,state 會 CAS(Compare and Swap)減 1。等到所有子線程都執行完后(即 state=0),會 unpark()主調用線程,然后主調用線程就會從 await()函數返回,繼續后余動作。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一種即可。但 AQS 也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock
。
推薦兩篇 AQS 原理和相關源碼分析的文章:
- http://www.cnblogs.com/waterystone/p/4920797.html
- https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html
3 Semaphore(信號量)-允許多個線程同時訪問
synchronized 和 ReentrantLock 都是一次只允許一個線程訪問某個資源,Semaphore(信號量)可以指定多個線程同時訪問某個資源。
示例代碼如下:
/**
*
* @author Snailclimb
* @date 2018年9月30日
* @Description: 需要一次性拿一個許可的情況
*/
public class SemaphoreExample1 {
// 請求的數量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 創建一個具有固定線程數量的線程池對象(如果這里線程池的線程數量給太少的話你會發現執行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允許執行的線程數量。
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表達式的運用
try {
semaphore.acquire();// 獲取一個許可,所以可運行線程數量為20/1=20
test(threadnum);
semaphore.release();// 釋放一個許可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模擬請求的耗時操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模擬請求的耗時操作
}
}
執行 acquire
方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證;每個 release
方法增加一個許可證,這可能會釋放一個阻塞的 acquire 方法。然而,其實並沒有實際的許可證這個對象,Semaphore 只是維持了一個可獲得許可證的數量。 Semaphore 經常用於限制獲取某種資源的線程數量。
當然一次也可以一次拿取和釋放多個許可,不過一般沒有必要這樣做:
semaphore.acquire(5);// 獲取5個許可,所以可運行線程數量為20/5=4
test(threadnum);
semaphore.release(5);// 獲取5個許可,所以可運行線程數量為20/5=4
除了 acquire
方法之外,另一個比較常用的與之對應的方法是tryAcquire
方法,該方法如果獲取不到許可就立即返回 false。
Semaphore 有兩種模式,公平模式和非公平模式。
- 公平模式: 調用 acquire 的順序就是獲取許可證的順序,遵循 FIFO;
- 非公平模式: 搶占式的。
Semaphore 對應的兩個構造方法如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
這兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,默認非公平模式。
issue645補充內容 :Semaphore與CountDownLatch一樣,也是共享鎖的一種實現。它默認構造AQS的state為permits。當執行任務的線程數量超出permits,那么多余的線程將會被放入阻塞隊列Park,並自旋判斷state是否大於0。只有當state大於0的時候,阻塞的線程才能繼續執行,此時先前執行任務的線程繼續執行release方法,release方法使得state的變量會加1,那么自旋的線程便會判斷成功。
如此,每次只有最多不超過permits數量的線程能自旋成功,便限制了執行任務線程的數量。
由於篇幅問題,如果對 Semaphore 源碼感興趣的朋友可以看下這篇文章:https://juejin.im/post/5ae755366fb9a07ab508adc6
4 CountDownLatch (倒計時器)
CountDownLatch允許 count 個線程阻塞在一個地方,直至所有線程的任務都執行完畢。在 Java 並發中,countdownlatch 的概念是一個常見的面試題,所以一定要確保你很好的理解了它。
CountDownLatch是共享鎖的一種實現,它默認構造 AQS 的 state 值為 count。當線程使用countDown方法時,其實使用了tryReleaseShared
方法以CAS的操作來減少state,直至state為0就代表所有的線程都調用了countDown方法。當調用await方法的時候,如果state不為0,就代表仍然有線程沒有調用countDown方法,那么就把已經調用過countDown的線程都放入阻塞隊列Park,並自旋CAS判斷state == 0,直至最后一個線程調用了countDown,使得state == 0,於是阻塞的線程便判斷成功,全部往下執行。
4.1 CountDownLatch 的兩種典型用法
- 某一線程在開始運行前等待 n 個線程執行完畢。將 CountDownLatch 的計數器初始化為 n :
new CountDownLatch(n)
,每當一個任務線程執行完畢,就將計數器減 1countdownlatch.countDown()
,當計數器的值變為 0 時,在CountDownLatch上 await()
的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個組件加載完畢,之后再繼續執行。 - 實現多個線程開始執行任務的最大並行性。注意是並行性,不是並發,強調的是多個線程在某一時刻同時開始執行。類似於賽跑,將多個線程放到起點,等待發令槍響,然后同時開跑。做法是初始化一個共享的
CountDownLatch
對象,將其計數器初始化為 1 :new CountDownLatch(1)
,多個線程在開始執行任務前首先coundownlatch.await()
,當主線程調用 countDown() 時,計數器變為 0,多個線程同時被喚醒。
4.2 CountDownLatch 的使用示例
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: CountDownLatch 使用方法示例
*/
public class CountDownLatchExample1 {
// 請求的數量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 創建一個具有固定線程數量的線程池對象(如果這里線程池的線程數量給太少的話你會發現執行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表達式的運用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown();// 表示一個請求已經被完成
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模擬請求的耗時操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模擬請求的耗時操作
}
}
上面的代碼中,我們定義了請求的數量為 550,當這 550 個請求被處理完成之后,才會執行System.out.println("finish");
。
與 CountDownLatch 的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調用 CountDownLatch.await()
方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。
其他 N 個線程必須引用閉鎖對象,因為他們需要通知 CountDownLatch
對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()
方法來完成的;每調用一次這個方法,在構造函數中初始化的 count 值就減 1。所以當 N 個線程都調 用了這個方法,count 的值等於 0,然后主線程就能通過 await()
方法,恢復執行自己的任務。
再插一嘴:CountDownLatch
的 await()
方法使用不當很容易產生死鎖,比如我們上面代碼中的 for 循環改為:
for (int i = 0; i < threadCount-1; i++) {
.......
}
這樣就導致 count
的值沒辦法等於 0,然后就會導致一直等待。
如果對CountDownLatch源碼感興趣的朋友,可以查看: 【JUC】JDK1.8源碼分析之CountDownLatch(五)
4.3 CountDownLatch 的不足
CountDownLatch 是一次性的,計數器的值只能在構造方法中初始化一次,之后沒有任何機制再次對其設置值,當 CountDownLatch 使用完畢后,它不能再次被使用。
4.4 CountDownLatch 相常見面試題
解釋一下 CountDownLatch 概念?
CountDownLatch 和 CyclicBarrier 的不同之處?
給出一些 CountDownLatch 使用的例子?
CountDownLatch 類中主要的方法?
5 CyclicBarrier(循環柵欄)
CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現線程間的技術等待,但是它的功能比 CountDownLatch 更加復雜和強大。主要應用場景和 CountDownLatch 類似。
CountDownLatch的實現是基於AQS的,而CycliBarrier是基於 ReentrantLock(ReentrantLock也屬於AQS同步器)和 Condition 的.
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續干活。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties)
,其參數表示屏障攔截的線程數量,每個線程調用await
方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。
再來看一下它的構造函數:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中,parties 就代表了有攔截的線程的數量,當攔截的線程數量達到這個值的時候就打開柵欄,讓所有線程通過。
5.1 CyclicBarrier 的應用場景
CyclicBarrier 可以用於多線程計算數據,最后合並計算結果的應用場景。比如我們用一個 Excel 保存了用戶所有銀行流水,每個 Sheet 保存一個帳戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個 sheet 里的銀行流水,都執行完之后,得到每個 sheet 的日均銀行流水,最后,再用 barrierAction 用這些線程的計算結果,計算出整個 Excel 的日均銀行流水。
5.2 CyclicBarrier 的使用示例
示例 1:
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 測試 CyclicBarrier 類中帶參數的 await() 方法
*/
public class CyclicBarrierExample2 {
// 請求的數量
private static final int threadCount = 550;
// 需要同步的線程數量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 創建線程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保證子線程完全執行結束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
運行結果,如下:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
可以看到當線程數量也就是請求數量達到我們定義的 5 個的時候, await
方法之后的方法才被執行。
另外,CyclicBarrier 還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction)
,用於在線程到達屏障時,優先執行barrierAction
,方便處理更復雜的業務場景。示例代碼如下:
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的時候指定一個 Runnable
*/
public class CyclicBarrierExample3 {
// 請求的數量
private static final int threadCount = 550;
// 需要同步的線程數量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------當線程數達到之后,優先執行------");
});
public static void main(String[] args) throws InterruptedException {
// 創建線程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
運行結果,如下:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------當線程數達到之后,優先執行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------當線程數達到之后,優先執行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
5.3 CyclicBarrier
源碼分析
當調用 CyclicBarrier
對象調用 await()
方法時,實際上調用的是dowait(false, 0L)
方法。 await()
方法就像樹立起一個柵欄的行為一樣,將線程擋住了,當攔住的線程數量達到 parties 的值時,柵欄才會打開,線程才得以通過執行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait(false, 0L)
:
// 當線程數量或者請求數量達到 count 時 await 之后的方法才會被執行。上面的示例中 count 的值就為 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 鎖住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果線程中斷了,拋出異常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout減1
int index = --count;
// 當 count 數量減為 0 之后說明最后一個線程已經到達柵欄了,也就是達到了可以執行await 方法之后的條件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 將 count 重置為 parties 屬性的初始化值
// 喚醒之前等待的線程
// 下一波執行開始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
總結:CyclicBarrier
內部通過一個 count 變量作為計數器,cout 的初始值為 parties 屬性的初始化值,每當一個線程到了柵欄這里了,那么就將計數器減一。如果 count 值為 0 了,表示這是這一代最后一個線程到達柵欄,就嘗試執行我們構造方法中輸入的任務。
5.4 CyclicBarrier 和 CountDownLatch 的區別
下面這個是國外一個大佬的回答:
CountDownLatch 是計數器,只能使用一次,而 CyclicBarrier 的計數器提供 reset 功能,可以多次使用。但是我不那么認為它們之間的區別僅僅就是這么簡單的一點。我們來從 jdk 作者設計的目的來看,javadoc 是這么描述它們的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個線程,等待其他多個線程完成某件事情之后才能執行;)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個線程互相等待,直到到達同一個同步點,再繼續一起執行。)
對於 CountDownLatch 來說,重點是“一個線程(多個線程)等待”,而其他的 N 個線程在完成“某件事情”之后,可以終止,也可以等待。而對於 CyclicBarrier,重點是多個線程,在任意一個線程沒有完成,所有的線程都必須等待。
CountDownLatch 是計數器,線程完成一個記錄一個,只不過計數不是遞增而是遞減,而 CyclicBarrier 更像是一個閥門,需要所有線程都到達,閥門才能打開,然后繼續執行。
6 ReentrantLock 和 ReentrantReadWriteLock
ReentrantLock 和 synchronized 的區別在上面已經講過了這里就不多做講解。另外,需要注意的是:讀寫鎖 ReentrantReadWriteLock 可以保證多個線程可以同時讀,所以在讀操作遠大於寫操作的時候,讀寫鎖就非常有用了。