-
瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高並發實戰》 面試必備 + 面試必備 + 面試必備 【博客園總入口 】
-
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高並發核心編程》 大廠必備 + 大廠必備 + 大廠必備 【博客園總入口 】
-
入大廠+漲工資必備: 高並發【 億級流量IM實戰】 實戰系列 【 SpringCloud Nginx秒殺】 實戰系列 【博客園總入口 】
JUC 高並發工具類(3文章)與高並發容器類(N文章) :
- 1 CyclicBarrier 使用&核心原理 圖解
- 2 countDownLatch 使用&核心原理 圖解
- 3 Semaphore 使用&核心原理 圖解
- 4 跳表 核心原理 圖解
- 5 ConcurrentSkipListMap - 秒懂
- 6 ConcurrentSkipListSet - 秒懂
1 CountDownLatch 是什么?
Java的concurrent包里面的CountDownLatch其實可以把它看作一個計數器,只不過這個計數器的操作是原子操作,同時只能有一個線程去操作這個計數器,也就是同時只能有一個線程去減這個計數器里面的值。
你可以向CountDownLatch對象設置一個初始的數字作為計數值,任何調用這個對象上的await()方法都會阻塞,直到這個計數器的計數值被其他的線程減為0為止。
CountDownLatch的一個非常典型的應用場景是:有一個任務想要往下執行,但必須要等到其他的任務執行完畢后才可以繼續往下執行。假如我們這個想要繼續往下執行的任務調用一個CountDownLatch對象的await()方法,其他的任務執行完自己的任務后調用同一個CountDownLatch對象上的countDown()方法,這個調用await()方法的任務將一直阻塞等待,直到這個CountDownLatch對象的計數值減到0為止。
比如:客戶端一次請求5個統計數據,服務器需要全部統計完成后,才返回客戶端,可以使用CountDownLatch 。
2 怎么使用 CyclicBarrier
2.1 構造方法
//參數count為計數值
public CountDownLatch(int count) { };
2.2 重要方法
//調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
public void await() throws InterruptedException { };
//和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//將count值減1
public void countDown() { };
3 使用案例
使用步驟
- 首先是創建實例
CountDownLatch countDown = new CountDownLatch(2)
- 需要同步的線程執行完之后,計數-1;
countDown.countDown()
- 需要等待其他線程執行完畢之后,再運行的線程,調用
countDown.await()
實現阻塞同步
示例代碼
package cn.day13;
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) {
// TODO Auto-generated method stub
final CountDownLatch latch = new CountDownLatch(2);
new Thread() {
public void run() {
try {
System.out.println("子線程" + Thread.currentThread().getName()
+ "正在執行");
Thread.sleep(3000);
System.out.println("子線程" + Thread.currentThread().getName()
+ "執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread() {
public void run() {
try {
System.out.println("子線程" + Thread.currentThread().getName()
+ "正在執行");
Thread.sleep(3000);
System.out.println("子線程" + Thread.currentThread().getName()
+ "執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2個子線程執行完畢...");
latch.await();
System.out.println("2個子線程已經執行完畢");
System.out.println("繼續執行主線程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印結果:
子線程Thread-0正在執行
等待2個子線程執行完畢...
子線程Thread-1正在執行
子線程Thread-0執行完畢
子線程Thread-1執行完畢
2個子線程已經執行完畢
繼續執行主線程
4 CountDownLatch 使用場景
前面給了一個demo演示如何用,那這個東西在實際的業務場景中是否會用到呢?
因為確實在一個業務場景中使用到了,不然也就不會單獨撈出這一節...
電商的詳情頁,由眾多的數據拼裝組成,如可以分成一下幾個模塊
- 交易的收發貨地址,銷量
- 商品的基本信息(標題,圖文詳情之類的)
- 推薦的商品列表
- 評價的內容
- ....
上面的幾個模塊信息,都是從不同的服務獲取信息,且彼此沒啥關聯;所以為了提高響應,完全可以做成並發獲取數據,如
- 線程1獲取交易相關數據
- 線程2獲取商品基本信息
- 線程3獲取推薦的信息
- 線程4獲取評價信息
- ....
但是最終拼裝數據並返回給前端,需要等到上面的所有信息都獲取完畢之后,才能返回,這個場景就非常的適合 CountDownLatch
來做了
- 在拼裝完整數據的線程中調用
CountDownLatch#await(long, TimeUnit)
等待所有的模塊信息返回 - 每個模塊信息的獲取,由一個獨立的線程執行;執行完畢之后調用
CountDownLatch#countDown()
進行計數-1
5 CountDownLatch 原理
CountDownLatch在多線程並發編程中充當一個計時器的功能,並且內部維護一個count的變量,並且其操作都是原子操作,該類主要通過countDown()和await()兩個方法實現功能的,首先通過建立CountDownLatch對象,並且傳入參數即為count初始值。
如果一個線程調用了await()方法,那么這個線程便進入阻塞狀態,並進入阻塞隊列。如果一個線程調用了countDown()方法,則會使count-1;當count的值為0時,這時候阻塞隊列中調用await()方法的線程便會逐個被喚醒,從而進入后續的操作。比如下面的例子就是有兩個操作,一個是讀操作一個是寫操作,現在規定必須進行完寫操作才能進行讀操作。所以當最開始調用讀操作時,需要用await()方法使其阻塞,當寫操作結束時,則需要使count等於0。因此count的初始值可以定為寫操作的記錄數,這樣便可以使得進行完寫操作,然后進行讀操作。
構造方法:CountDownLatch
內部也是有個Sync
類繼承了AQS
,所以CountDownLatch
類的構造方法就是調用Sync
類的構造方法,然后調用setState()
方法設置AQS
中state
的值。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
方法:await()
該方法是使調用的線程阻塞住,直到state
的值為0就放開所有阻塞的線程。實現會調用到AQS
中的acquireSharedInterruptibly()
方法,先判斷下是否被中斷,接着調用了tryAcquireShared()
方法,講AQS
那篇文章里提到過這個方法是需要子類實現的,可以看到實現的邏輯就是判斷state
值是否為0,是就返回1,不是則返回-1。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
方法:countDown()
這個方法會對state
值減1,會調用到AQS
中releaseShared()
方法,目的是為了調用doReleaseShared()
方法,這個是AQS定義好的釋放資源的方法,而tryReleaseShared()
則是子類實現的,可以看到是一個自旋CAS
操作,每次都獲取state
值,如果為0則直接返回,否則就執行減1的操作,失敗了就重試,如果減完后值為0就表示要釋放所有阻塞住的線程了,也就會執行到AQS
中的doReleaseShared()
方法。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
回到◀瘋狂創客圈▶
瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門