並發編程之:JUC並發控制工具


大家好,我是小黑,一個在互聯網苟且偷生的農民工。

在上一期我們講了Thread.join()方法和CountDownLatch,這兩者都可以做到等待一個線程執行完畢之后當前線程繼續執行,並且CountDownLatch要更優秀,能滿足同時等待多個線程執行,我們通過查看源碼知道CountDownLatch是通過AQS實現的。

那么在java.util.concurrent包中除了像CountDownLatch這樣的並發控制工具外,還有哪些呢?今天帶大家一起來看一看。

CountDownLatch

等待一個或多個線程直到線程中執行的一組操作完成的同步輔助工具。

CountDownLatch從字面理解為“計數器“,回顧昨天的內容,CountDownLatch可以實現等待其他線程執行,並且可以指定等待時間。

舉個例子,比如有一個考試,在開考之前老師要等學生到考場,如果所有學生都提前到達,老師可以提前發試卷,但是如果到考試時間有學生還沒有到,老師則可以不等,直接開始,我們通過CountDownLatch來模擬一下。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch count = new CountDownLatch(5);
        for (int i = 1; i <= 5; i++) {
            new Student("學生" + i, count).start();
        }
        // 只等待5秒,5秒之后開始發試卷
        count.await(5, TimeUnit.SECONDS);
        System.out.println("所有學生已到達,老師開始發卷子~");
    }
}

class Student extends Thread {
    private final CountDownLatch count;

    public Student(String name, CountDownLatch count) {
        super(name);
        this.count = count;
    }
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + "到達考場~");
            count.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image-20210903190932075

首先我們看一下代碼,主線程中老師等待的時間是5秒,所以5秒中之后開始考試;每個Student在run方法中會sleep() 2秒模擬每個學生到達花費的時間最少是2秒;

我們從結果來看,5個學生都在老師開發發卷子之前到達了考場,說明5個Student到達考場的時間並沒有超過5秒,所以肯定的是這5個線程不是串行執行的;

老師在等到之后確實開始考試了;如果把老師等待的時間往小調整,或者增大某個線程到達考場的時間,會發現會在到達考場之前開始發卷子,篇幅原因這里就不放代碼了。通過這個例子想必你已經掌握了CountDownLatch的用法。

Semaphore

一個計數信號量。 在概念上,信號量維持一組許可證。

Semaphore字面意思翻譯是信號量。信號量通常用於限制線程數量,而不是限制訪問某些共享資源。

我們還是通過生活中的例子來模擬,比如說,我們去座摩天輪,一個摩天輪上能容納的游客人數是固定的,所以在有人要上去之前需要先看是否還有剩余的空位,如果有則放行,如果沒有則讓游客等待,直到有人從摩天輪上離開。我們使用Semaphore來模擬這個場景。

public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        for (int i = 1; i <= 5; i++) {
            new Visitor("游客" + i, semaphore).start();
        }
    }

}

class Visitor extends Thread {
    private Semaphore semaphore;

    public Visitor(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "坐上了摩天輪,真開心~");
            TimeUnit.SECONDS.sleep(2);
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "從摩天輪下來了-------");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image-20210902231424496

從運行結果我們可以看出,游客1,2,3同時坐上去,但是游客4,5這時候是沒有坐上去的,只能等有人下來之后,游客4,5才能再上去。因為我們在創建Semaphore時只給了3個許可,當1,2,3占用之后,4,5獲取不到,只能等待許可再次可用時才能獲取。

我們來看看Semaphore都有哪些方法。

構造方法

// 指定許可數量
new Semaphore(3);
// 指定許可數量,同時設置等待線程用公平的方式獲取許可
new Semaphore(3, true);
// 指定許可數量,同時設置等待線程用非公平的方式獲取許可,默認為false
new Semaphore(3, false);

成員方法

// 獲取許可,默認只獲取1個,阻塞直到獲取成功,或線程中斷interrupted
semaphore.acquire();
// 獲取給定數量的許可,阻塞直到獲取成功,或線程中斷interrupted
semaphore.acquire(2);
// 和acquire()一樣,但是不可以被中斷
semaphore.acquireUninterruptibly();
// 和acquire(2)一樣,但是不可以被中斷
semaphore.acquireUninterruptibly(2);
// 嘗試獲取1個許可,如果成功則返回true,失敗則立馬返回false
semaphore.tryAcquire();
// 嘗試獲取給定數量的許可,如果成功則返回true,失敗則立馬返回false
semaphore.tryAcquire(2);
// 嘗試獲取1個許可知道超時,如果獲取成功返回true,反之返回false
semaphore.tryAcquire(5, TimeUnit.SECONDS);
// 嘗試獲取給定數量的許可知道超時,如果獲取成功返回true,反之返回false
semaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
// 釋放1個許可
semaphore.release();
// 釋放給定數量的許可
semaphore.release(2);

如果看過我之前AQS源碼解析的朋友應該能猜到,Semaphore的底層也是通過AQS來實現的,是使用AQS的共享鎖相關的實現。

感興趣的同學可以回顧這篇文章。

CyclicBarrier

允許一組線程全部等待彼此達到共同屏障點的同步輔助工具。

CyclicBarrier從字面理解為“循環柵欄”,可以理解為一個可以循環使用的屏障。它的作用就是等待一組線程都完成執行后再進行下一步。

老樣子,我們再來舉個例子(我咋有這么多例子可把我牛逼壞了)。

我們進場會和朋友聚餐,那江湖規矩,要等大家都到了,才能開始吃,等大家都吃的差不多了,大家一起散場。有沒有發現可上面考試的例子有點像。

我們來使用CyclicBarrier來模擬一下這個場景。

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 1; i <= 3; i++) {
            new BBQer("吃貨" + i, cyclicBarrier).start();
        }
    }
}


class BBQer extends Thread {
    CyclicBarrier cyclicBarrier;

    public BBQer(String name, CyclicBarrier cyclicBarrier) {
        super(name);
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {

        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到達戰場~");
			// 等待其他人到場
            cyclicBarrier.await();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已飢餓難耐了,開始戰斗~");
            TimeUnit.SECONDS.sleep(1);
			// 等待其他人吃完
            cyclicBarrier.await();
            System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃飽喝足,回家睡覺");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

image-20210902234149511

通過這里例子大家應該明白CyclicBarrier的使用場景了吧,主要體現在可以循環上,這一點和CountDownLatch有很大的區別,CountDownLatch是一個計數器,只能有一次計數完成,之后不能再繼續歸零計算了。而CyclicBarrier可以循環設置這個屏障。

我們再來看一下CyclicBarrier都有哪些常用的方法。

構造方法

// 創建有指定個數線程的循環屏障
new CyclicBarrier(3);
// 創建有指定個數線程的循環屏障,在所有線程到達屏障后,運行Runnable的run()方法
new CyclicBarrier(3, Runnable);

成員方法

// 等其他所有線程到達
cyclicBarrier.await();
// 等所有線程到達,超時只有放棄等待
cyclicBarrier.await(1, TimeUnit.SECONDS);
// 獲取當前正在等待的線程數
cyclicBarrier.getNumberWaiting();
// 重置等待狀態到初始狀態
cyclicBarrier.reset();

Phaser

一個可重復使用的同步屏障,功能類似於CyclicBarrier和CountDownLatch但支持更靈活的使用。

Phaser從字面意思可以理解為”階段器“。通過上面這段話可以感覺到要比CyclicBarrier和CountDownLatch更牛逼一些,更加的靈活。

我們這次不重新舉新的例子,還用上面的吃飯的例子,加入說我們上面吃飯的例子,如果說在大家開始吃的時候,另一個朋友打電話說他也要來,這時候總不能不讓來吧,應該讓他來和我們一起吃,並且吃完一起走。

而這種場景通過上面說到的CountDownLatch,CyclicBarrier還是Semaphore都是不能做到的,我們來看看使用Phaser如何解決。

public class PhaserDemo {

    public static void main(String[] args) throws InterruptedException {
        // 剛開始飯局是3個人
        Phaser phaser = new Phaser(3);
        for (int i = 1; i <= 3; i++) {
            new Foodie("吃貨" + i, phaser).start();
        }
        TimeUnit.SECONDS.sleep(2);
        phaser.register();
        new Foodie("新來的", phaser).start();
        System.out.println("飯局人數:" + phaser.getRegisteredParties());
    }
}


class Foodie extends Thread {
    private Phaser phaser;

    public Foodie(String name, Phaser phaser) {
        super(name);
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            phaseOne();
            phaseTwo();
            phaseThree();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void phaseOne() throws InterruptedException {
        // 新來的不用等其他人
        if (Thread.currentThread().getName().equals("新來的")) {
            return;
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到達戰場~");
        // 到達飯局並加入等待
        phaser.arriveAndAwaitAdvance();
    }

    public void phaseTwo() throws InterruptedException {
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已飢餓難耐了,開始戰斗~");
        TimeUnit.SECONDS.sleep(2);
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃完了~");
        phaser.arriveAndAwaitAdvance();
    }

    public void phaseThree() {
        System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "回家睡覺");
        phaser.arriveAndDeregister();
    }
}

image-20210903004401888

從運行結果我們可以看到,剛開始1,2,3已經開始戰斗了(完成了第一個階段,進入第二個階段),這是后來了個新朋友加入,那飯局的人數變成了4人,然后再逐步完成后面的階段。

同樣我們來看一下Phaser的方法。

構造方法

// 創建一個階段器,沒有注冊方,沒有父級和初始階段
Phaser()
// 創建一個階段器,指定注冊方數量
Phaser(int parties)
// 相當於 Phaser(Phaser parent, 0)
Phaser(Phaser parent)
// 創建一個階段器,通過給定的父級階段器和給定的注冊方數
Phaser(Phaser parent, int parties)

成員方法

// 抵達這個階段,並不等待別人到達
arrive()
// 到達並阻塞等待其他到達
arriveAndAwaitAdvance()
// 到達並注銷
arriveAndDeregister()
// 等待從指定階段進入,如果不在該階段或階段器已終止則立即返回
awaitAdvance(int phase)
//同awaitAdvance(int phase),可被中斷
awaitAdvanceInterruptibly(int phase)
//同awaitAdvance(int phase),可被中斷,可超時
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
// 添加一個新的未到達節點
register()
// 添加指定個數新的未到達節點
bulkRegister(int parties)
// 強制終止階段器
forceTermination()
// 獲取到達數量
getArrivedParties()
// 獲取父階段器
getParent()
// 獲取根階段器
getRoot()
// 返回當前階段數
getPhase()
// 返回已注冊了的節點
getRegisteredParties()
// 返回未到達的節點
getUnarrivedParties()
// 判斷是否終止
isTerminated()
//在即將進行的節點提前執行動作的可覆蓋方法,並控制終止。
onAdvance(int phase, int registeredParties)

總結

我們來簡單總結一下,今天主要介紹JUC包中的線程同步工具。

CountDownLatch:主要用於計數,可完成等待多個線程執行,計數器每次減1,減到0之后釋放等待線程;歸零后無法重置,不可重復利用;

Semaphore:通常用於限制資源訪問,如限流,通過acquire()獲取后加1,release()之后減1,沒有許可時獲取會阻塞;

CyclicBarrier:循環屏障,相比CountDownLatch,await()方法每次加1,加到指定值釋放等待線程;加到指定值之后會重置,可循環利用;

Phaser:支持CountDownLatch和CyclicBarrier的功能,可以做到替換,並且可以動態的添加或減少設定值。


好的,本期內容就到這里,我們下期見,關注我的公眾號【小黑說Java】,更多干貨內容。

8cm二維碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM