3 Java中wait/notify
3-1 原理
如上圖所示:
step1:線程1之前獲得過Monitor,在執行臨界區代碼時發現部分條件不滿足,無法執行完代碼,因此主動調用wait讓出坑位,自己進入WaitSet ,讓其他阻塞的線程能夠獲得Monitor,避免浪費資源。
step2: 線程1主動放棄Monitor,會喚醒BLOCKED的線程去獲得Monitor,圖中線程2獲得了Monitor。
- 如果條件滿足,此時線程2可以主動調用 notify 或 notifyAll 去喚醒WAITING的線程,喚醒后並不意味者立刻獲得鎖,仍需進入EntryList 重新競爭。
3-2 wait與notify API的使用
obj.wait() 讓進入 object 監視器的線程到 waitSet 等待
obj.wait(n) 讓進入 object 監視器的線程到 waitSet 等待一定時間然后喚醒
obj.notify() 在 object 上正在 waitSet 等待的線程中隨機挑一個喚醒
obj.notifyAll() 讓 object 上正在 waitSet 等待的線程全部喚醒
注意:
- 都是線程之間進行協作的手段,都屬於 Object 對象的方法。(Java中所有類都是Object的子類)不是Thread類專屬方法。
- 必須獲得此對象的鎖,才能調用這幾個方法。否則會產生異常(如下所示)。
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args) throws InterruptedException {
String tmp = "";
tmp.wait();
}
}
運行結果:
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at chapter3.Test6.main(Test6.java:8)
使用wait與notify的簡單的實例
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args) throws InterruptedException {
String tmp = "";
new Thread(()->{
synchronized (tmp){
log.warn("this is thread 1");
try {
tmp.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("thread 1 run after waiting.");
}
},"t1").start();
new Thread(()->{
synchronized (tmp){
log.warn("this is thread 2");
try {
tmp.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("thread 2 run after waiting.");
}
},"t2").start();
Thread.sleep(5000); // 主線程休眠一會
// 必須成為monitor的ownerh后(獲得鎖),才有資格wait,notify
log.warn("start notify threads.");
synchronized (tmp){
tmp.notify(); // 主線程任意喚醒waitset中的一個線程
// tmp.notifyAll(); // 主線程任意喚醒waitset中的所有線程
}
}
}
運行結果:
[t1] WARN c.Test6 - this is thread 1
[t2] WARN c.Test6 - this is thread 2
[main] WARN c.Test6 - start notify threads.
[t1] WARN c.Test6 - thread 1 run after waiting.
3-3 wait與sleep的區別(等待與睡眠的區別)
sleep(n)與wait(n):
- 不同點
- sleep是Thread類的靜態方法,而wait是所有對象的方法。
- wait的使用必須配合synchronized,並且會釋放對象鎖,而在synchroized代碼塊中sleep,鎖不會被釋放。這一點是可以用於區分它們的使用場景。
- 共同點:
- 使用后,線程都會進入JAVA API層面的TIME WAITING狀態
3-4 wait與notify的使用模板
虛假喚醒:當多個線程由於執行條件不滿足使用wait進入 WAITING狀態。此時只有部分線程的條件滿足卻notify所有線程。對於那些執行條件仍未滿足的線程來說就是虛假喚醒。
- 因此在代碼編寫時,當線程被虛假喚醒后仍然需要判斷條件是否滿足,不滿足則繼續wait。
避免虛假喚醒引發問題的模板如下:
synchronized(lock) {
while(條件不成立) { //while語句保證線程執行的條件必須滿足,避免條件不滿足的情況下執行程序
lock.wait();
}
// 干活
}
//另一個線程
synchronized(lock) {
lock.notifyAll();
}
4 同步模式-保護性暫停(wait與notify的應用)
4-1 基礎
定義:保護性暫停(Guarded Suspension),針對一個線程等待另外一個線程的場景。
知識點:
- 傳遞結果的2個線程之間都關聯同一個GuardedObject
- JDK 中,join 的實現、Future 的實現,采用的就是此模式
- 保護性暫停是同步模式
保護性暫停的一個實例
package chapter3;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
class GuardedObject{
private Object response = null;
public Object get(){
synchronized (this){
while(response == null){
try{
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return response;
}
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}
@Slf4j(topic = "c.Test7")
class Test7{
public static void main(String[] args) {
log.warn("start the main thread!");
GuardedObject guard = new GuardedObject();
// 定義一個線程t1去獲取另外一個線程t2網頁下載的結果
new Thread(()->{
log.warn("等待網頁下載");
List<String> downres = (List<String>) guard.get();
log.warn("已經獲得下載的網頁");
log.warn("網頁的大小{}",downres.size());
},"t1").start();
new Thread(()->{
List<String> tmp;
try {
tmp = Downloader.download();
guard.complete(tmp);
log.warn("下載完成");
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class Downloader{
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
){
String tmp;
while((tmp = reader.readLine()) != null){
lines.add(tmp);
}
};
return lines;
}
}
運行結果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - 等待網頁下載
[t2] WARN c.Test7 - 下載完成
[t1] WARN c.Test7 - 已經獲得下載的網頁
[t1] WARN c.Test7 - 網頁的大小3
總結:上面的代碼中線程t1等待線程t2准備好對象,並通過GuardedObject獲得對象。
優勢:
- 在使用join實現2個線程的同步時,必須把傳遞的對象設為公共變量。
有時間限制的保護性暫停的實例
package chapter3;
import javafx.beans.binding.ObjectExpression;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
// 定義了一個guard object進行有時限的等待
class GuardedObjectTime{
private Object response = null;
// timeout:線程最多等待的時間
public Object get(long timeout){
synchronized (this){
long begin = System.currentTimeMillis();
long passedTime = 0;
while(response == null){
// 之所以單獨弄一個waittime是因為需要考慮虛假喚醒的情況
// 虛假喚醒后的線程之前等待的時間也要從總的等待時間減去
long waittime = timeout - passedTime;
if(waittime <= 0)
break;
try{
this.wait(waittime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis()-begin;
}
}
return response;
}
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}
@Slf4j(topic = "c.Test7")
class Test8{
public static void main(String[] args) {
log.warn("start the main thread!");
GuardedObjectTime guard = new GuardedObjectTime();
// 定義一個線程t1去獲取另外一個線程t2的結果
new Thread(()->{
Object response = guard.get(2000); // 獲取結果最多等待2s
log.warn("The result is {}",response);
},"t1").start();
new Thread(()->{
Object tmp = new Object();
try {
// Thread.sleep(3000); // 線程等待時間超過規定時間,無法得到任何結果
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
guard.complete(tmp);
},"t2").start();
}
}
線程t2睡眠時間為3000ms的運行結果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is null
線程t2睡眠時間為1000ms的運行結果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is java.lang.Object@54ed57cd
4-2 Java的join的原理(底層使用也是wait)
join方法的源代碼如下:
/**
* Waits for this thread to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
// 可以看到沒有參數的join調用的是帶時間的join
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
// 情況1:等待時間小於0
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 情況2:等待時間等於0
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
// ============================================================================
// 情況3:等待時間>0, 可以看到這段代碼的邏輯 *有時間限制的保護性暫停的實例*中get的方法邏輯相一致
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
// =============================================================================
}
}
總結:join的源碼的實現也是利用了保護性暫停的設計模式。
4-3 同步模式之保護性暫停擴展(多個線程使用多個GuardObject的解耦方法)
場景:考慮郵遞員,收件人,信箱三種線程對象:
- 多名郵遞員(產生結果的線程):每個郵遞員發送郵件
- 信箱(多個GuardObject)
- 多名收件人(接受結果線程)
動機:多個類之間使用GuardedObject對象作為參數傳遞不是很方便,因此需要設計一個解耦的中間類,能夠做到以下幾點:
- 支持多個任務的管理
- 解耦“結果等待者”和“結果生產者”。
實例:
基本思想:
step1: 為guardedObject添加id。
step2: 定義管理類管理多個guardedObject,有以下功能:
- 創建guardedObject實例並產生唯一id並放入容器管理
- 根據id返回對應的guardedObject對象
step3:業務類調用管理類實現線程之間的通信
- 生產者利用管理類創建其對應的guardedObject實例用於接收信息
- 消費者利用管理類獲取對應的guardedObject實例用於發送信息。
注意點:
- 下面代碼中 Mailboxes 是通用類,用於管理多個guardedObject對象,可以復用。
- 下面代碼中Postman與People是業務相關的類。
package chapter4;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import chapter2.Sleeper;
import lombok.extern.slf4j.Slf4j;
class GuardedObject {
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
private Object response;
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passedTime = 0;
while (response == null) {
long waitTime = timeout - passedTime;
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虛假喚醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
public void complete(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 產生唯一 id
private static synchronized int generateId() {
return id++;
}
// 由於HashTABLE是線程安全的,所以下面2個方法不需要去synchroized
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.warn("開始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.warn("收到信 id:{}, 內容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.PostMan")
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.warn("送信 id:{}, 內容:{}", id, mail);
guardedObject.complete(mail);
}
}
@Slf4j(topic = "c.test1")
public class test1 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
System.out.println(Mailboxes.getIds());
Thread.sleep(1000);
System.out.println(Mailboxes.getIds());
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "內容" + id).start();
}
}
}
運行結果
[]
[Thread-2] WARN c.People - 開始收信 id:3
[Thread-0] WARN c.People - 開始收信 id:2
[Thread-1] WARN c.People - 開始收信 id:1
[3, 2, 1]
[Thread-3] WARN c.PostMan - 送信 id:3, 內容:內容3
[Thread-4] WARN c.PostMan - 送信 id:2, 內容:內容2
[Thread-2] WARN c.People - 收到信 id:3, 內容:內容3
[Thread-0] WARN c.People - 收到信 id:2, 內容:內容2
[Thread-5] WARN c.PostMan - 送信 id:1, 內容:內容1
[Thread-1] WARN c.People - 收到信 id:1, 內容:內容1
5 異步模式-生產者與消費者模式(wait/notify應用)
5-1 概述
生產者與消費者模式特點:
- 該模式使用消息隊列來平衡生產和消費的線程資源
- 消息隊列是有容量限制的,滿時不會再加入數據,空時不會再消耗數據
應用場景:JDK 中各種阻塞隊列,采用的就是這種模式
問題:生產者與消費者模式與保護性暫停模式的區別?
- 保護性暫停模式必須是一對一的,線程發送的信息可以被其對應的接受線程立刻接受到,所以是同步模式
- 生產者與消費者模式沒有一對一的限制,此外需要消息對列傳遞信息,信息的傳遞存在延遲,所以是異步模式。
5-2 代碼實踐
package chapter4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
// 獲取網頁下載的內容,並存入到List
class Downloader{
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
){
String tmp;
while((tmp = reader.readLine()) != null){
lines.add(tmp);
}
};
return lines;
}
}
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
// 使用雙向鏈表實現消息隊列,用於存儲message instance
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
// 消費者線程判斷消息隊列是否為空,為空則等待
while (queue.isEmpty()) {
log.warn("沒貨了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 從隊列的頭部獲取message
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
// 生產者線程判斷隊列是否已滿,為空則等待
while (queue.size() == capacity) {
log.warn("庫存已達上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
@Slf4j(topic = "c.test2")
public class test2 {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(2);
// 4 個生產者線程, 下載任務
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
try{
List<String> response = Downloader.download();
log.warn("try put message({})", id);
messageQueue.put(new Message(id, response));
} catch (IOException e) {
e.printStackTrace();
}
}, "生產者" + i).start();
}
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.warn("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消費者").start();
}
}
總結:生產者與消費者模式本質上依舊是wait/notify組合的應用。
- 上述代碼中采用雙端隊列LinkedList充當消息隊列,並限制了消息隊列的大小。
- 生產者
- 消息隊列未滿的情況下可以放入東西,通過notify通知消費者“東西放好了,繼續拿”
- 消息隊列滿的情況下wait
- 消費者
- 消息隊列未空的情況下可以拿走東西,通過notify通知生產者“東西拿走了,繼續放”
- 消費隊列為空,則wait
參考資料
20210228