Java多線程:CountDownLatch、CyclicBarrier 和 Semaphore


場景描述:

  多線程設計過程中,經常會遇到需要等待其它線程結束以后再做其他事情的情況。
有幾種方案:
 
  1.在主線程中設置一自定義全局計數標志,在工作線程完成時,計數減1。主線程偵測該標志是否為0,一旦為0,表示所有工作線程已經完成。
  2.使用Java標准的類CountDownLatch來完成這項工作,原理是一樣的,計數。
 
 

CountDownLatch

一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。 
其機制是:
  當多個(具體數量等於初始化CountDownLatch時的count參數的值)線程都達到了預期狀態或完成預期工作時觸發事件,其他線程可以等待這個事件來觸發自己的后續工作。這里需要注意的是,等待的線程可以是多個,即CountDownLatch是可以喚醒多個等待的線程的。達到自己預期狀態的線程會調用CountDownLatch的countDown方法,而等待的線程會調用CountDownLatch的await方法。
CountDownLatch 很適合用來將一個任務分為n個獨立的部分,等這些部分都完成后繼續接下來的任務,CountDownLatch 只能出發一次,計數值不能被重置。

流程圖

 

如上圖所示,當7個線程都完成latch.countDown調用后,最下面那條線程會從latch.await返回,繼續執行后面的代碼

函數列表

  • CountDownLatch(int count) :構造一個用給定計數初始化的 CountDownLatch。
  • void await():使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。
  • boolean await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
  • void countDown() 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。

實現原理

 

實例

   我們來看一個具體的例子。假設我們使用一台多核的機器對一組數據進行排序,那么我們可以把這組數據分到不同線程中進行排序,然后合並;可以利用線程池來管理多線程;可以將CountDownLatch用作各個分組數據都排好序的通知。下面是代碼片段:

先看主線程

int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
int[] datas = new int[10204];
int step = datas.length / count;
for (int i=0; i < count; i++) {
    int start = i * step;
    int end = (i+1) * step;
    if (i == count - 1) end = datas.length;
    threadpool.execute(new MyRunnable(latch, datas, start, end));
}
latch.await();
//合並數據

我們再看一下具體任務的代碼,即MyRunnable的run方法的實現:

public void run() {
      //數據排序
     latch.countDown(); 
}

 

CyclicBarrier

可以協同多個線程,讓多個線程在這個屏障前等待,直到所有線程都達到了這個屏障時,再一起繼續執行后面的動作。
CyclicBarrier適用於多個線程有固定的多步需要執行,線程間互相等待,當都執行完了,再一起執行下一步。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
 

流程圖

 上圖中的7個線程各有一個barrier.await,那么任何一個線程在執行到barrier.await時就會進入阻塞等待狀態,直到7個線程都到了barrier.await時才會同時從await返回,繼續后面的工作。此外如果在構造CyclicBarrier時設置了一個Runnable實現,那么最后一個到barrier.await的線程會執行這個Runnable的run方法,以完成一些預設的工作。
 
注意比較CountDownLatchCyclicBarrier
  (01) CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程相互等待。
  (02) CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置后使用,因此它被稱為是循環的barrier。
CountDownLatch 適用於一組線程和另一個主線程之間的工作協作。一個主線程等待一組工作線程的任務完畢才繼續它的執行是使用 CountDownLatch 的主要場景;CyclicBarrier 用於一組或幾組線程,比如一組線程需要在一個時間點上達成一致,例如同時開始一個工作。另外,CyclicBarrier 的循環特性和構造函數所接受的 Runnable 參數也是 CountDownLatch 所不具備的。
 
 
 
CountDownLatch
CyclicBarrier
適用場景
主線程等待其他工作線程結束
多個線程相互等待,直到所有線程都達到一個障礙點Barrier
主要方法
CountDownLatch(int count) 主線程調用:初始化計數
 
await() 主線程調用 : 阻塞,直到等待計數為0時解除阻塞 
 
countDown() 工作線程調用 : 計數減1
CyclicBarrier(int parties , Runnnable barrierAction) : 初始化參與者數量和障礙點執行Action,action可選,由主線程初始化
 
await() : 由工作線程調用,每被調用一次,計數便會減少1,並阻塞住當前線程 , 直到所有線程都達到障礙點
等待結束
各線程之間不再相互影響, 可以繼續做自己的事情, 不再執行下一個工作目標。
在障礙點到達后, 允許所有線程繼續執行,到達下一個目標后,可以恢復使用CyclicBarrier, barrier 在釋放等待線程后可以重用
異常
 
如果其中一個線程由於中斷、錯誤、或者超時導致永久離開障礙點,其他線程也將拋出異常。
 

實例

   
int count = 10;
final CyclicBarrier barrier = new CyclicBarrier(count + 1);
int[] datas = new int[10204];
int step = datas.length / count;
for (int i=0; i < count; i++) {
    int start = i * step;
    int end = (i+1) * step;
    if (i == count - 1) end = datas.length;
    threadpool.execute(new MyRunnable(barrier, datas, start, end));
}
barrier.await();
//合並數據

可以看到CyclicBarrier對象傳入的參數值比CountDownLatch大1,原因是構造CountDownLatch的參數是調用countDown的數量,而CyclicBarrier的數量是await的數量

public void run() {
      //數據排序
     try {
         barrier.await(); 
    }catch (...)
}

 

Semaphore

Semaphore 信號量對象管理的信號就像令牌,構造時傳入個數,總數就是控制並發的數量 。我們需要控制並發的代碼,執行前先獲取信號(通過acquire獲取信號許可),執行后歸還信號(通過release歸還信號許可)。每次acquire成功返回后,Semaphore可用的信號量就會減少一個,如果沒有可用的信號,acquire調用就會阻塞,等待有release調用釋放信號后,acquire才會得到信號並返回。
如果Semaphore管理的信號量為1個,那么就退化到互斥鎖了;如果多於一個信號量,則主要用於控制並發數。與通過控制線程數來控制並發數的方式相比,通過Semaphore來控制並發數可以控制得更加細粒度,因為真正被控制最大並發的代碼放到acquire和release之間就行了。
  Semaphore類位於java.util.concurrent包下,它提供了2個構造器:
public Semaphore(int permits) {          //參數permits表示許可數目,即同時可以允許多少線程進行訪問
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

實例

   例如我們需要控制遠程方法的並發量,超過並發量的方法就等待有其他方法執行返回后再執行,那么其代碼如下:
semaphore.acquire();
try {
    //調用遠程通信的方法
}
finally {
    semaphore.release();
}

 

 
 
 

參考資料:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM