本文部分摘自《Java 並發編程的藝術》
volatile 和 synchronize 關鍵字
每個處於運行狀態的線程,如果僅僅是孤立地運行,那么它產生的作用很小,如果多個線程能夠相互配合完成工作,則將帶來更大的價值
Java 支持多個線程同時訪問一個對象或者對象的成員變量,使用 volatile 關鍵字可以保證被修飾變量的可見性,意味着任一線程對該變量的任何修改,其他線程都可以立即感知到
synchronize 關鍵字可以修飾方法或者同步塊,它主要確保多個線程在同一時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。synchronize 關鍵字的實現,本質是對一個對象的監視器(monitor)進行獲取,而這個獲取過程是排他的,也就是同一時刻只能有一個線程獲取到由 synchronize 所保護對象的監視器
任何一個對象都擁有自己的監視器,任意一個線程對 Object 的訪問(Object 由 synchronize 保護)的訪問,首先要獲得 Object 的監視器。如果獲取失敗,線程進入同步隊列,線程狀態變為 BLOCKED。當訪問 Object 的前驅(獲得了鎖的線程)釋放了鎖,則該釋放操作將喚醒阻塞在同步隊列中的線程,使其重新嘗試獲取監視器
等待 - 通知機制
一個線程修改了一個對象的值,另一個線程感知到變化,然后進行相應的操作,前者是生產者,后者是消費者,這種通信方式實現了解耦,更具伸縮性。在 Java 中為了實現類似的功能,我們可以讓消費者線程不斷地循環檢查變量是否符合預期,條件滿足則退出循環,從而完成消費者的工作
while(value != desire) {
Thread.sleep(1000);
}
doSomething();
睡眠一段時間的目的是防止過快的無效嘗試,這種實現方式看似能滿足需求,但存在兩個問題:
-
難以確保及時性
如果睡眠時間太長,就難以及時發現條件已經變化
-
難以降低開銷
如果降低睡眠時間,又會消耗更多的處理器資源
使用 Java 提供了內置的等待 - 通知機制能夠很好地解決上述問題,等待 - 通知的相關方法是任意 Java 對象都具備的
方法名稱 | 描述 |
---|---|
notify() | 通知一個在對象上等待的線程,使其從 wait() 方法返回,返回的前提是該線程獲取到了對象的鎖 |
notifyAll() | 通知所有等待在該對象上的線程 |
wait() | 調用該方法的線程進入 WAITING 狀態,只有等待另外的線程通知或被中斷才返回,調用此方法會釋放對象的鎖 |
wait(long) | 超時等待一段時間,參數時間是毫秒 |
wait(long, int) | 對於超時時間更細粒度的控制,可以達到納秒 |
等待 - 通知機制,是指一個線程 A 調用了對象 O 的 wait() 方法進入等待狀態,而另一個線程 B 調用了對象 O 的 notify() 或者 notifyAll() 方法,線程 A 收到通知后從對象 O 的 wait() 方法返回,進而執行后續操作。上述兩個線程通過對象 O 來完成交互,而對象上的 wait() 和 notify/notifyAll() 的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作
下述例子中,創建兩個線程 WaitThread 和 NotifyThread,前者檢查 flag 值是否為 false,如果符合要求,進行后續操作,否則在 lock 上等待,后者在睡眠一段時間后對 lock 進行通知
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
@Override
public void run() {
// 加鎖,擁有 lock 的 Monitor
synchronized (lock) {
// 繼續 wait,同時釋放 lock 的鎖
while (flag) {
try {
System.out.println(Thread.currentThread() + "flag is true. wait @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 完成工作
System.out.println(Thread.currentThread() + "flag is false. running @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
// 加鎖,擁有 lock 的 Monitor
synchronized (lock) {
// 獲取 lock 的鎖,然后進行通知,通知時不會釋放 lock 的鎖
// 直到當前線程釋放 lock 后,WaitThread 才能從 wait 方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加鎖
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
運行結果如下
上述結果的第三行和第四行順序可能會互換,下面簡單描述一下代碼的執行過程
- WaitThread 線程先啟動,NotifyThread 線程后啟動,由於中間有睡眠一秒的操作,所以 WaitThread 線程首先獲得鎖
- WaitThread 線程循環判斷條件是否滿足,不滿足則調用執行 lock.wait() 方法,釋放 lock 對象上的鎖,進入 lock 對象的等待隊列中,進入等待狀態
- 由於 WaitThread 線程釋放了鎖,所以 NotifyThread 獲得 lock 對象上的鎖,執行 lock.notifyAll() 方法,但並不會立即釋放鎖,只是通知所有等待在 lock 上的線程可以參與競爭鎖了(notify 也同理),並把 flag 設為 false,本段代碼執行結束,NotifyThread 線程釋放鎖,此時 WaitThread 線程和 NotifyThread 線程共同競爭 lock 的鎖
- 無論誰先拿到鎖,WaitThread 線程和 NotifyThread 線程都能順利完成任務
等待 - 通知機制的經典范式
從上節的內容中,我們可以提煉出等待 - 通知機制的經典范式,該范式分為兩部分,分別針對等待方(消費方)和通知方(生產者)
等待方遵循如下原則:
- 獲取對象上的鎖
- 如果條件不滿足,調用對象的 wait() 方法,被通知后仍要檢查條件
- 條件滿足則執行對應的邏輯
偽代碼如下:
synchronized(對象) {
while(條件不滿足) {
對象.wait();
}
對應的處理邏輯
}
通知方遵循如下原則:
- 獲取對象上的鎖
- 改變條件
- 通知所有等待在對象上的線程
偽代碼如下:
synchronized(對象) {
改變條件
對象.notifyAll();
}
Thread.join() 的使用
如果一個線程 A 執行了 thread.join() 語句,其含義是:當前線程 A 等待 thread 線程終止之后才從 thread.join() 返回
下面的例子中,創建 10 個線程,編號為 0 ~ 9,每個線程線程調用前一個線程的 join() 方法,也就是線程 0 結束了,線程 1 才能從 join() 方法中返回,而線程 0 需要等待 main 線程結束
public class Join {
public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每個線程擁有前一個線程的引用,需要等待前一個線程終止,才能從等待中返回
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " terminate.");
}
}
}
輸出如下
從上述輸出可以看到,每個線程等待前驅線程終止后,才從 join() 方法返回,這里涉及了等待 - 通知機制(等待前驅線程結束,接收前驅線程結束通知)
Thread.join() 源碼簡化如下
public final synchronized void join(long millis) throws InterruptedException {
// 省略前面代碼...
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
// 省略后面代碼...
}
假設當前線程是 main 線程,main 線程調用 thread.join() 方法,則 main 線程會獲取 thread 線程對象上的鎖,並循環判斷 thread 線程是否還存活,如果存活,調用 wait 方法釋放鎖並等待,如果 thread 線程已經結束,則 thread 線程會自動調用自身 notifyAll() 方法通知所有等待在自身線程對象上的線程,則 main 線程可以繼續執行后續操作