03 Java的wait/notify及其應用


3 Java中wait/notify

3-1 原理

如上圖所示:

step1:線程1之前獲得過Monitor,在執行臨界區代碼時發現部分條件不滿足,無法執行完代碼,因此主動調用wait讓出坑位,自己進入WaitSet ,讓其他阻塞的線程能夠獲得Monitor,避免浪費資源。

step2: 線程1主動放棄Monitor,會喚醒BLOCKED的線程去獲得Monitor,圖中線程2獲得了Monitor。

  • 如果條件滿足,此時線程2可以主動調用 notify 或 notifyAll 去喚醒WAITING的線程,喚醒后並不意味者立刻獲得鎖,仍需進入EntryList 重新競爭

3-2 wait與notify API的使用

obj.wait()      讓進入 object 監視器的線程到 waitSet 等待
obj.wait(n)     讓進入 object 監視器的線程到 waitSet 等待一定時間然后喚醒
obj.notify()    在 object 上正在 waitSet 等待的線程中隨機挑一個喚醒
obj.notifyAll() 讓 object 上正在 waitSet 等待的線程全部喚醒

注意:

  • 都是線程之間進行協作的手段,都屬於 Object 對象的方法。(Java中所有類都是Object的子類)不是Thread類專屬方法。
  • 必須獲得此對象的鎖,才能調用這幾個方法。否則會產生異常(如下所示)。
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
    public static void main(String[] args) throws InterruptedException {
        String tmp = "";
        tmp.wait();
    }
}

運行結果:

Exception in thread "main" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at chapter3.Test6.main(Test6.java:8)
使用wait與notify的簡單的實例
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
    public static void main(String[] args) throws InterruptedException {
        String tmp = "";
        new Thread(()->{
            synchronized (tmp){
                log.warn("this is thread 1");
                try {
                    tmp.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.warn("thread 1 run after waiting.");
            }
        },"t1").start();
        
        new Thread(()->{
            synchronized (tmp){
                log.warn("this is thread 2");
                try {
                    tmp.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.warn("thread 2 run after waiting.");
            }
        },"t2").start();
        Thread.sleep(5000);            // 主線程休眠一會
        // 必須成為monitor的ownerh后(獲得鎖),才有資格wait,notify
        log.warn("start notify threads.");
        synchronized (tmp){
            tmp.notify();             // 主線程任意喚醒waitset中的一個線程
//            tmp.notifyAll();        // 主線程任意喚醒waitset中的所有線程
        }
    }
}

運行結果:

[t1] WARN c.Test6 - this is thread 1
[t2] WARN c.Test6 - this is thread 2
[main] WARN c.Test6 - start notify threads.
[t1] WARN c.Test6 - thread 1 run after waiting.

3-3 wait與sleep的區別(等待與睡眠的區別)

sleep(n)與wait(n):

  • 不同點
    • sleep是Thread類的靜態方法,而wait是所有對象的方法。
    • wait的使用必須配合synchronized,並且會釋放對象鎖,而在synchroized代碼塊中sleep,鎖不會被釋放。這一點是可以用於區分它們的使用場景。
  • 共同點:
    • 使用后,線程都會進入JAVA API層面的TIME WAITING狀態

3-4 wait與notify的使用模板

虛假喚醒:當多個線程由於執行條件不滿足使用wait進入 WAITING狀態。此時只有部分線程的條件滿足卻notify所有線程。對於那些執行條件仍未滿足的線程來說就是虛假喚醒。

  • 因此在代碼編寫時,當線程被虛假喚醒后仍然需要判斷條件是否滿足,不滿足則繼續wait。

避免虛假喚醒引發問題的模板如下:

synchronized(lock) {
	while(條件不成立) {       //while語句保證線程執行的條件必須滿足,避免條件不滿足的情況下執行程序
    	lock.wait();
    }
    // 干活
}

//另一個線程
synchronized(lock) {
    lock.notifyAll();
}

4 同步模式-保護性暫停(wait與notify的應用)

4-1 基礎

定義:保護性暫停(Guarded Suspension),針對一個線程等待另外一個線程的場景

知識點

  • 傳遞結果的2個線程之間都關聯同一個GuardedObject
  • JDK 中,join 的實現、Future 的實現,采用的就是此模式
  • 保護性暫停是同步模式

保護性暫停的一個實例

package chapter3;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class GuardedObject{
    private Object response = null;
    public Object get(){
        synchronized (this){
            while(response == null){
                try{
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return response;
    }

    public void complete(Object response){
        synchronized (this){
            this.response = response;
            this.notifyAll();
        }
    }

}

@Slf4j(topic = "c.Test7")
class Test7{
    public static void main(String[] args) {
        log.warn("start the main thread!");
        GuardedObject guard = new GuardedObject();

        // 定義一個線程t1去獲取另外一個線程t2網頁下載的結果
        new Thread(()->{
            log.warn("等待網頁下載");
            List<String> downres = (List<String>) guard.get();
            log.warn("已經獲得下載的網頁");
            log.warn("網頁的大小{}",downres.size());
        },"t1").start();

        new Thread(()->{
            List<String> tmp;
            try {
                tmp = Downloader.download();
                guard.complete(tmp);
                log.warn("下載完成");
            } catch (IOException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}

class Downloader{
    public  static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
        ){
                String tmp;
                while((tmp = reader.readLine()) != null){
                    lines.add(tmp);
                }

            };
        return lines;
    }
}

運行結果

[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - 等待網頁下載
[t2] WARN c.Test7 - 下載完成
[t1] WARN c.Test7 - 已經獲得下載的網頁
[t1] WARN c.Test7 - 網頁的大小3

總結:上面的代碼中線程t1等待線程t2准備好對象,並通過GuardedObject獲得對象。

優勢:

  • 在使用join實現2個線程的同步時,必須把傳遞的對象設為公共變量。

有時間限制的保護性暫停的實例

package chapter3;

import javafx.beans.binding.ObjectExpression;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// 定義了一個guard  object進行有時限的等待
class GuardedObjectTime{
    private Object response = null;
    // timeout:線程最多等待的時間
    public Object get(long timeout){
        synchronized (this){
            long begin = System.currentTimeMillis();
            long passedTime = 0;
            while(response == null){
                // 之所以單獨弄一個waittime是因為需要考慮虛假喚醒的情況
                // 虛假喚醒后的線程之前等待的時間也要從總的等待時間減去
                long waittime = timeout - passedTime;
                if(waittime <= 0)
                    break;
                try{
                    this.wait(waittime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                passedTime = System.currentTimeMillis()-begin;
            }
        }
        return response;
    }

    public void complete(Object response){
        synchronized (this){
            this.response = response;
            this.notifyAll();
        }
    }

}

@Slf4j(topic = "c.Test7")
class Test8{
    public static void main(String[] args) {
        log.warn("start the main thread!");
        GuardedObjectTime guard = new GuardedObjectTime();

        // 定義一個線程t1去獲取另外一個線程t2的結果
        new Thread(()->{
            Object response = guard.get(2000);      // 獲取結果最多等待2s
            log.warn("The result is {}",response);
        },"t1").start();

        new Thread(()->{
            Object tmp = new Object();
            try {
//                Thread.sleep(3000);            // 線程等待時間超過規定時間,無法得到任何結果
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            guard.complete(tmp);
        },"t2").start();
    }
}

線程t2睡眠時間為3000ms的運行結果:

[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is null

線程t2睡眠時間為1000ms的運行結果:

[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is java.lang.Object@54ed57cd

4-2 Java的join的原理(底層使用也是wait)

join方法的源代碼如下

    /**
     * Waits for this thread to die.
     *
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     *
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
     // 可以看到沒有參數的join調用的是帶時間的join
    public final void join() throws InterruptedException {
        join(0);
    }
    
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
        // 情況1:等待時間小於0
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        // 情況2:等待時間等於0
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
        // ============================================================================
        // 情況3:等待時間>0, 可以看到這段代碼的邏輯 *有時間限制的保護性暫停的實例*中get的方法邏輯相一致
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        // =============================================================================
        }
    }
    
    

總結:join的源碼的實現也是利用了保護性暫停的設計模式

4-3 同步模式之保護性暫停擴展(多個線程使用多個GuardObject的解耦方法)

場景:考慮郵遞員,收件人,信箱三種線程對象:

  • 多名郵遞員(產生結果的線程):每個郵遞員發送郵件
  • 信箱(多個GuardObject)
  • 多名收件人(接受結果線程)

動機:多個類之間使用GuardedObject對象作為參數傳遞不是很方便,因此需要設計一個解耦的中間類,能夠做到以下幾點:

  • 支持多個任務的管理
  • 解耦“結果等待者”和“結果生產者”。

實例:

基本思想:

step1: 為guardedObject添加id。

step2: 定義管理類管理多個guardedObject,有以下功能:

  • 創建guardedObject實例並產生唯一id並放入容器管理
  • 根據id返回對應的guardedObject對象

step3:業務類調用管理類實現線程之間的通信

  • 生產者利用管理類創建其對應的guardedObject實例用於接收信息
  • 消費者利用管理類獲取對應的guardedObject實例用於發送信息。

注意點:

  • 下面代碼中 Mailboxes 是通用類,用於管理多個guardedObject對象,可以復用。
  • 下面代碼中Postman與People是業務相關的類。
package chapter4;

import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import chapter2.Sleeper;
import lombok.extern.slf4j.Slf4j;

class GuardedObject {
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    private Object response;
    public Object get(long timeout) {
        synchronized (this) {
            long begin = System.currentTimeMillis();
            long passedTime = 0;
            while (response == null) {
                long waitTime = timeout - passedTime;
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虛假喚醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
            }
            return response;
        }
    }
    public void complete(Object response) {
        synchronized (this) {
            this.response = response;
            this.notifyAll();
        }
    }
}

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 產生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    // 由於HashTABLE是線程安全的,所以下面2個方法不需要去synchroized
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}

@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        // 收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.warn("開始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.warn("收到信 id:{}, 內容:{}", guardedObject.getId(), mail);
    }
}

@Slf4j(topic = "c.PostMan")
class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.warn("送信 id:{}, 內容:{}", id, mail);
        guardedObject.complete(mail);
    }
}

@Slf4j(topic = "c.test1")
public class test1 {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        System.out.println(Mailboxes.getIds());
        Thread.sleep(1000);
        System.out.println(Mailboxes.getIds());
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "內容" + id).start();
        }
    }
}

運行結果

[]
[Thread-2] WARN c.People - 開始收信 id:3
[Thread-0] WARN c.People - 開始收信 id:2
[Thread-1] WARN c.People - 開始收信 id:1
[3, 2, 1]
[Thread-3] WARN c.PostMan - 送信 id:3, 內容:內容3
[Thread-4] WARN c.PostMan - 送信 id:2, 內容:內容2
[Thread-2] WARN c.People - 收到信 id:3, 內容:內容3
[Thread-0] WARN c.People - 收到信 id:2, 內容:內容2
[Thread-5] WARN c.PostMan - 送信 id:1, 內容:內容1
[Thread-1] WARN c.People - 收到信 id:1, 內容:內容1

5 異步模式-生產者與消費者模式(wait/notify應用)

5-1 概述

生產者與消費者模式特點

  • 該模式使用消息隊列來平衡生產和消費的線程資源
  • 消息隊列是有容量限制的,滿時不會再加入數據,空時不會再消耗數據

應用場景:JDK 中各種阻塞隊列,采用的就是這種模式

問題:生產者與消費者模式與保護性暫停模式的區別?

  • 保護性暫停模式必須是一對一的,線程發送的信息可以被其對應的接受線程立刻接受到,所以是同步模式
  • 生產者與消費者模式沒有一對一的限制,此外需要消息對列傳遞信息,信息的傳遞存在延遲,所以是異步模式。

5-2 代碼實踐

package chapter4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
// 獲取網頁下載的內容,並存入到List
class Downloader{
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
        ){
            String tmp;
            while((tmp = reader.readLine()) != null){
                lines.add(tmp);
            }

        };
        return lines;
    }
}

final class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}

@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    // 使用雙向鏈表實現消息隊列,用於存儲message instance
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
        synchronized (queue) {
            // 消費者線程判斷消息隊列是否為空,為空則等待
            while (queue.isEmpty()) {
                log.warn("沒貨了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 從隊列的頭部獲取message
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message) {
        synchronized (queue) {
            // 生產者線程判斷隊列是否已滿,為空則等待
            while (queue.size() == capacity) {
                log.warn("庫存已達上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

@Slf4j(topic = "c.test2")
public class test2 {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
        // 4 個生產者線程, 下載任務
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try{
                    List<String> response = Downloader.download();
                    log.warn("try put message({})", id);
                    messageQueue.put(new Message(id, response));
                 } catch (IOException e) {
                    e.printStackTrace();
                 }

            }, "生產者" + i).start();
        }
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                log.warn("take message({}): [{}] lines", message.getId(), response.size());
            }
        }, "消費者").start();
    }
}

總結:生產者與消費者模式本質上依舊是wait/notify組合的應用。

  • 上述代碼中采用雙端隊列LinkedList充當消息隊列,並限制了消息隊列的大小。
  • 生產者
    • 消息隊列未滿的情況下可以放入東西,通過notify通知消費者“東西放好了,繼續拿
    • 消息隊列滿的情況下wait
  • 消費者
    • 消息隊列未空的情況下可以拿走東西,通過notify通知生產者“東西拿走了,繼續放”
    • 消費隊列為空,則wait

參考資料

JAVA中的util參考手冊

並發編程課程


20210228


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM