備注:博客園的markDown格式支持的特別不友好。也歡迎查看我的csdn的此篇文章鏈接:CountDownLatch、CyclicBarrier和Semaphore 使用示例及原理
CountDownLatch
CountDownLatch用戶監聽某些初始化操作,並且線程進行阻塞,等初始化執行完畢后,通知主線程繼續工作執行。
CountDownLatch 使用示例
使用示例,線程t3 要等待t1和t2執行完畢才執行:
/**
* @Description: CountDownLatch 等待和喚醒
* @Author: wangmeng
* @Date: 2018/12/16-16:38
*/
public class UseCountDownLatch {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("進入t1線程。。。");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1線程初始化完畢,通知t3線程繼續操作!");
countDownLatch.countDown();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("進入t2線程。。。");
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2線程初始化完畢,通知t3線程繼續操作!");
countDownLatch.countDown();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("進入t3 線程,並且等待...");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t3線程進行后續的執行操作...");
}
}, "t3");
t1.start();
t2.start();
t3.start();
}
}
打印結果:
進入t1線程。。。
進入t3 線程,並且等待...
進入t2線程。。。
t1線程初始化完畢,通知t3線程繼續操作!
t2線程初始化完畢,通知t3線程繼續操作!
t3線程進行后續的執行操作...
CountDownLatch 源碼解讀
其實CountDownLatch用的底層原理就是AQS, 可以參考:(AQS原理詳解)。AQS全局維護的有一個volatile修飾的state字段,當state為0時就會通知countDownLatch等待線程執行。
這也就是所以我們在new CountDownLatch(int n) 時指定的參數,n為多少,也就是要調用多少次countDown()方法。
public void await() throws InterruptedException { }; //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行
public void countDown() { }; //將count值減1
看看await()方法, 其底層調用的就是AQS中的getState方法,通過判斷state是否為0來決定是否喚醒等待的線程。
如果不為0則調用Unsafe中的park方法進行自旋,直到state==0時才繼續往下執行(喚醒等待的線程)。
public void await() throws InterruptedException {
//調用AQS中的方法
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//CountDownLatch中的方法,獲取state值
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//當AQS中的state不為0就會執行此方法,這個方法也就是讓線程等待。使用直到state==0才結束循環。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看到上面await方法了,那么countDown就可以直接猜出來了,無外乎就是使得AQS中的state通過CAS操作進行減一,如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier
CyclicBarrier是回環柵欄的概念,多線程來的進行阻塞,等待某一個臨界值條件滿足后,同時執行。
假設有一個場景:每個線程代表一個跑步運動員,當運動員都准備好后,才一起出發,只要有一個人沒有准備好,大家都等待。
CyclicBarrier 應用實例
/**
* @Description: 測試CyclicBarrier
* @Author: wangmeng
* @Date: 2018/12/16-17:05
*/
public class UseCyclicBarrier {
//模擬運動員的類。
static class Runner implements Runnable {
private String name;
private CyclicBarrier cyclicBarrier;
@Override
public void run() {
try {
System.out.println("運動員:" + this.name + "進行准備工作!");
TimeUnit.SECONDS.sleep((new Random().nextInt(5)));
System.out.println("運動員:" + this.name + "准備完成!");
this.cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("運動員" + this.name + "開始起跑!!!");
}
public Runner(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
ExecutorService executorPools = Executors.newFixedThreadPool(3);
executorPools.submit(new Thread(new Runner("張三", cyclicBarrier)));
executorPools.submit(new Thread(new Runner("李四", cyclicBarrier)));
executorPools.submit(new Thread(new Runner("王五", cyclicBarrier)));
executorPools.shutdown();
}
}
打印的結果:
運動員:張三進行准備工作!
運動員:李四進行准備工作!
運動員:王五進行准備工作!
運動員:張三准備完成!
運動員:王五准備完成!
運動員:李四准備完成!
運動員李四開始起跑!!!
運動員張三開始起跑!!!
運動員王五開始起跑!!!
可以看到三個線程都是先執行完初始化操作,然后才一起喚醒執行后續的操作。
CyclicBarrier 源碼解讀
CyclicBarrier是通過ReentrantLock和Condition來實現的。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 鎖住
lock.lock();
try {
// 當前代
final Generation g = generation;
// 如果這代損壞了,拋出異常
if (g.broken)
throw new BrokenBarrierException();
// 如果線程中斷了,拋出異常
if (Thread.interrupted()) {
// 將損壞狀態設置為 true
// 並通知其他阻塞在此柵欄上的線程
breakBarrier();
throw new InterruptedException();
}
// 獲取下標
int index = --count;
// 如果是 0 ,說明到頭了
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 執行柵欄任務
if (command != null)
command.run();
ranAction = true;
// 更新一代,將 count 重置,將 generation 重置.
// 喚醒之前等待的線程
nextGeneration();
// 結束
return 0;
} finally {
// 如果執行柵欄任務的時候失敗了,就將柵欄失效
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
// 如果沒有時間限制,則直接等待,直到被喚醒
if (!timed)
trip.await();
// 如果有時間限制,則等待指定時間
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// g == generation >> 當前代
// ! g.broken >>> 沒有損壞
if (g == generation && ! g.broken) {
// 讓柵欄失效
breakBarrier();
throw ie;
} else {
// 上面條件不滿足,說明這個線程不是這代的.
// 就不會影響當前這代柵欄執行邏輯.所以,就打個標記就好了
Thread.currentThread().interrupt();
}
}
// 當有任何一個線程中斷了,會調用 breakBarrier 方法.
// 就會喚醒其他的線程,其他線程醒來后,也要拋出異常
if (g.broken)
throw new BrokenBarrierException();
// g != generation >>> 正常換代了
// 一切正常,返回當前線程所在柵欄的下標
// 如果 g == generation,說明還沒有換代,那為什么會醒了?
// 因為一個線程可以使用多個柵欄,當別的柵欄喚醒了這個線程,就會走到這里,所以需要判斷是否是當前代。
// 正是因為這個原因,才需要 generation 來保證正確。
if (g != generation)
return index;
// 如果有時間限制,且時間小於等於0,銷毀柵欄,並拋出異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
用上面的示例總結一下CyclicBarrier的await方法實現,假設線程thread1和線程thread2都執行到CyclicBarrier的await(),都進入dowait(boolean timed, long nanos),thread1先獲取到獨占鎖,執行到--count的時,index等於1,所以進入下面的for循環,接着執行trip.await(),進入await()方法,執行Node node = addConditionWaiter()將當前線程構造成Node節點並加入到Condition等待隊列中,然后釋放獲取到的獨占鎖,當前線程進入阻塞狀態;此時,線程thread2可以獲取獨占鎖,繼續執行--count,index等於0,所以先執行command.run(),輸出myThread,然后執行nextGeneration(),nextGeneration()中trip.signalAll()只是將Condition等待隊列中的Node節點按之前順序都轉移到了AQS同步隊列中,這里也就是將thread1對應的Node節點轉移到了AQS同步隊列中,thread2執行完nextGeneration(),返回return 0之前,細看代碼還需要執行lock.unlock(),這里會執行到ReentrantLock的unlock()方法,最終執行到AQS的unparkSuccessor(Node node)方法,從AQS同步隊列中的頭結點開始釋放節點,喚醒節點對應的線程,即thread1恢復執行。
如果有三個線程thread1、thread2和thread3,假設線程執行順序是thread1、thread2、thread3,那么thread1、thread2對應的Node節點會被加入到Condition等待隊列中,當thread3執行的時候,會將thread1、thread2對應的Node節點按thread1、thread2順序轉移到AQS同步隊列中,thread3執行lock.unlock()的時候,會先喚醒thread1,thread1恢復繼續執行,thread1執行到lock.unlock()的時候會喚醒thread2恢復執行。
更多可參考:並發編程之 CyclicBarrier 源碼分析
CountdownLatch和CyclicBarrier的區別
1、CountDownLatch簡單的說就是一個線程等待,直到他所等待的其他線程都執行完成並且調用countDown()方法發出通知后,當前線程才可以繼續執行。
2、CyclicBarrier是所有線程都進行等待,直到所有線程都准備好進入await()方法之后,所有線程同時開始執行!
3、CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。
Semaphore
Semaphore翻譯成字面意思為 信號量,Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
關於限流的其他方式可以參考我另一篇博文:限流的簡單使用及學習
相關概念:
- PV(page view)網站的總訪問量,頁面瀏覽量或點擊量,用戶每刷新一次就會被記錄一次。
- UV(unique Visitor)訪問網站的一台電腦客戶端為一個訪客。一般來講時間上以00:00-24:00之內相同ip的客戶端只記錄。
- QPS(query per second)即每秒查詢數,qps很大程度上代表了系統業務上的繁忙程度,每次請求的背后,可能對應着多次磁盤I/O,多次網絡請求,多個cpu時間片等。我們通過qps可以非常直觀的了解當前系統業務情況,一旦當前qps超過所設定的預警閥值,可以考慮增加機器對集群擴容,以免壓力過大導致宕機,可以根據前期的壓力測試得到估值,在結合后期綜合運維情況,估算出閥值。
- RT(response time)請求的響應時間,這個指標非常關鍵,直接說明前端用戶的體驗,任何系統設計師都想降低rt時間。
- 當然還涉及cpu、內存、網絡、磁盤等情況,更細節的問題很多,如select、update、delete/ps等數據庫層面的統計。
- 容量評估:一般來說通過開發、運維、測試、以及業務等相關人員,綜合出系統的一系列閥值,然后我們根據關鍵閥值如qps、rt等,對系統進行有效的變更。
- 一般來講,我們進行多輪壓力測試以后,可以對系統進行峰值評估,采用所謂的80/20原則,即80%的訪問請求將在20%的時間內達到。這樣我們可以根據系統對應的PV計算出峰值qps。
- 峰值qps= (總PV × 80%)/ (60 × 60 × 24 × 20%)
- 然后在將總的峰值qps除以單台機器所能承受的最高的qps值,就是所需要機器的數量:機器數 = 總的峰值qps / 壓測得出的單機極限qps
- 當然不排除系統在上線前進行大型促銷活動,或者雙十一、雙十二熱點事件、遭受到DDos攻擊等情況,系統的開發和運維人員急需要了解當前系統運行的狀態和負載情況,一般都會有后台系統去維護。
Semaphore 使用示例:
/**
* @Description:使用Semaphore模擬限流操作
* @Author: wangmeng
* @Date: 2018/12/16-18:30
*/
public class UseSemaphore {
public static void main(String[] args) {
ExecutorService threadPools = Executors.newFixedThreadPool(20);
//同一時間只能有5個線程執行
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 20; i++) {
final int token = i;
Runnable run = new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
//進行業務操作
System.out.println("獲得許可,執行操作..." + token);
long sleepTime = (long)(Math.random() * 10000);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
};
threadPools.execute(run);
}
System.out.println("queue length: " + semaphore.getQueueLength());
threadPools.shutdown();
}
}
原理也是使用AQS中的state變量。代碼我就不貼了。
Semaphore原理可參見:https://juejin.im/post/5ae755366fb9a07ab508adc6
Semaphore 就是一個共享鎖,通過設置 state 變量來實現對這個變量的共享。當調用 acquire 方法的時候,state 變量就減去一,當調用 release 方法的時候,state 變量就加一。當 state 變量為 0 的時候,別的線程就不能進入代碼塊了,就會在 AQS 中阻塞等待。