countDownLatch



JUC 高並發工具類(3文章)與高並發容器類(N文章) :

1 CountDownLatch 是什么?

​ Java的concurrent包里面的CountDownLatch其實可以把它看作一個計數器,只不過這個計數器的操作是原子操作,同時只能有一個線程去操作這個計數器,也就是同時只能有一個線程去減這個計數器里面的值。

​ 你可以向CountDownLatch對象設置一個初始的數字作為計數值,任何調用這個對象上的await()方法都會阻塞,直到這個計數器的計數值被其他的線程減為0為止。

​ CountDownLatch的一個非常典型的應用場景是:有一個任務想要往下執行,但必須要等到其他的任務執行完畢后才可以繼續往下執行。假如我們這個想要繼續往下執行的任務調用一個CountDownLatch對象的await()方法,其他的任務執行完自己的任務后調用同一個CountDownLatch對象上的countDown()方法,這個調用await()方法的任務將一直阻塞等待,直到這個CountDownLatch對象的計數值減到0為止。

比如:客戶端一次請求5個統計數據,服務器需要全部統計完成后,才返回客戶端,可以使用CountDownLatch 。

在這里插入圖片描述

2 怎么使用 CyclicBarrier

2.1 構造方法

//參數count為計數值
public CountDownLatch(int count) {  };  

2.2 重要方法


//調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
public void await() throws InterruptedException { };   

//和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  

//將count值減1
public void countDown() { };  

3 使用案例

使用步驟

  1. 首先是創建實例 CountDownLatch countDown = new CountDownLatch(2)
  2. 需要同步的線程執行完之后,計數-1; countDown.countDown()
  3. 需要等待其他線程執行完畢之后,再運行的線程,調用 countDown.await()實現阻塞同步

示例代碼

package cn.day13;
 
import java.util.concurrent.CountDownLatch;
 
public class Test {
 
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		final CountDownLatch latch = new CountDownLatch(2);
 
		new Thread() {
			public void run() {
				try {
					System.out.println("子線程" + Thread.currentThread().getName()
							+ "正在執行");
					Thread.sleep(3000);
					System.out.println("子線程" + Thread.currentThread().getName()
							+ "執行完畢");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			};
		}.start();
 
		new Thread() {
			public void run() {
				try {
					System.out.println("子線程" + Thread.currentThread().getName()
							+ "正在執行");
					Thread.sleep(3000);
					System.out.println("子線程" + Thread.currentThread().getName()
							+ "執行完畢");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			};
		}.start();
 
		try {
			System.out.println("等待2個子線程執行完畢...");
			latch.await();
			System.out.println("2個子線程已經執行完畢");
			System.out.println("繼續執行主線程");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
 
}

打印結果:

子線程Thread-0正在執行
等待2個子線程執行完畢...
子線程Thread-1正在執行
子線程Thread-0執行完畢
子線程Thread-1執行完畢
2個子線程已經執行完畢
繼續執行主線程

4 CountDownLatch 使用場景

前面給了一個demo演示如何用,那這個東西在實際的業務場景中是否會用到呢?

因為確實在一個業務場景中使用到了,不然也就不會單獨撈出這一節...

電商的詳情頁,由眾多的數據拼裝組成,如可以分成一下幾個模塊

  • 交易的收發貨地址,銷量
  • 商品的基本信息(標題,圖文詳情之類的)
  • 推薦的商品列表
  • 評價的內容
  • ....

上面的幾個模塊信息,都是從不同的服務獲取信息,且彼此沒啥關聯;所以為了提高響應,完全可以做成並發獲取數據,如

  • 線程1獲取交易相關數據
  • 線程2獲取商品基本信息
  • 線程3獲取推薦的信息
  • 線程4獲取評價信息
  • ....

但是最終拼裝數據並返回給前端,需要等到上面的所有信息都獲取完畢之后,才能返回,這個場景就非常的適合 CountDownLatch來做了

  1. 在拼裝完整數據的線程中調用 CountDownLatch#await(long, TimeUnit) 等待所有的模塊信息返回
  2. 每個模塊信息的獲取,由一個獨立的線程執行;執行完畢之后調用 CountDownLatch#countDown() 進行計數-1

5 CountDownLatch 原理

​ CountDownLatch在多線程並發編程中充當一個計時器的功能,並且內部維護一個count的變量,並且其操作都是原子操作,該類主要通過countDown()和await()兩個方法實現功能的,首先通過建立CountDownLatch對象,並且傳入參數即為count初始值。

如果一個線程調用了await()方法,那么這個線程便進入阻塞狀態,並進入阻塞隊列。如果一個線程調用了countDown()方法,則會使count-1;當count的值為0時,這時候阻塞隊列中調用await()方法的線程便會逐個被喚醒,從而進入后續的操作。比如下面的例子就是有兩個操作,一個是讀操作一個是寫操作,現在規定必須進行完寫操作才能進行讀操作。所以當最開始調用讀操作時,需要用await()方法使其阻塞,當寫操作結束時,則需要使count等於0。因此count的初始值可以定為寫操作的記錄數,這樣便可以使得進行完寫操作,然后進行讀操作。

在這里插入圖片描述

構造方法:CountDownLatch

內部也是有個Sync類繼承了AQS,所以CountDownLatch類的構造方法就是調用Sync類的構造方法,然后調用setState()方法設置AQSstate的值。

 public CountDownLatch(int count) {
     if (count < 0) throw new IllegalArgumentException("count < 0");
     this.sync = new Sync(count);
 }

 Sync(int count) {
     setState(count);
 }    

方法:await()

該方法是使調用的線程阻塞住,直到state的值為0就放開所有阻塞的線程。實現會調用到AQS中的acquireSharedInterruptibly()方法,先判斷下是否被中斷,接着調用了tryAcquireShared()方法,講AQS那篇文章里提到過這個方法是需要子類實現的,可以看到實現的邏輯就是判斷state值是否為0,是就返回1,不是則返回-1。

 public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
 }

 public final void acquireSharedInterruptibly(int arg)
         throws InterruptedException {
     if (Thread.interrupted())
         throw new InterruptedException();
     if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
 }

 protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;
 }

方法:countDown()

這個方法會對state值減1,會調用到AQSreleaseShared()方法,目的是為了調用doReleaseShared()方法,這個是AQS定義好的釋放資源的方法,而tryReleaseShared()則是子類實現的,可以看到是一個自旋CAS操作,每次都獲取state值,如果為0則直接返回,否則就執行減1的操作,失敗了就重試,如果減完后值為0就表示要釋放所有阻塞住的線程了,也就會執行到AQS中的doReleaseShared()方法。

 public void countDown() {
     sync.releaseShared(1);
 }

 public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true;
     }
     return false;
 }

 protected boolean tryReleaseShared(int releases) {
     // Decrement count; signal when transition to zero
     for (;;) {
         int c = getState();
         if (c == 0)
             return false;
         int nextc = c-1;
         if (compareAndSetState(c, nextc))
             return nextc == 0;
     }
 }

回到◀瘋狂創客圈

瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM