【JUC】JDK1.8源碼分析之CyclicBarrier(四)


一、前言

  有了前面分析的基礎,現在,接着分析CyclicBarrier源碼,CyclicBarrier類在進行多線程編程時使用很多,比如,你希望創建一組任務,它們並行執行工作,然后在進行下一個步驟之前等待,直至所有的任務都完成,和join很類似,下面,開始分析源碼。

二、CyclicBarrier數據結構

  分析源碼可以知道,CyclicBarrier底層是基於ReentrantLockAbstractQueuedSynchronizer來實現的,所以,CyclicBarrier的數據結構也依托於AQS的數據結構,在前面對AQS的分析中已經指出了其數據結構,在這里不再累贅。

三、CyclicBarrier源碼分析

  3.1 類的繼承關系

public class CyclicBarrier {}

  說明:可以看到CyclicBarrier沒有顯示繼承哪個父類或者實現哪個父接口,根據Java語言規定,可知其父類是Object。

  3.2 類的內部類

  CyclicBarrier類存在一個內部類Generation,每一次使用的CycBarrier可以當成Generation的實例,其源代碼如下

private static class Generation {
    boolean broken = false;
}

  說明:Generation類有一個屬性broken,用來表示當前屏障是否被損壞。

  3.3 類的屬性 

public class CyclicBarrier {
    
    /** The lock for guarding barrier entry */
    // 可重入鎖
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // 條件隊列
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // 參與的線程數量
    private final int parties;
    /* The command to run when tripped */
    // 由最后一個進入 barrier 的線程執行的操作
    private final Runnable barrierCommand;
    /** The current generation */
    // 當前代
    private Generation generation = new Generation();
    // 正在等待進入屏障的線程數量
    private int count;
}
View Code

  說明:該屬性有一個為ReentrantLock對象,有一個為Condition對象,而Condition對象又是基於AQS的,所以,歸根到底,底層還是由AQS提供支持。

  3.4 類的構造函數

  1. CyclicBarrier(int, Runnable)型構造函數 

public CyclicBarrier(int parties, Runnable barrierAction) {
        // 參與的線程數量小於等於0,拋出異常
        if (parties <= 0) throw new IllegalArgumentException();
        // 設置parties
        this.parties = parties;
        // 設置count
        this.count = parties;
        // 設置barrierCommand
        this.barrierCommand = barrierAction;
    }
View Code

  說明:該構造函數可以指定關聯該CyclicBarrier的線程數量,並且可以指定在所有線程都進入屏障后的執行動作,該執行動作由最后一個進行屏障的線程執行。

  2. CyclicBarrier(int)型構造函數 

public CyclicBarrier(int parties) {
        // 調用含有兩個參數的構造函數
        this(parties, null);
    }
View Code

  說明:該構造函數僅僅執行了關聯該CyclicBarrier的線程數量,沒有設置執行動作。

  3.5 核心函數分析

  1. dowait函數

  此函數為CyclicBarrier類的核心函數,CyclicBarrier類對外提供的await函數在底層都是調用該了doawait函數,其源代碼如下。

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 保存當前鎖
        final ReentrantLock lock = this.lock;
        // 鎖定
        lock.lock();
        try {
            // 保存當前代
            final Generation g = generation;
            
            if (g.broken) // 屏障被破壞,拋出異常
                throw new BrokenBarrierException();

            if (Thread.interrupted()) { // 線程被中斷
                // 損壞當前屏障,並且喚醒所有的線程,只有擁有鎖的時候才會調用
                breakBarrier();
                // 拋出異常
                throw new InterruptedException();
            }
            
            // 減少正在等待進入屏障的線程數量
            int index = --count;
            if (index == 0) {  // 正在等待進入屏障的線程數量為0,所有線程都已經進入
                // 運行的動作標識
                boolean ranAction = false;
                try {
                    // 保存運行動作
                    final Runnable command = barrierCommand;
                    if (command != null) // 動作不為空
                        // 運行
                        command.run();
                    // 設置ranAction狀態
                    ranAction = true;
                    // 進入下一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction) // 沒有運行的動作
                        // 損壞當前屏障
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 無限循環
            for (;;) {
                try {
                    if (!timed) // 沒有設置等待時間
                        // 等待
                        trip.await(); 
                    else if (nanos > 0L) // 設置了等待時間,並且等待時間大於0
                        // 等待指定時長
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) { 
                    if (g == generation && ! g.broken) { // 等於當前代並且屏障沒有被損壞
                        // 損壞當前屏障
                        breakBarrier();
                        // 拋出異常
                        throw ie;
                    } else { // 不等於當前帶后者是屏障被損壞
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        // 中斷當前線程
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) // 屏障被損壞,拋出異常
                    throw new BrokenBarrierException();

                if (g != generation) // 不等於當前代
                    // 返回索引
                    return index;

                if (timed && nanos <= 0L) { // 設置了等待時間,並且等待時間小於0
                    // 損壞屏障
                    breakBarrier();
                    // 拋出異常
                    throw new TimeoutException();
                }
            }
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
View Code

  說明:dowait方法的邏輯會進行一系列的判斷,大致流程如下。

  2. nextGeneration函數 

  此函數在所有線程進入屏障后會被調用,即生成下一個版本,所有線程又可以重新進入到屏障中,其源代碼如下  

    private void nextGeneration() {
        // signal completion of last generation
        // 喚醒所有線程
        trip.signalAll();
        // set up next generation
        // 恢復正在等待進入屏障的線程數量
        count = parties;
        // 新生一代
        generation = new Generation();
    }
View Code

  在此函數中會調用AQS的signalAll方法,即喚醒所有等待線程。如果所有的線程都在等待此條件,則喚醒所有線程。其源代碼如下 

public final void signalAll() {
            if (!isHeldExclusively()) // 不被當前線程獨占,拋出異常
                throw new IllegalMonitorStateException();
            // 保存condition隊列頭結點
            Node first = firstWaiter;
            if (first != null) // 頭結點不為空
                // 喚醒所有等待線程
                doSignalAll(first);
        }
View Code

  說明:此函數判斷頭結點是否為空,即條件隊列是否為空,然后會調用doSignalAll函數,doSignalAll函數源碼如下  

private void doSignalAll(Node first) {
            // condition隊列的頭結點尾結點都設置為空
            lastWaiter = firstWaiter = null;
            // 循環
            do {
                // 獲取first結點的nextWaiter域結點
                Node next = first.nextWaiter;
                // 設置first結點的nextWaiter域為空
                first.nextWaiter = null;
                // 將first結點從condition隊列轉移到sync隊列
                transferForSignal(first);
                // 重新設置first
                first = next;
            } while (first != null);
        }
View Code

  說明:此函數會依次將條件隊列中的節點轉移到同步隊列中,會調用到transferForSignal函數,其源碼如下

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
View Code

  說明:此函數的作用就是將處於條件隊列中的節點轉移到同步隊列中,並設置結點的狀態信息,其中會調用到enq函數,其源代碼如下。  

private Node enq(final Node node) {
        for (;;) { // 無限循環,確保結點能夠成功入隊列
            // 保存尾結點
            Node t = tail;
            if (t == null) { // 尾結點為空,即還沒被初始化
                if (compareAndSetHead(new Node())) // 頭結點為空,並設置頭結點為新生成的結點
                    tail = head; // 頭結點與尾結點都指向同一個新生結點
            } else { // 尾結點不為空,即已經被初始化過
                // 將node結點的prev域連接到尾結點
                node.prev = t; 
                if (compareAndSetTail(t, node)) { // 比較結點t是否為尾結點,若是則將尾結點設置為node
                    // 設置尾結點的next域為node
                    t.next = node; 
                    return t; // 返回尾結點
                }
            }
        }
    }
View Code

  說明:此函數完成了結點插入同步隊列的過程,也很好理解。

  綜合上面的分析可知,newGeneration函數的主要方法的調用如下,之后會通過一個例子詳細講解。

  3. breakBarrier函數

  此函數的作用是損壞當前屏障,會喚醒所有在屏障中的線程。源代碼如下  

private void breakBarrier() {
        // 設置狀態
        generation.broken = true;
        // 恢復正在等待進入屏障的線程數量
        count = parties;
        // 喚醒所有線程
        trip.signalAll();
    }
View Code

  說明:可以看到,此函數也調用了AQS的signalAll函數,由signal函數提供支持。

四、示例

  下面通過一個例子來詳解CyclicBarrier的使用和內部工作機制,源代碼如下  

package com.hust.grid.leesf.cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * 
 * @author leesf
 * @time   2016.4.16
 */
class MyThread extends Thread {
    private CyclicBarrier cb;
    public MyThread(String name, CyclicBarrier cb) {
        super(name);
        this.cb = cb;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " going to await");
        try {
            cb.await();
            System.out.println(Thread.currentThread().getName() + " continue");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") {
            public void run() {
                System.out.println(Thread.currentThread().getName() + " barrier action");
                
            }
        });
        MyThread t1 = new MyThread("t1", cb);
        MyThread t2 = new MyThread("t2", cb);
        t1.start();
        t2.start();
        System.out.println(Thread.currentThread().getName() + " going to await");
        cb.await();
        System.out.println(Thread.currentThread().getName() + " continue");

    }
}

  運行結果(某一次): 

t1 going to await
main going to await
t2 going to await
t2 barrier action
t2 continue
t1 continue
main continue

  說明:根據結果可知,可能會存在如下的調用時序。

  說明:由上圖可知,假設t1線程的cb.await是在main線程的cb.barrierAction動作是由最后一個進入屏障的線程執行的。根據時序圖,進一步分析出其內部工作流程。

  ① main(主)線程執行cb.await操作,主要調用的函數如下。

  說明:由於ReentrantLock的默認采用非公平策略,所以在dowait函數中調用的是ReentrantLock.NonfairSync的lock函數,由於此時AQS的狀態是0,表示還沒有被任何線程占用,故main線程可以占用,之后在dowait中會調用trip.await函數,最終的結果是條件隊列中存放了一個包含main線程的結點,並且被禁止運行了,同時,main線程所擁有的資源也被釋放了,可以供其他線程獲取。

  ② t1線程執行cb.await操作,其中假設t1線程的lock.lock操作在main線程釋放了資源之后,則其主要調用的函數如下。

  說明:可以看到,之后condition queue(條件隊列)里面有兩個節點,包含t1線程的結點插入在隊列的尾部,並且t1線程也被禁止了,因為執行了park操作,此時兩個線程都被禁止了。

  ③ t2線程執行cb.await操作,其中假設t2線程的lock.lock操作在t1線程釋放了資源之后,則其主要調用的函數如下。

  說明:由上圖可知,在t2線程執行await操作后,會直接執行command.run方法,不是重新開啟一個線程,而是最后進入屏障的線程執行。同時,會將Condition queue中的所有節點都轉移到Sync queue中,並且最后main線程會被unpark,可以繼續運行。main線程獲取cpu資源,繼續運行。

  ④ main線程獲取cpu資源,繼續運行,下圖給出了主要的方法調用。

  說明:其中,由於main線程是在AQS.CO的wait中被park的,所以恢復時,會繼續在該方法中運行。運行過后,t1線程被unpark,它獲得cpu資源可以繼續運行。

  ⑤ t1線程獲取cpu資源,繼續運行,下圖給出了主要的方法調用。

  說明:其中,由於t1線程是在AQS.CO的wait方法中被park,所以恢復時,會繼續在該方法中運行。運行過后,Sync queue中保持着一個空節點。頭結點與尾節點均指向它。

   注意:在線程await過程中中斷線程會拋出異常,所有進入屏障的線程都將被釋放。至於CyclicBarrier的其他用法,讀者可以自行查閱API,不再累贅。

五、總結

  有了AQS與ReentrantLock的基礎,分析CyclicBarrier就會非常簡單,因為其底層就是由兩者支撐的,關於CycylicBarrier的源碼就分析到此,有疑問的讀者,歡迎交流,謝謝各位園友的觀看~

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM