什么時候使用CountDownLatch


場景:在學習單例模式時候,用到了鎖synchronized的概念,在多線程中又用到了CountDownLatch的概念

jdk:https://docs.oracle.com/javase/7/docs/api/

1 CountDownLatch

正如每個Java文檔所描述的那樣,CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完后再執行。在Java並發中,countdownlatch的概念是一個常見的面試題,所以一定要確保你很好的理解了它。在這篇文章中,我將會涉及到在Java並發編 程中跟CountDownLatch相關的以下幾點:

  • CountDownLatch是什么?
  • CountDownLatch如何工作?
  • 在實時系統中的應用場景
  • 應用范例
  • 常見的面試題

1.1 CountDownLatch是什么

CountDownLatch是在java1.5被引入的,跟它一起被引入的並發工具類還有CyclicBarrier、Semaphore、ConcurrentHashMapBlockingQueue,它們都存在於java.util.concurrent包下。CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之后再執行。

CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。

 

CountDownLatch的偽代碼如下所示:

//Main thread start //Create CountDownLatch for N threads //Create and start N threads //Main thread wait on latch (待執行的程序先執行,這里通過CountDownLatch的值來判斷前面的線程是否執行完畢,如果沒有執行完畢會一直卡着) //N threads completes there tasks are returns //Main thread resume execution

 

ps:注意這里的順序

1.2 CountDownLatch如何工作

CountDownLatch.java類中定義的構造函數:

//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}

執行過程:

構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量

這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值

與CountDownLatch的第一次交互是主線程等待其他線程。

主線程必須在啟動其他線程后立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。(重要

其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。

這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。

所以當N個線程都調 用了這個方法,count的值等於0,然后主線程就能通過await()方法,恢復執行自己的任務。

1.3 在實時系統中的使用場景

讓我們嘗試羅列出在java實時系統中CountDownLatch都有哪些使用場景。我所羅列的都是我所能想到的。如果你有別的可能的使用方法,請在留言里列出來,這樣會幫助到大家。

  1. 實現最大的並行性:有時我們想同時啟動多個線程,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們創建一個初始計數為1的CountDownLatch,並讓所有線程都在這個鎖上等待,那么我們可以很輕松地完成測試。我們只需調用 一次countDown()方法就可以讓所有的等待線程同時恢復執行。
  2. 開始執行前等待n個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前,所有N個外部系統已經啟動和運行了。
  3. 死鎖檢測:一個非常方便的使用場景是,你可以使用n個線程訪問共享資源,在每次測試階段的線程數目是不同的,並嘗試產生死鎖。

1.4 CountDownLatch使用例子

場景:在這個例子中,我模擬了一個應用程序啟動類,它開始時啟動了n個線程類,這些線程將檢查外部系統並通知閉鎖,並且啟動類一直在閉鎖上等待着。一旦驗證和檢查了所有外部服務,那么啟動類恢復執行。

BaseHealthChecker.java:這個類是一個Runnable,負責所有特定的外部服務健康的檢測。它刪除了重復的代碼和閉鎖的中心控制代碼。

package countDown; import java.util.concurrent.CountDownLatch; public abstract class BaseHealthChecker implements Runnable { private CountDownLatch _latch; private String _serviceName; private boolean _serviceUp; //用來檢測線程任務是否都成功執行完畢 //Get latch object in constructor so that after completing the task, thread can countDown() the latch
    public BaseHealthChecker(String serviceName, CountDownLatch latch) { super(); this._latch = latch; this._serviceName = serviceName; this._serviceUp = false; } @Override public void run() { try { verifyService(); _serviceUp = true; } catch (Throwable t) { t.printStackTrace(System.err); _serviceUp = false; } finally { //不管程序有無異常,都會執行一次countDown()
            if(_latch != null) {  _latch.countDown(); } } } public String getServiceName() { return _serviceName; } public boolean isServiceUp() { return _serviceUp; } //This methos needs to be implemented by all specific service checker
    public abstract void verifyService(); }

NetworkHealthChecker.java這個類繼承了BaseHealthChecker,實現了verifyService()方法。DatabaseHealthChecker.javaCacheHealthChecker.java除了服務名和休眠時間外,與NetworkHealthChecker.java是一樣的。

package countDown; import java.util.concurrent.CountDownLatch; public class NetworkHealthChecker extends BaseHealthChecker{ public NetworkHealthChecker (CountDownLatch latch) { super("Network Service", latch); } @Override public void verifyService() { System.out.println("Checking " + this.getServiceName()); try { Thread.sleep(7000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getServiceName() + " is UP"); } }
package countDown; import java.util.concurrent.CountDownLatch; public class DatabaseHealthChecker extends BaseHealthChecker { public DatabaseHealthChecker (CountDownLatch latch) { super("Database Service", latch); } @Override public void verifyService() { System.out.println("Checking " + this.getServiceName()); try { Thread.sleep(7000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getServiceName() + " is UP"); } }
package countDown; import java.util.concurrent.CountDownLatch; public class CacheHealthChecker extends BaseHealthChecker { public CacheHealthChecker (CountDownLatch latch) { super("Cache Service", latch); } @Override public void verifyService() { System.out.println("Checking " + this.getServiceName()); try { Thread.sleep(7000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getServiceName() + " is UP"); } }

ApplicationStartupUtil.java:這個類是一個主啟動類,它負責初始化閉鎖,然后等待,直到所有服務都被檢測完。

package countDown; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; /** * * @author prd-lxw * 啟動類通過CountDownLatch來等待其它線程執行完畢 * */
public class ApplicationStartupUtil { //List of service checkers
    private static List<BaseHealthChecker> _services; //This latch will be used to wait on
    private static CountDownLatch _latch; private ApplicationStartupUtil() { } //單例啟動類
    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil(); public static ApplicationStartupUtil getInstance() { return INSTANCE; } public static boolean checkExternalServices() throws Exception { //Initialize the latch with number of service checkers
        _latch = new CountDownLatch(3); //All add checker in lists
        _services = new ArrayList<BaseHealthChecker>(); _services.add(new NetworkHealthChecker(_latch)); _services.add(new CacheHealthChecker(_latch)); _services.add(new DatabaseHealthChecker(_latch)); //Start service checkers using executor framework
        Executor executor = Executors.newFixedThreadPool(_services.size()); for(final BaseHealthChecker v : _services) { executor.execute(v); } //Now wait till all services are checked
 _latch.await(); //Services are file and now proceed startup //任何一個service沒有正確結束,mian最后都不能返回正確的結果
        for(final BaseHealthChecker v : _services) { if( ! v.isServiceUp()) { return false; } } return true; } }

現在你可以寫測試代碼去檢測一下閉鎖的功能了。

package countDown; public class Main { public static void main(String[] args){ boolean result = false; //檢測所有結果是否正確的執行
        try { result = ApplicationStartupUtil.checkExternalServices(); } catch (Exception e) { e.printStackTrace(); } System.out.println("External services validation completed !! Result was :: "+ result); } }

1.5 常見面試題

可以為你的下次面試准備以下一些CountDownLatch相關的問題:

  • 解釋一下CountDownLatch概念?
  • CountDownLatch 和CyclicBarrier的不同之處?
  • 給出一些CountDownLatch使用的例子?
  •  CountDownLatch 類中主要的方法?

http://tool.oschina.net/apidocs/apidoc?api=jdk-zh

一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

這里展示兩個api中的示例,通俗易懂:

示例用法: 下面給出了兩個類,其中一組 worker 線程使用了兩個倒計數鎖存器:

  • 第一個類是一個啟動信號,在 driver 為繼續執行 worker 做好准備之前,它會阻止所有的 worker 繼續執行。
  • 第二個類是一個完成信號,它允許 driver 在完成所有 worker 之前一直等待。
    class Driver { // ...
       void main() throws InterruptedException {
         CountDownLatch startSignal = new CountDownLatch(1);
         CountDownLatch doneSignal = new CountDownLatch(N);
    
         for (int i = 0; i < N; ++i) // create and start threads
           new Thread(new Worker(startSignal, doneSignal)).start();
    
         doSomethingElse();            // don't let run yet
         startSignal.countDown();      // let all threads proceed  此時才開始執行worker線程
         doSomethingElse();
         doneSignal.await();           // wait for all to finish 等待所有worker執行完之后才繼續執行
       }
     }
    
     class Worker implements Runnable {
       private final CountDownLatch startSignal;
       private final CountDownLatch doneSignal;
       Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
          this.startSignal = startSignal;
          this.doneSignal = doneSignal;
       }
       public void run() {
          try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
    } catch (InterruptedException ex) {} // return;
       }
    
       void doWork() { ... }
     }

     另一種典型用法是,將一個問題分成 N 個部分,用執行每個部分並讓鎖存器倒計數的 Runnable 來描述每個部分,然后將所有 Runnable 加入到 Executor 隊列。當所有的子部分完成后,協調線程就能夠通過 await。(當線程必須用這種方法反復倒計數時,可改為使用 CyclicBarrier。)

    class Driver2 { // ...
       void main() throws InterruptedException {
         CountDownLatch doneSignal = new CountDownLatch(N);
         Executor e = ...
    
         for (int i = 0; i < N; ++i) // create and start threads
           e.execute(new WorkerRunnable(doneSignal, i));
    
         doneSignal.await();           // wait for all to finish  等待N個worker線程執行完之后,才開始執行Driver2
       }
     }
    
     class WorkerRunnable implements Runnable {
       private final CountDownLatch doneSignal;
       private final int i;
       WorkerRunnable(CountDownLatch doneSignal, int i) {
          this.doneSignal = doneSignal;
          this.i = i;
       }
       public void run() {
          try {
            doWork(i);
            doneSignal.countDown();
          } catch (InterruptedException ex) {} // return;
       }
    
       void doWork() { ... }
     }

     

 

2 使用 CountDownLatch 控制多個線程執行順序

(轉自另外一篇博客,通俗易懂)

有時候會有這樣的需求,多個線程同時工作,然后其中幾個可以隨意並發執行,但有一個線程需要等其他線程工作結束后,才能開始。

舉個例子,開啟多個線程分塊下載一個大文件,每個線程只下載固定的一截,最后由另外一個線程來拼接所有的分段,那么這時候我們可以考慮使用CountDownLatch來控制並發。

ps:工作過程

 CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,

可以把它看成是一個計數器,其內部維護着一個count計數,只不過對這個計數器的操作都是原子操作同時只能有一個線程去操作這個計數器

CountDownLatch通過構造函數傳入一個初始計數值,調用者可以通過調用CounDownLatch對象的cutDown()方法,來使計數減1;

如果調用對象上的await()方法,那么調用者就會一直阻塞在這里,直到別人通過cutDown方法,將計數減到0,才可以繼續執行。

/**
 * Project Name:Spring0725
 * File Name:Sample.java
 * Package Name:work1128.singleton
 * Date:2017年11月28日下午5:15:38
 * Copyright (c) 2017, 深圳金融電子結算中心 All Rights Reserved.
 *
*/

package work1128.singleton;

import java.util.concurrent.CountDownLatch;

/**
 * ClassName: Sample <br/>
 * Function: 演示CountDownLatch的執行過程
 * date: 2017年11月28日 下午5:16:40 <br/>
 *
 * @author prd-lxw
 * @version 1.0
 * @since JDK 1.7
 */
public class Sample {
    /**
     * 計數器,用來控制線程
     * 傳入參數2,表示計數器計數為2
     */
    private final static CountDownLatch mCountDownLatch = new CountDownLatch(2);

    /**
     * 示例工作線程類
     */
    private static class WorkingThread extends Thread {
        private final String mThreadName; //線程名稱
        private final int mSleepTime; //睡眠時間
        public WorkingThread(String name, int sleepTime) {
            mThreadName = name;
            mSleepTime = sleepTime;
        }
        
        @Override
        public void run() {
            System.out.println("[" + mThreadName + "] started!");
            try {  
                    Thread.sleep(mSleepTime);  
            } catch (InterruptedException e) {  
                    e.printStackTrace();  
            }
            mCountDownLatch.countDown();
            System.out.println("[" + mThreadName + "] end!"); 
        }
    }
    
    /**
     * 示例線程類
     */
    private static class SampleThread extends Thread {
        
        @Override
        public void run() {
            System.out.println("[SampleThread] started!");
            try {
                // 會阻塞在這里等待 mCountDownLatch 里的count變為0;
                // 也就是等待另外的WorkingThread調用countDown()
 mCountDownLatch.await();
            } catch (InterruptedException e) {
                
            }
            System.out.println("[SampleThread] end!");
        }
    }
    
    public static void main(String[] args) throws Exception {
        // 最先run SampleThread ,后面線程執行完畢后,才會回來執行SampleThread這個線程
        new SampleThread().start();
        // 運行兩個工作線程
        // 工作線程1運行5秒
        new WorkingThread("WorkingThread1", 5000).start();
        // 工作線程2運行2秒
        new WorkingThread("WorkingThread2", 2000).start();
    }
}

 

 

 

運行結果:

[SampleThread] started!
[WorkingThread1] started!
[WorkingThread2] started!
[WorkingThread2] end!
[WorkingThread1] end!
[SampleThread] end!

達到了目的。當然還有其他方式可以做到這樣的效果,本文僅僅是介紹了一種使用CountDownLatch的方式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM