信號量同步是指在不同線程之間,通過傳遞同步信號量來協調線程執行的先后次序。CountDownLatch是基於時間維度的Semaphore則是基於信號維度的。
1:基於執行時間的同步類CountDownLatch
例如現有3台服務器,需編寫一個獲取各個服務器狀態的接口,准備開三個子線程每個線程獲取一台服務器狀態后統一返回三台服務器狀態。主線程內定義計數器為3的CountDownLatch實例,各個子線程添加CountDownLatch實例引用,子線程執行完后對CountDownLatch進行countDown。主線程調用CountDownLatch實例的await方法等待所有子線程執行完后返回結果。不考慮異常情況的代碼示例如下。
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(3);
Thread getServer1Status = new GetDataStatusThread("服務器1", count);
Thread getServer2Status = new GetDataStatusThread("服務器2", count);
Thread getServer3Status = new GetDataStatusThread("服務器3", count);
getServer1Status.start();
getServer2Status.start();
getServer3Status.start();
//await 使當前線程等待直至CountDownLatch的計數為0,除非線程中斷
//count.await();
//await(long timeout, TimeUnit unit) 使當前線程等待直至CountDownLatch的計數為0,除非線程中斷或經過指定的等候時間
count.await(10,TimeUnit.SECONDS);
System.out.println("所有服務器狀態獲取完成");
}
}
class GetDataStatusThread extends Thread {
private final CountDownLatch count;
public GetDataStatusThread(String threadName, CountDownLatch count) {
this.setName(threadName);
this.count = count;
}
@Override
public void run() {
System.out.println("獲取" + this.getName() + "狀態成功");
//遞減CountDownLatch的計數,如果計數達到零,則釋放所有等待的線程
count.countDown();
}
}
注意:CountDownLatch的await方法建議使用帶超時間的。不使用帶超時時間await的線程若計數器初始值設置的值達不到countDown(遞減計數器計數)次數則該線程會一直等待至計數為0,除非線程中斷
(例如子線程執行過程中出現異常執行不到countDown方法,順便補充若子線程會拋出異常且該異常沒有被主線程捕獲到可通過線程方法setUncaughtExceptionHandler()捕獲)。
2:基於空閑信號的同步類Semaphore
Semaphore可看作一個管理“許可證”的池,創建Semaphore實例時指定許可證數量,所有包含Semaphore實例引用的線程運行時通過acquire方法獲取許可證,運行完成后通過release方法釋放許可證。獲取不到許可證等線程直到獲取到空閑許可證才會執行。如下代碼所示:某景區只有兩個買票窗口(許可池大小)所有游客排隊進行買票,准備買票的游客通過acquire占據當前窗口買票完成准備離開通過release方法表示當前窗口已空閑。
public class Window { public static void main(String[] args) { Semaphore semaphore = new Semaphore(2); for (int i = 1; i <= 6; i++) { new ByTicketThread("客戶" + i, semaphore).start(); } } } class ByTicketThread extends Thread { private final Semaphore semaphore; public ByTicketThread(String threadName, Semaphore semaphore) { this.setName(threadName); this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println(this.getName() + "正在買票。"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println(this.getName() + "買票完成。"); } } }