Semaphore、CountDownLatch和CyclicBarrier


這三者都是java並發包的工具類,提供了比synchronized更加高級的各種同步結構,可以實現更加豐富的多線程操作。
 

Semaphore

信號量,我們應該都在操作系統課程里學過,它是解決進程間通信和同步的常用工具,也是一種常見的模型。信號量是一個確定的二元組(s, q), s是正整型變量,q是初始狀態為空的隊列,s代表並發狀態,操作系統利用信號量的狀態s管理並發進程。如果s<=0,進程阻塞,如果s>0,進程繼續執行。為了實現對s值的修改,操作系統提供了P、V操作原語,P的操作包括:s值減1,若s<=0,則進程阻塞,並將該進程插入到等待隊列q中,V操作:s值加1,若s<=0,從等待隊列中移出一個進程,解除其阻塞狀態。

Java提供了經典信號量(Semaphore)的實現,它通常用於控制線程數來達到限制共享資源訪問的目的。

下面用信號量實現生產者消費者模型來演示下用法:

public class SemaphoreDemo {

    private static volatile int count = 0 ;
    private static final Semaphore full = new Semaphore(5);
    private static final Semaphore empty = new Semaphore(0);
    private static final Semaphore mutex = new Semaphore(1);
    
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        threadPool.execute(() -> {
            while(true){
                try {
                    full.acquire();
                    mutex.acquire();
                    count++;
                    System.out.println("生產了1個資源, 目前還有" + count + "個資源");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    empty.release();
                }
            }
        });

        threadPool.execute(() -> {
            while(true){
                try {
                    empty.acquire();
                    mutex.acquire();
                    count--;
                    System.out.println("消費了1個資源, 目前還有" + count + "個資源");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    full.release();
                }
            }
        });
    }
}

full信號量限制了最多生產的資源數量,empty信號量限制了資源為空無法消費,mutex信號量相當於互斥鎖,避免count讀寫不一致。總的來說,可以看出Semaphore就是個計數器,其基本邏輯基於acquire/release,acquire獲取資源若獲取不到則阻塞,release釋放資源並喚醒阻塞的線程,信號量構造方法中還有個公平的參數這里沒有演示。

 

CountDownLatch

作用是允許一個或多個線程等待直到其他線程中的某個操作完成。使用給定數值初始化CountDownLatch,調用countDown方法會減少計數器的值,調用await方法會阻塞直到當前計數達到零,之后釋放所有等待的線程,但是注意計數器到0的時候不會自動重置(這也是和CyclicBarrier的區別之一)。

如果現在有這樣一個場景,一批人叫了很多出租車,但是一車只能坐5個人,坐完后等下一輛車繼續,用CountDownLatch如何實現?

代碼如下:

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6);
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for(int i = 0; i < 5; i++){
            executor.execute(() -> {
                System.out.println("First batch has executed!");
                latch.countDown();
            });
        }

        for(int i = 0; i < 5; i++){
            executor.execute(() -> {
                try {
                    latch.await();
                    System.out.println("Second batch has executed!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while(latch.getCount() != 1){
            Thread.sleep(1000);
        }
        latch.countDown();
        System.out.println("Wait for first batch finish!");

    }
}

執行結果如下:

第一批執行完后,由於第二批代碼中調用了await方法所以阻塞,直到coutdownlatch的計數器減到0為止。

 

CyclicBarrier

一種同步輔助工具,允許一組線程全部等待到達某個屏障。用這個工具類實現上述場景:

public class CyclicBarrierDemo {

    public static void main(String[] args) {
       //final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> new Thread(() -> System.out.println("Wait for batch finish!")).start());
       final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("Wait for batch finish!"));

        for(int i = 0; i < 10; i++){
            new Thread(() -> {
                try {
                    System.out.println("The batch has executed!");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

執行結果如下:

 

CyclicBarrier支持一個回調函數,每當一輪線程結束后,下一輪線程開始前,這個回調函數都會被調用一次,而且這個回調函數是執行在一個回合里最后執行await()的線程上。

注意上述代碼中注釋了一行,注釋的寫法意味着新開一個線程執行回調函數,那么回調函數會異步執行。

 

CountDownLatch和CyclicBarrier的區別

CountDownLatch是不可以重置的,所以無法重用;而CyclicBarrier則沒有這種限制,可以重用。 CountDownLatch的基本操作組合是countDown/await。調用await的線程等待countDown直到足夠的次數,不管是在一個線程還是多個線程里countDown。

CyclicBarrier的基本操作是await,當所有的線程都調用了await,才會繼續進行任務,並自動進行重置。注意,正常情況下,CyclicBarrier的重置都是自動發生的,如果我們調用reset方法,但還有線程在等待,就會導致等待線程被打擾,拋出BrokenBarrierException異常。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM