Semaphore
信號量,我們應該都在操作系統課程里學過,它是解決進程間通信和同步的常用工具,也是一種常見的模型。信號量是一個確定的二元組(s, q), s是正整型變量,q是初始狀態為空的隊列,s代表並發狀態,操作系統利用信號量的狀態s管理並發進程。如果s<=0,進程阻塞,如果s>0,進程繼續執行。為了實現對s值的修改,操作系統提供了P、V操作原語,P的操作包括:s值減1,若s<=0,則進程阻塞,並將該進程插入到等待隊列q中,V操作:s值加1,若s<=0,從等待隊列中移出一個進程,解除其阻塞狀態。
Java提供了經典信號量(Semaphore)的實現,它通常用於控制線程數來達到限制共享資源訪問的目的。
下面用信號量實現生產者消費者模型來演示下用法:
public class SemaphoreDemo {
private static volatile int count = 0 ;
private static final Semaphore full = new Semaphore(5);
private static final Semaphore empty = new Semaphore(0);
private static final Semaphore mutex = new Semaphore(1);
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
threadPool.execute(() -> {
while(true){
try {
full.acquire();
mutex.acquire();
count++;
System.out.println("生產了1個資源, 目前還有" + count + "個資源");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
empty.release();
}
}
});
threadPool.execute(() -> {
while(true){
try {
empty.acquire();
mutex.acquire();
count--;
System.out.println("消費了1個資源, 目前還有" + count + "個資源");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
full.release();
}
}
});
}
}
full信號量限制了最多生產的資源數量,empty信號量限制了資源為空無法消費,mutex信號量相當於互斥鎖,避免count讀寫不一致。總的來說,可以看出Semaphore就是個計數器,其基本邏輯基於acquire/release,acquire獲取資源若獲取不到則阻塞,release釋放資源並喚醒阻塞的線程,信號量構造方法中還有個公平的參數這里沒有演示。
CountDownLatch
作用是允許一個或多個線程等待直到其他線程中的某個操作完成。使用給定數值初始化CountDownLatch,調用countDown方法會減少計數器的值,調用await方法會阻塞直到當前計數達到零,之后釋放所有等待的線程,但是注意計數器到0的時候不會自動重置(這也是和CyclicBarrier的區別之一)。
如果現在有這樣一個場景,一批人叫了很多出租車,但是一車只能坐5個人,坐完后等下一輛車繼續,用CountDownLatch如何實現?
代碼如下:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
ExecutorService executor = Executors.newFixedThreadPool(10);
for(int i = 0; i < 5; i++){
executor.execute(() -> {
System.out.println("First batch has executed!");
latch.countDown();
});
}
for(int i = 0; i < 5; i++){
executor.execute(() -> {
try {
latch.await();
System.out.println("Second batch has executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
while(latch.getCount() != 1){
Thread.sleep(1000);
}
latch.countDown();
System.out.println("Wait for first batch finish!");
}
}
執行結果如下:

第一批執行完后,由於第二批代碼中調用了await方法所以阻塞,直到coutdownlatch的計數器減到0為止。
CyclicBarrier
一種同步輔助工具,允許一組線程全部等待到達某個屏障。用這個工具類實現上述場景:
public class CyclicBarrierDemo {
public static void main(String[] args) {
//final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> new Thread(() -> System.out.println("Wait for batch finish!")).start());
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("Wait for batch finish!"));
for(int i = 0; i < 10; i++){
new Thread(() -> {
try {
System.out.println("The batch has executed!");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
執行結果如下:
CyclicBarrier支持一個回調函數,每當一輪線程結束后,下一輪線程開始前,這個回調函數都會被調用一次,而且這個回調函數是執行在一個回合里最后執行await()的線程上。
注意上述代碼中注釋了一行,注釋的寫法意味着新開一個線程執行回調函數,那么回調函數會異步執行。
CountDownLatch和CyclicBarrier的區別:
CountDownLatch是不可以重置的,所以無法重用;而CyclicBarrier則沒有這種限制,可以重用。 CountDownLatch的基本操作組合是countDown/await。調用await的線程等待countDown直到足夠的次數,不管是在一個線程還是多個線程里countDown。
CyclicBarrier的基本操作是await,當所有的線程都調用了await,才會繼續進行任務,並自動進行重置。注意,正常情況下,CyclicBarrier的重置都是自動發生的,如果我們調用reset方法,但還有線程在等待,就會導致等待線程被打擾,拋出BrokenBarrierException異常。
