前言
最近在看一些JUC下的源碼,更加意識到想要學好Java多線程,基礎是關鍵,比如想要學好ReentranLock源碼,就得掌握好AQS源碼,而AQS源碼中又有很多Java多線程經典的一些應用;再比如看了線程池的核心源碼實現,又學到了很多核心實現,其實這些都可以提出來慢慢消化並變成自己的知識點,今天這個Java等待/通知模式其實是Thread.join()實現的關鍵,還有線程池工作線程中線程跟線程之間的通信的核心所在,故在此為了加深理解,做此記錄!
推薦結合學習JUC源碼(3)——Condition等待隊列(源碼分析結合圖文理解)一起學習!
參考資料《Java並發編程藝術》(電子PDF版),有需要的朋友的可以私信或者評論
一、什么是Java線程的等待/通知模式
1、等待/通知模式概述
首先先介紹下官方的一個正式的介紹:
等待/通知機制,是指一個線程A調用了對象object的wait()方法進入等待狀態,而另一個線程B調用了對象object的notify或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而還行后續操作。
而我的理解是(舉例說明):
假設工廠里有兩條流水線,某個工作流程需要這兩個流水線配合完成,這兩個流水線分別是A和B,其中A負責准備各種配件,B負責租裝配件之后產出輸出到工作台。B的工作需要A的配件准備充分,否則就會一直等待A准備好配件,並且A准備好配件后會通過一個開頭通知告訴B我已經准備好了,你那邊不用一直等待了,可以繼續執行任務了。流程A與流程B就是對應的線程A與線程B之間的通信,即可以理解為相互配合,具體也就是“”通知/等待“”機制!
2、需要注意的細節
那么,我們都知道超類Object有wait()方法與notify()/notifyAll()方法,在進行正式代碼舉例之前,應該先加深下對這三個方法的理解與一些細節(有一些細節確實容易被忽略)
- 調用wait()方法,會釋放鎖(這一點我想大部分人都知道),線程狀態由RUNNING->WAITNG,當前線程進入對象等待隊列中;
- 調用notify()/notifyAll()方法不會立馬釋放鎖(這一點我大家人也應該知道,但是什么時候釋放鎖呢?--------請看下一條),notify()方法是將等待隊列中的線程移到同步隊列中,而notifyAll()則是全部移到同步隊列中,被移出的線程狀態WAITING-->BLOCKED;
- 當前調用notify()/notifyAll()的線程釋放鎖了才算釋放鎖,才有機會喚醒wait線程返回(為什么有才有機會返回呢?------繼續看下一條)
- 從wait()返回的前提是必須獲得調用對象鎖,也就是說notify()與notifyAll()釋放鎖之后,wait()進入BLOCKED狀態,如果其他線程有競爭當前鎖的話,wait線程繼續爭取鎖資格(不好理解的話,請看下面的代碼舉例)
- 使用wait()、notify()、notifyAll()方法時需要先調對象加鎖(這可能是最容易忽視的點了,至於為什么,請先看了代碼之后,看本篇博文最后補充:wait()、notify()、notifyAll()加鎖的原因----防止線程即飢餓)
二、代碼舉例
1、結合代碼理解
結合上述的“工廠流程裝配配件並產出的例子”,我們有兩個線程(流水線)WaitThread與NotifyThread、其中WaitThread是被通知的任務,完成主要的工作(組裝配件完成產品),需要時刻判斷標志位(開關);NotifyThread是需要通知的任務,需要對WaitThread進行“監督通知”,兩個配合才能更好完成產品的組裝並輸出。
public class WaitNotify { static Object lock = new Object(); static boolean flag = false; public static void main(String[] args) { new Thread(new WaitThread(), "WaitThread").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new NotifyThread(), "NotifyThread").start(); } /** * 流水線A,完成主要任務 */ static class WaitThread implements Runnable{ @Override public void run() { // 獲取object對象鎖 synchronized (lock){ // 條件不滿足時一直在等,等另外的線程改變該條件,並通知該wait線程 while (!flag){ try { System.out.println(Thread.currentThread() + " is waiting, flag is "+flag); // wait()方法調用就會釋放鎖,當前線程進入等待隊列。 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // TODO 條件已經滿足,不繼續while,完成任務 System.out.println(Thread.currentThread() + " is running, flag is "+flag); } } } /** * 流水線B,對開關進行控制,並通知流水線A */ static class NotifyThread implements Runnable{ @Override public void run() { // 獲取等wait線程同一個object對象鎖 synchronized (lock){ flag = true; // 通知wait線程,我已經改變了條件,你可以繼續返回執行了(返回之后繼續判斷while) // 但是此時通知notify()操作並立即不會釋放鎖,而是要等當前線程釋放鎖 // TODO 我准備好配件了,我需要通知全部的組裝流水線A..... lock.notifyAll(); System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag); } } } }
運行main函數,輸出:
Thread[WaitThread,5,main] is waiting, flag is false Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true Thread[WaitThread,5,main] is running, flag is true
車床流水工作開啟,流水線的開關一開始是關閉的(flag=false),流水線B(NotifyThread)去開啟后,開始自動喚醒流水線A(WaitThread),整個流水線開始工作了......
- Thread[WaitThread,5,main] is waiting, flag is false: 一開始流水線A發現自己沒有配件可租裝,所以等流水線A准備好配件(這樣是不是覺得特別傻,哈哈哈,真正的流水線不會浪費時間等的,而且會有很多條流水線B准備配件的,這里只是舉例說明,望理解!);
- Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true:流水線B准備好了配件,開啟開關(flag=ture),並通知流水線A,讓流水線A開始工作;
- Thread[WaitThread,5,main] is running, flag is true,流水線B收到了通知,再次檢查開關是否開啟了,開啟的話就開始返回繼續完成工作了。
其實結合上述我舉的例子還是很好理解的,下面是大概的一個粗略時序圖:
2、擴展理解----wait()返回的前提是獲得了鎖
上述已經表達了這個注意的細節:從wait()返回的前提是必須獲得調用對象鎖,我們再增加能競爭lock的同步代碼塊(紅字部分)。
public class WaitNotify { static Object lock = new Object(); static boolean flag = false; public static void main(String[] args) { new Thread(new WaitThread(), "WaitThread").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new NotifyThread(), "NotifyThread").start(); } /** * 流水線A,完成主要任務 */ static class WaitThread implements Runnable{ @Override public void run() { // 獲取object對象鎖 synchronized (lock){ // 條件不滿足時一直在等,等另外的線程改變該條件,並通知該wait線程 while (!flag){ try { System.out.println(Thread.currentThread() + " is waiting, flag is "+flag); // wait()方法調用就會釋放鎖,當前線程進入等待隊列。 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // TODO 條件已經滿足,不繼續while,完成任務 System.out.println(Thread.currentThread() + " is running, flag is "+flag); } } } /** * 流水線B,對開關進行控制,並通知流水線A */ static class NotifyThread implements Runnable{ @Override public void run() { // 獲取等wait線程同一個object對象鎖 synchronized (lock){ flag = true; // 通知wait線程,我已經改變了條件,你可以繼續返回執行了(返回之后繼續判斷while) // 但是此時通知notify()操作並立即不會釋放鎖,而是要等當前線程釋放鎖 // TODO 我准備好配件了,我需要通知全部的組裝流水線A..... lock.notifyAll(); System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag); } // 模擬跟流水線B競爭 synchronized (lock){ System.out.println(Thread.currentThread() + " hold lock again"); } } } }
輸出結果:
Thread[WaitThread,5,main] is waiting, flag is false Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true Thread[NotifyThread,5,main] hold lock again Thread[WaitThread,5,main] is running, flag is true
其中第三條跟第四條順序可能會反着來的,這就是因為lock鎖可能被紅字部分的synchronized代碼塊競爭獲取(這樣wait()方法可能獲取不到lock鎖,不會返回),也可能被waitThread獲取從wait()方法返回。
Thread[WaitThread,5,main] is waiting, flag is false Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true Thread[WaitThread,5,main] is running, flag is true Thread[NotifyThread,5,main] hold lock again
三、等待/通知模式的應用
1、Thread.join()中源碼應用
Thread.join()作用:當線程A等待thread線程終止之后才從thread.join()返回, 每個線程終止的前提是前驅線程終止,每個線程等待前驅線程終止后,才從join方法返回,這里涉及了等待/通知機制(等待前驅線程結束,接收前驅線程結束通知)。
Thread.join()源碼中,使用while選好判斷前驅線程是否活着,如果前驅線程還活着就一直wait等待,當然如果超時的話就直接返回。
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } // 這里的while(){wait(millis)} 就是利用等待/通知中的等待模式,只不過加上了超時設置 if (millis == 0) { // while循環,當線程還活着的時候就一直循環等待,直到線程終止 while (isAlive()) { // wait等待 wait(0); } // 條件滿足時返回 } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
2、其它的應用
線程池的本質是使用一個線程安全的工作隊列連接工作者線程和客戶端線程,客戶端線程將任務放入工作隊列后便返回,而工作者線程則不斷地從工作隊列中取出工作並執行。那么,在這里的等待/通知模式的應用就是:
工作隊列中線程job沒有的話也就是工作隊列為空的情況下,等待客戶端放入工作隊列線程任務,並通知工作線程繼續從工作隊列中獲取線程執行。
注:關於線程池的應用源碼這里不做介紹,因為一時也講不完(自己也還沒有完全消化),先簡單介紹下應用到的地方還有概念。
補充:其實數據庫的連接池也類似線程池這種工作流程,也會涉及等待/通知模式。
3、等待/通知范式
介紹了那么多應用,這種模式應該有個統一的范式來套用。對的,必然是有的:
對於等待者(也可以稱之為消費者):
synchronized (對象lock) { while (條件不滿足) { 對象.wait(); } // TODO 處理邏輯 }
對於通知者(也可以稱之為生產者):
synchronized (對象lock) { while (條件滿足) { 改變條件 對象.notify(); } }
注意:實際開發中最好采用的是超時等待/通知模式,在thread.join()源碼方法中完美體現
四、wait()、notify()、notifyAll()使用前需要加鎖的原因----防止線程即飢餓
(1)其實根據wait()注意事項也能明白,wait()是釋放鎖的,那么不加鎖哪來釋放鎖!
(2)wait()與notify()或者notifyAll()必須是搭配一起使用的,否則線程調用object.wait()之后,沒有超時機制,也沒有調用notify()或者notifyAll()喚醒的話,就一直處於WAITING狀態,造成調用wait()的線程一直都是飢餓狀態。
(3)由於第2條的,我們已知:即便我們使用了notify()或者notifyAll()去喚醒線程,但是沒有在適當的時機喚醒(比如調用wait()之前就喚醒了),那么仍然調用wait()線程處於WAITING狀態,所以我們必須保證wait()方法要么不執行,要么就執行完在被喚醒。也就是下列代碼中1那里不能允許插入調用notify/notifyAll,自然而然就增加synchronized關鍵字,保證wait()操作整體執行不被破壞!
synchronized (對象lock) { while (條件不滿足) { // 1 這里如果先執行了notify/notifyAll方法,那么2執行之后,該線程就一直WAITING 對象.wait(); // 2 } // TODO 處理邏輯 }
用圖片展示執行順序就是:
(4)注意synchronized代碼塊中,代碼錯誤或者其它原因線程終止的話,沒有執行到wait()方法的話,是會自動釋放鎖的,不必擔心會死鎖。
-------------------------------------------2020/12/1 補充--------------------------------------------------
這里介紹的等待/通知模式實際上就是Object的監視器方法(wait()、notify()等方法)配合Synchronized實現,而除此之外還有:
- Condition配合Lock實現的等待/通知模式(學習JUC源碼(3)——Condition等待隊列(源碼分析結合圖文理解))
- LockSupport阻塞park與喚醒unpark方式(AQS源碼中用到的多)