Semaphore
信號量,我們應該都在操作系統課程里學過,它是解決進程間通信和同步的常用工具,也是一種常見的模型。信號量是一個確定的二元組(s, q), s是正整型變量,q是初始狀態為空的隊列,s代表並發狀態,操作系統利用信號量的狀態s管理並發進程。如果s<=0,進程阻塞,如果s>0,進程繼續執行。為了實現對s值的修改,操作系統提供了P、V操作原語,P的操作包括:s值減1,若s<=0,則進程阻塞,並將該進程插入到等待隊列q中,V操作:s值加1,若s<=0,從等待隊列中移出一個進程,解除其阻塞狀態。
Java提供了經典信號量(Semaphore)的實現,它通常用於控制線程數來達到限制共享資源訪問的目的。
下面用信號量實現生產者消費者模型來演示下用法:
public class SemaphoreDemo { private static volatile int count = 0 ; private static final Semaphore full = new Semaphore(5); private static final Semaphore empty = new Semaphore(0); private static final Semaphore mutex = new Semaphore(1); public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(10); threadPool.execute(() -> { while(true){ try { full.acquire(); mutex.acquire(); count++; System.out.println("生產了1個資源, 目前還有" + count + "個資源"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); empty.release(); } } }); threadPool.execute(() -> { while(true){ try { empty.acquire(); mutex.acquire(); count--; System.out.println("消費了1個資源, 目前還有" + count + "個資源"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); full.release(); } } }); } }
full信號量限制了最多生產的資源數量,empty信號量限制了資源為空無法消費,mutex信號量相當於互斥鎖,避免count讀寫不一致。總的來說,可以看出Semaphore就是個計數器,其基本邏輯基於acquire/release,acquire獲取資源若獲取不到則阻塞,release釋放資源並喚醒阻塞的線程,信號量構造方法中還有個公平的參數這里沒有演示。
CountDownLatch
作用是允許一個或多個線程等待直到其他線程中的某個操作完成。使用給定數值初始化CountDownLatch,調用countDown方法會減少計數器的值,調用await方法會阻塞直到當前計數達到零,之后釋放所有等待的線程,但是注意計數器到0的時候不會自動重置(這也是和CyclicBarrier的區別之一)。
如果現在有這樣一個場景,一批人叫了很多出租車,但是一車只能坐5個人,坐完后等下一輛車繼續,用CountDownLatch如何實現?
代碼如下:
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(6); ExecutorService executor = Executors.newFixedThreadPool(10); for(int i = 0; i < 5; i++){ executor.execute(() -> { System.out.println("First batch has executed!"); latch.countDown(); }); } for(int i = 0; i < 5; i++){ executor.execute(() -> { try { latch.await(); System.out.println("Second batch has executed!"); } catch (InterruptedException e) { e.printStackTrace(); } }); } while(latch.getCount() != 1){ Thread.sleep(1000); } latch.countDown(); System.out.println("Wait for first batch finish!"); } }
執行結果如下:
第一批執行完后,由於第二批代碼中調用了await方法所以阻塞,直到coutdownlatch的計數器減到0為止。
CyclicBarrier
一種同步輔助工具,允許一組線程全部等待到達某個屏障。用這個工具類實現上述場景:
public class CyclicBarrierDemo { public static void main(String[] args) { //final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> new Thread(() -> System.out.println("Wait for batch finish!")).start()); final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("Wait for batch finish!")); for(int i = 0; i < 10; i++){ new Thread(() -> { try { System.out.println("The batch has executed!"); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
執行結果如下:
CyclicBarrier支持一個回調函數,每當一輪線程結束后,下一輪線程開始前,這個回調函數都會被調用一次,而且這個回調函數是執行在一個回合里最后執行await()的線程上。
注意上述代碼中注釋了一行,注釋的寫法意味着新開一個線程執行回調函數,那么回調函數會異步執行。
CountDownLatch和CyclicBarrier的區別:
CountDownLatch是不可以重置的,所以無法重用;而CyclicBarrier則沒有這種限制,可以重用。 CountDownLatch的基本操作組合是countDown/await。調用await的線程等待countDown直到足夠的次數,不管是在一個線程還是多個線程里countDown。
CyclicBarrier的基本操作是await,當所有的線程都調用了await,才會繼續進行任務,並自動進行重置。注意,正常情況下,CyclicBarrier的重置都是自動發生的,如果我們調用reset方法,但還有線程在等待,就會導致等待線程被打擾,拋出BrokenBarrierException異常。