CyclicBarrier 原理(秒懂)



JUC 高並發工具類(3文章)與高並發容器類(N文章) :

1 CyclicBarrier 是什么?

從字面上的意思可以知道,這個類的中文意思是“循環柵欄”。大概的意思就是一個可循環利用的屏障。

它的作用就是會讓所有線程都等待完成后才會繼續下一步行動。

現實生活中我們經常會遇到這樣的情景,在進行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅游時要等全部人都到齊了才出發,比賽時要等運動員都上場后才開始。

在JUC包中為我們提供了一個同步工具類能夠很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類可以實現一組線程相互等待,當所有線程都到達某個屏障點后再進行后續的操作。下圖演示了這一過程。

CyclicBarrier字面意思是“可重復使用的柵欄”,CyclicBarrier 相比 CountDownLatch 來說,要簡單很多,其源碼沒有什么高深的地方,它是 ReentrantLock 和 Condition 的組合使用。

看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重復使用(Cyclic)。

cyclicbarrier-2

2 怎么使用 CyclicBarrier

2.1 構造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

解析:

parties 是參與線程的個數
第二個構造方法有一個 Runnable 參數,這個參數的意思是最后一個到達線程要做的任務

2.2 重要方法

public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

解析:

線程調用 await() 表示自己已經到達柵欄
BrokenBarrierException 表示柵欄已經被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時

3 使用案例

2.3.1 需求
一個線程組的線程需要等待所有線程完成任務后再繼續執行下一次任務

2.3.2 代碼實現

public class CyclicBarrierDemo {

static class TaskThread extends Thread {
    
    CyclicBarrier barrier;
    
    public TaskThread(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
    
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(getName() + " 到達柵欄 A");
            barrier.await();
            System.out.println(getName() + " 沖破柵欄 A");
            
            Thread.sleep(2000);
            System.out.println(getName() + " 到達柵欄 B");
            barrier.await();
            System.out.println(getName() + " 沖破柵欄 B");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    int threadNum = 5;
    CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
        
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 完成最后任務");
        }
    });
    
    for(int i = 0; i < threadNum; i++) {
        new TaskThread(barrier).start();
    }
}

}

打印結果:

Thread-1 到達柵欄 A
Thread-3 到達柵欄 A
Thread-0 到達柵欄 A
Thread-4 到達柵欄 A
Thread-2 到達柵欄 A
Thread-2 完成最后任務
Thread-2 沖破柵欄 A
Thread-1 沖破柵欄 A
Thread-3 沖破柵欄 A
Thread-4 沖破柵欄 A
Thread-0 沖破柵欄 A
Thread-4 到達柵欄 B
Thread-0 到達柵欄 B
Thread-3 到達柵欄 B
Thread-2 到達柵欄 B
Thread-1 到達柵欄 B
Thread-1 完成最后任務
Thread-1 沖破柵欄 B
Thread-0 沖破柵欄 B
Thread-4 沖破柵欄 B
Thread-2 沖破柵欄 B
Thread-3 沖破柵欄 B

從打印結果可以看出,所有線程會等待全部線程到達柵欄之后才會繼續執行,並且最后到達的線程會完成 Runnable 的任務。
在這里插入圖片描述

CyclicBarrier 使用場景

可以用於多線程計算數據,最后合並計算結果的場景。

4 CyclicBarrier 原理

而 CyclicBarrier 基於 Condition 來實現的。因為 CyclicBarrier 的源碼相對來說簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那么這里的源碼是毫無壓力的,就是幾個特殊概念罷了。

在CyclicBarrier類的內部有一個計數器,每個線程在到達屏障點的時候都會調用await方法將自己阻塞,此時計數器會減1,當計數器減為0的時候所有因調用await方法而被阻塞的線程將被喚醒。這就是實現一組線程相互等待的原理,下面我們先看看CyclicBarrier有哪些成員變量。

成員變量

//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數
private final int parties;
//換代前執行的任務
private final Runnable barrierCommand;
//表示柵欄的當前代
private Generation generation = new Generation();
//計數器
private int count;

//靜態內部類Generation
private static class Generation {
  boolean broken = false;
}

上面貼出了CyclicBarrier所有的成員變量,可以看到CyclicBarrier內部是通過條件隊列trip來對線程進行阻塞的,並且其內部維護了兩個int型的變量parties和count,parties表示每次攔截的線程數,該值在構造時進行賦值。count是內部計數器,它的初始值和parties相同,以后隨着每次await方法的調用而減1,直到減為0就將所有線程喚醒。CyclicBarrier有一個靜態內部類Generation,該類的對象代表柵欄的當前代,就像玩游戲時代表的本局游戲,利用它可以實現循環等待。barrierCommand表示換代前執行的任務,當count減為0時表示本局游戲結束,需要轉到下一局。在轉到下一局游戲之前會將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定barrierCommand來執行自己的任務。我用一圖來描繪下 CyclicBarrier 里面的一些概念:

構造器

接下來我們看看它的構造器。

//構造器1
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}
 
//構造器2
public CyclicBarrier(int parties) {
  this(parties, null);
}

CyclicBarrier有兩個構造器,其中構造器1是它的核心構造器,在這里你可以指定本局游戲的參與者數量(要攔截的線程數)以及本局結束時要執行的任務,還可以看到計數器count的初始值被設置為parties。

等待的方法

CyclicBarrier類最主要的功能就是使先到達屏障點的線程阻塞並等待后面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。

//非定時等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}

//定時等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

可以看到不管是定時等待還是非定時等待,它們都調用了dowait方法,只不過是傳入的參數不同而已。下面我們就來看看dowait方法都做了些什么。

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //檢查當前柵欄是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //檢查當前線程是否被中斷
    if (Thread.interrupted()) {
      //如果當前線程被中斷會做以下三件事
      //1.打翻當前柵欄
      //2.喚醒攔截的所有線程
      //3.拋出中斷異常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都將計數器的值減1
    int index = --count;
    //計數器的值減為0則需喚醒所有線程並轉換到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //喚醒所有線程前先執行指定的任務
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //喚醒所有線程並轉到下一代
        nextGeneration();
        return 0;
      } finally {
        //確保在任務未成功執行時能將所有線程喚醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }

//如果計數器不為0則執行此循環
for (;;) {
  try {
    //根據傳入的參數來決定是定時等待還是非定時等待
    if (!timed) {
      trip.await();
    }else if (nanos > 0L) {
      nanos = trip.awaitNanos(nanos);
    }
  } catch (InterruptedException ie) {
    //若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程
    if (g == generation && ! g.broken) {
      breakBarrier();
      throw ie;
    } else {
      //若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接調用中斷操作
      Thread.currentThread().interrupt();
    }
  }
  //如果線程因為打翻柵欄操作而被喚醒則拋出異常
  if (g.broken) {
    throw new BrokenBarrierException();
  }
  //如果線程因為換代操作而被喚醒則返回計數器的值
  if (g != generation) {
    return index;
  }
  //如果線程因為時間到了而被喚醒則打翻柵欄並拋出異常
  if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
  }
}
​```

  } finally {
    lock.unlock();
  }
}


上面貼出的代碼中注釋都比較詳細,我們只挑一些重要的來講。可以看到在dowait方法中每次都將count減1,減完后立馬進行判斷看看是否等於0,如果等於0的話就會先去執行之前指定好的任務,執行完之后再調用nextGeneration方法將柵欄轉到下一代,在該方法中會將所有線程喚醒,將計數器的值重新設為parties,最后會重新設置柵欄代次,在執行完nextGeneration方法之后就意味着游戲進入下一局。如果計數器此時還不等於0的話就進入for循環,根據參數來決定是調用trip.awaitNanos(nanos)還是trip.await()方法,這兩方法對應着定時和非定時等待。如果在等待過程中當前線程被中斷就會執行breakBarrier方法,該方法叫做打破柵欄,意味着游戲在中途被掐斷,設置generation的broken狀態為true並喚醒所有線程。同時這也說明在等待過程中有一個線程被中斷整盤游戲就結束,所有之前被阻塞的線程都會被喚醒。線程醒來后會執行下面三個判斷,看看是否因為調用breakBarrier方法而被喚醒,如果是則拋出異常;看看是否是正常的換代操作而被喚醒,如果是則返回計數器的值;看看是否因為超時而被喚醒,如果是的話就調用breakBarrier打破柵欄並拋出異常。這里還需要注意的是,如果其中有一個線程因為等待超時而退出,那么整盤游戲也會結束,其他線程都會被喚醒。下面貼出nextGeneration方法和breakBarrier方法的具體代碼。


怎么重置一個柵欄

最后,我們來看看怎么重置一個柵欄:



public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}


我們設想一下,如果初始化時,指定了線程 parties = 4,前面有 3 個線程調用了 await 等待,在第 4 個線程調用 await 之前,我們調用 reset 方法,那么會發生什么?

首先,打破柵欄,那意味着所有等待的線程(3個等待的線程)會喚醒,await 方法會通過拋出 BrokenBarrierException 異常返回。然后開啟新的一代,重置了 count 和 generation,相當於一切歸零了。

5 CyclicBarrier 與 CountDownLatch 區別

CountDownLatch 是一次性的,CyclicBarrier 是可循環利用的
CountDownLatch 參與的線程的職責是不一樣的,有的在倒計時,有的在等待倒計時結束。CyclicBarrier 參與的線程職責是一樣的。

CyclicBarrier 的源碼實現和 CountDownLatch 大相徑庭,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現的。因為 CyclicBarrier 的源碼相對來說簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那么這里的源碼是毫無壓力的,就是幾個特殊概念罷了。


回到◀瘋狂創客圈

瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM