CyclicBarrier正確的使用方法和錯誤的使用方法


CyclicBarrier是java推出的一個並發編程工具,它用在多個線程之間協同工作。線程約定到達某個點,到達這個點之后的線程都停下來,直到最后一個線程也到達了這個點之后,所有的線程才會得到釋放。常用的場景是:多個worker線程,每個線程都在循環地做一部分工作,並在最后用cyclicBarrier.await()設下約定點,當最后一個線程做完了工作也到達約定點后,所有線程得到釋放,開始下一輪工作。也就是下面這樣:

1 while(!done()){
2     //working
3     cyclicBarrier.await();
4 }

CyclicBarrier還支持一個回調函數,每當一輪工作結束后,下一輪工作開始前,這個回調函數都會被調用一次。

但是,使用CyclicBarrier必須准守最佳實踐的使用方法,否則,就可能達不到想要的效果。比如,下面這樣,就是一種典型的錯誤使用方法:

    private void process(CyclicBarrier cyclicBarrier) {
        final int n = 100;
        Runnable worker= new Runnable() {
            @Override
            public void run() {
               
                    try {
                        //模擬工作
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                    try {
                        cyclicBarrier.await();
                    } catch (BrokenBarrierException | InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("Worker is done");
                System.out.println("Thread of Worker is "+ Thread.currentThread().getId());
            
        };


        for (int i = 0; i < n; i++) {
            Thread t1 = new Thread(worker);
            Thread t2 = new Thread(worker);
            t1.start();
            t2.start();
        }


    }

在上面的代碼中,工作不在worker線程中循環,而是在開啟工作的線程中循環,也就是說,它會不斷地開啟新的worker線程。這會導致的一個問題是,上一輪的回調還沒執行完成,下一輪的工作就已經開始了。

那么為什么呢?下面來分析一下原因。

首先,要知道CyclicBarrier是如何做到在上一輪工作結束后下一輪工作開始前執行回調函數的。查看jdoc文檔,里面有這么一句話“A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. ”這是描述回調函數的,從描述中可以看到,回調函數是在最后一個線程到達約定點后,線程釋放前被執行的。也就是說,回調函數的執行時間發生在下一輪工作前,這是通過在執行完回調函數再釋放工作線程來實現的。

然后,我們再來看看上面錯誤的使用方法。在錯誤的使用方法中,主線程的每一輪循環中都開啟了新的worker線程,這樣在回調函數結束之前,前面開啟的worker線程確實沒有得到釋放,但是,新開啟的工作線程卻完全可以執行下一輪工作,這就是為什么在回調函數執行完畢之前,新一輪的工作就已經開始了的原因。並且,錯誤方法中的每一個工作線程只執行一輪工作就結束了,每一輪工作之間的線程互不影響,這也就失去了協作性,因此,千萬要避免寫出這種代碼。

關於CyclicBarrier使用的最佳時間,基本上就是官方示例中的用法了,如下:

 1 class Solver {
 2    final int N;
 3    final float[][] data;
 4    final CyclicBarrier barrier;
 5 
 6    class Worker implements Runnable {
 7      int myRow;
 8      Worker(int row) { myRow = row; }
 9      public void run() {
10        while (!done()) {
11          processRow(myRow);
12 
13          try {
14            barrier.await();
15          } catch (InterruptedException ex) {
16            return;
17          } catch (BrokenBarrierException ex) {
18            return;
19          }
20        }
21      }
22    }
23 
24    public Solver(float[][] matrix) {
25      data = matrix;
26      N = matrix.length;
27      barrier = new CyclicBarrier(N,
28                                  new Runnable() {
29                                    public void run() {
30                                      mergeRows(...);
31                                    }
32                                  });
33      for (int i = 0; i < N; ++i)
34        new Thread(new Worker(i)).start();
35 
36      waitUntilDone();
37    }
38  }

最后在有一個問題是,回調函數是在哪一個線程里執行的?

根據我的demo測試發現,是在第一個到達的線程中執行的。當然,官方並沒有明確規定這一點,也許以后會有變化吧,所以,我們也不能以來這一特征。我的demo如下:

public class Demo1 {
   public static main(String[] args){
        Demo1 demo =  new Demo1();
        demo1.showInfThreadWhenDirectly();
    }
private void process(CyclicBarrier cyclicBarrier) { final int n = 100; Runnable worker= new Runnable() { @Override public void run() { for (int i = 0; i < n; i++) { try { Thread.sleep(3000); } catch (InterruptedException ex) { ex.printStackTrace(); } try { int arrival_index=cyclicBarrier.await(); if(0==arrival_index){ System.out.println("first arrival Thread in this iteration is: " +Thread.currentThread().getId()); } } catch (BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } System.out.println("Worker is done"); System.out.println("Thread of Worker is "+ Thread.currentThread().getId()); } }; Thread t1 = new Thread(worker); Thread t2 = new Thread(worker); t1.start(); t2.start(); } public void showInfThreadWhenDirectly(){ CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("[Directly] Thread in invert call function is" + Thread.currentThread().getId())); process(cyclicBarrier); System.out.println("[Directly] main Thread is "+ Thread.currentThread().getId()); } }

輸出結果如下:

[Directly] main Thread is 1
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is11
first arrival Thread in this iteration is: 11
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is10
first arrival Thread in this iteration is: 10
[Directly] Thread in invert call function is11
first arrival Thread in this iteration is: 11

 

另外,官方還有一段:“

If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of await() returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:

  if (barrier.await() == 0) {
     // log the completion of this iteration
   }

 ”

意思是說,如果回調動作“arrier action”不需要在所有工作線程都停止的狀態下執行的話,那么可以隨便找一個工作線程去做這個動作。為了支持這個,CyclicBarrier 的await( )方法有一個返回值,返回的就是當前線程是第幾個到達約定點(barrier)的。

參考https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CyclicBarrier.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM