並發編程之:CountDownLatch


大家好,我是小黑,一個在互聯網苟且偷生的農民工。

先問大家一個問題,在主線程中創建多個線程,在這多個線程被啟動之后,主線程需要等子線程執行完之后才能接着執行自己的代碼,應該怎么實現呢?

Thread.join()

看過我 並發編程之:線程 的朋友應該知道怎么做,在Thread類中有一個方法join(),這個方法是一個阻塞方法,當前線程會等待調動join()方法的線程死亡之后再繼續執行。

我們通過代碼來看看執行結果。

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " run ~");
            });
            t.start();
            t.join();
        }
        System.out.println("main線程執行結束");
    }
}

從結果可以看出,main線程要等到所有子線程都執行完之后才會繼續執行,並且每一個子線程是按順序執行的。

我們在來看一下join()方法是如何讓主線程阻塞的呢?來看一下源碼。

public final void join() throws InterruptedException {
    // 默認傳入0毫秒
    join(0);
}
// 本方法是synchronized的
public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            // 測試當前線程是否還活着
            while (isAlive()) {
                // 執行wait,當前線程等待
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

從join方法的源碼中我們可以看到幾個重要的信息,首先join()方法默認是等待0毫秒;join(long millis)方法是一個synchronized方法;循環判斷當前線程是否還活着。什么意思呢?

  1. main線程在調用線程T的join()方法時,會先獲取T對象的鎖;
  2. 在join方法中會調用T對象的wait()方法等待,而wait()方法會釋放T對象的鎖,並且main線程在執行完wait()之后會進入阻塞狀態;
  3. 最后main線程在被notify喚醒之后,需要再循環判斷T對象是否還活着,如果還活着會再次執行wait()。

而在線程執行完run()方法之后,JVM會調用該線程的exit()方法,通過notifyAll()喚醒處於等待狀態的線程。

private void exit() {
    if (group != null) {
        // 終止group中的線程this
        group.threadTerminated(this);
        group = null;
    }
    /* Aggressively null out all reference fields: see bug 4006245 */
    target = null;
    /* Speed the release of some of these resources */
    threadLocals = null;
    inheritableThreadLocals = null;
    inheritedAccessControlContext = null;
    blocker = null;
    uncaughtExceptionHandler = null;
}

void threadTerminated(Thread t) {
    synchronized (this) {
        remove(t);

        if (nthreads == 0) {
            // 喚醒等待線程
            notifyAll();
        }
        if (daemon && (nthreads == 0) &&
            (nUnstartedThreads == 0) && (ngroups == 0))
        {
            destroy();
        }
    }
}

細心的話你會發現,使用Thread.join()只能做到讓一個線程執行完之后,做不到同時等待多個線程,比如我們上面的代碼,線程1執行完之后才能執行線程2,無法做到讓線程1和線程2同時處理。

CountDownLatch

而在JUC包中的工具類CountDownLatch具備和Thread.join()方法同樣的能力,可以等待一個線程執行完之后再處理,並且支持同時等待多個線程。我們來修改一下上面Thread.join()的例子。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " run ~");
                countDownLatch.countDown();
            });
            t.start();
        }
        countDownLatch.await();
        System.out.println("main線程執行結束");
    }
}

image-20210905233350593

CountDownLatch需要在創建時指定一個計數值,在子線程中執行完之后調用countDown()方法進行遞減,主線程的await()方法會等到值減為0之后繼續執行。

從運行結果我們可以看到,100個子線程並不是按順序執行的,而是隨機的。

我們通過CountDownLatch的源碼來看一下是如何實現的。

private final Sync sync;

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在CountDownLatch中我們看到有一個Sync變量,從上一期AQS源碼解析內容中我們知道Sync是AQS的一個子類實現;

首先構造方法傳入的count值會作為參數賦值給Sync中的state變量。

然后我們來看一下在線程中的CountDownLath.countDown()方法會做些什么事情。

public void countDown() {
	// 釋放共享鎖
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

如果有看我上期AQS源碼解析的同學一定很熟悉,這段代碼就是共享鎖的解鎖過程,本質上就是對state-1。

那么主線程是如何實現的等待呢?我們猜一下,應該是去判斷state有沒有減為0,如果減為0則代表所有的線程都執行完countDown()方法,則可以繼續執行,如果state還不等於0,則表示還有線程正在執行,等待就OK啦。

我們來看看源碼,是否和我們猜想的一樣呢?

public void await() throws InterruptedException {
    // 可中斷地獲取共享鎖
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 嘗試獲取共享鎖
    if (tryAcquireShared(arg) < 0)
        // state還不是1
        doAcquireSharedInterruptibly(arg);
}

// 獲取鎖狀態,當state減為0時,返回1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 排入隊尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 線程在這里park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以發現await()方法和我們昨天看到的共享鎖解鎖過程一模一樣,符合我們的猜想。

所以,CountDownLatch的底層實現也是依靠AQS來完成的,現在大家肯定對於AQS有更深刻的認識了。

區別

我們現在來對比一下Thread.join()和CountDownLatch有哪些區別:

  • Thread.join()是Thread類的一個方法,而CountDownLatch是JUC包中的一個工具類;
  • Thread.join()的實現是依靠Object的wait()和notifyAll()來完成的,而CountDownLatch是通過AQS完成的;
  • Thread.join()只支持讓一個線程等待,不支持同時等待多個線程,而CountDownLatch可以支持,所以CountDownLatch的效率要更高。

好的,本期內容就到這里,我們下期見。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM