線程間的協作機制


上篇文章我們介紹了 synchronized 這個關鍵字,通過它可以基本實現線程間在臨界區對臨界資源正確的訪問與修改。但是,它依賴一個 Java 對象內置鎖,某個時刻只能由一個線程占有該鎖,其他試圖占有的線程都得阻塞在對象的阻塞隊列上。

但實際上還有一種情況也是存在的,如果某個線程獲得了鎖但在執行過程中由於某些條件的缺失,比如數據庫查詢的資源還未到來,磁盤讀取指令的數據未返回等,這種情況下,讓線程依然占有 CPU 等待是一種資源上的浪費。

所以,每個對象上也存在一個等待隊列,這個隊列上阻塞了所有獲得鎖並處於運行期間缺失某些條件的線程,所以整個對象的鎖與隊列狀況是這樣的。

image

Entry Set 中阻塞了所有試圖獲得當前對象鎖而失敗的線程,Wait Set 中阻塞了所有在獲得鎖運行期間由於缺失某些條件而交出 CPU 的線程集合。

而當某個現場稱等待的條件滿足了,就會被移除等待隊列進入阻塞隊列重新競爭鎖資源。

wait/notify 方法

Object 類中有幾個方法我們雖然不常使用,但是確實線程協作的核心方法,我們通過這幾個方法控制線程間協作。

public final native void wait(long timeout)

public final void wait()

public final native void notify();

public final native void notify();

wait 類方法用於阻塞當前線程,將當前線程掛載進 Wait Set 隊列,notify 類方法用於釋放一個或多個處於等待隊列中的線程。

所以,這兩個方法主要是操作對象的等待隊列,也即是將那些獲得鎖但是運行期間缺乏繼續執行的條件的線程阻塞和釋放的操作。

但是有一個前提大家需要注意,wait 和 notify 操作的是對象內置鎖的等待隊列,也就是說,必須在獲得對象內置鎖的前提下才能阻塞和釋放等待隊列上的線程。簡單來說,這兩個方法的只能在 synchronized 修飾的代碼塊內部進行調用

下面我們看一段代碼:

public class Test {
    private static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(){
            @Override
            public void run(){
                synchronized (lock){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread2 = new Thread(){
            @Override
            public void run(){
                synchronized (lock){
                    System.out.println("hello");
                }
            }
        };
        thread1.start();
        thread2.start();

        Thread.sleep(2000);

        System.out.println(thread1.getState());
        System.out.println(thread2.getState());
    }
}

運行結果:

image

可以看到,程序是沒有正常結束的,也就是說,有線程還未正常退出。線程一優先啟動於線程二,所以它將先獲得 lock 鎖,接着調用 wait 方法將自己阻塞在 lock 對象的等待隊列上,並釋放鎖交出 CPU。

線程二啟動時可能由於線程一依然占有鎖而阻塞,但當線程一釋放鎖以后,線程二將獲得鎖並執行打印語句,隨后同步方法結束並釋放鎖。

此時,線程一依然阻塞在 lock 對象的等待隊列上,所以整個程序沒有正常退出。

演示這么一段程序的意義是什么呢?就是想告訴大家,雖然阻塞隊列和等待隊列上的線程都不能得到 CPU 正常執行指令,但是它們卻屬於兩種不同的狀態,阻塞隊列上的線程在得知鎖已經釋放后將公平競爭鎖資源,而等待隊列上的線程則必須有其他線程通過調用 notify 方法通知並移出等待隊列進入阻塞隊列,重新競爭鎖資源。

相關方法的實現

1、sleep 方法

sleep 方法用於阻塞當前線程指定時長,線程狀態隨即變成 TIMED_WAITING,但區別於 wait 方法。兩者都是讓出 CPU,但是 sleep 方法不會釋放當前持有的鎖。

也就是說,sleep 方法不是用於線程間同步協作的方法,它只是讓線程暫時交出 CPU,暫停運行一段時間,時間到了將由系統調度分配 CPU 繼續執行。

2、join 方法

join 方法用於實現兩個線程之間相互等待的一個操作,看段代碼:

public void testJoin() throws InterruptedException {
    Thread thread = new Thread(){
        @Override
        public void run(){
            for (int i=0; i<1000; i++)
                System.out.println(i);
        }
    };
    thread.start();

    thread.join();
    System.out.println("main thread finished.....");
}

拋開 join 方法不談,main 線程中的打印方法一定是先執行的,而實際上這段程序會在線程 thread 執行完成之后才執行主線程的打印方法。

實現機理區別於 sleep 方法,我們一起看看:

image

image

方法的核心就是調用 wait(delay) 阻塞當前線程,當線程被喚醒計算從進入方法到當前時間共經過了多久。

接着比較 millis 和 這個 now,如果 millis 小於 now 說明,說明等待時間已經到了,可以退出方法返回了。否則則說明線程提前被喚醒,需要繼續等待。

需要注意的是,既然是調用的 wait 方法,那么等待的線程必然是需要釋放持有的當前對象內置鎖的,這區別於 sleep 方法。

一個典型的線程同步問題

下面我們寫一個很有意思的代碼,實現操作系統中的生產者消費者模型,借助我們的 wait 和 notify 方法。

生產者不停生產產品到倉庫中直到倉庫滿,消費者不停的從倉庫中取出產品直到倉庫為空。如果生產者發現倉庫已經滿了,就不能繼續生產產品,而消費者如果發現倉庫為空,就不能從倉庫中取出產品。

public class Repository {
    private List<Integer> list = new ArrayList<>();
    private int limit = 10;  //設置倉庫容量上限

    public synchronized void addGoods(int count) throws InterruptedException {
        while(list.size() == limit){
            //達到倉庫上限,不能繼續生產
            wait();
        }
        list.add(count);
        System.out.println("生產者生產產品:" + count);
        //通知所有的消費者
        notifyAll();
    }

    public synchronized void removeGoods() throws InterruptedException {
        while(list.size() <= 0){
            //倉庫中沒有產品
            wait();
        }

        int res = list.get(0);
        list.remove(0);
        System.out.println("消費者消費產品:" + res);
        //通知所有的生產者
        notifyAll();
    }
}

寫一個倉庫類,該類提供兩個方法供外部調用,一個是往倉庫放產品,如果倉庫滿了則阻塞到倉庫對象的等待隊列上,一個是從倉庫中取出產品,如果倉庫為空則阻塞在倉庫的等待隊列上。

public class Producer extends Thread{
    Repository repository = null;

    public Producer(Repository p){
        this.repository = p;
    }

    @Override
    public void run(){
        int count = 1;
        while(true){
            try {
                Thread.sleep((long) (Math.random() * 500));
                repository.addGoods(count++);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定義一個生產者類,生產者隨機的向倉庫添加產品。如果沒有能成功的添加,會被阻塞在循環里。

public class Customer extends Thread{
    Repository repository = null;

    public Customer(Repository p){
        this.repository = p;
    }

    @Override
    public void run(){
        while(true){
            try {
                Thread.sleep((long) (Math.random() * 500));
                repository.removeGoods();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定義一個消費者類,消費者類隨機的從倉庫中取一個產品。如果沒有成功的取出一個產品,同樣會被阻塞在循環里。

public void testProducerAndCustomer() {
    Repository repository = new Repository();
    Thread producer = new Producer(repository);
    Thread consumer = new Customer(repository);

    producer.start();
    consumer.start();

    producer.join();
    consumer.join();
    System.out.println("main thread finished..");
}

主線程啟動這兩個線程,程序運行的情況大致是這樣的:

生產者生產產品:1
消費者消費產品:1
生產者生產產品:2
消費者消費產品:2
生產者生產產品:3
消費者消費產品:3
。。。。。
。。。。。
消費者消費產品:17
生產者生產產品:21
消費者消費產品:18
生產者生產產品:22
消費者消費產品:19
生產者生產產品:23
消費者消費產品:20
生產者生產產品:24
生產者生產產品:25
生產者生產產品:26
消費者消費產品:21
生產者生產產品:27
生產者生產產品:28
消費者消費產品:22
消費者消費產品:23
生產者生產產品:29
生產者生產產品:30
。。。。。。
。。。。。。

仔細觀察,你會發現,消費者者永遠不會消費一個不存在的產品,消費的一定是生產者生產的產品。剛開始可能是生產者生產一個產品,消費者消費一個產品,而一旦消費者線程執行的速度超過了生產者,必然會由於倉庫容量為空而被阻塞。

生產者線程的執行速度可以超過消費者線程,而消費者線程的執行速度如果一直超過生產者就會導致倉庫容量為空而致使自己被阻塞。

總結一下,synchronized 修飾的代碼塊是直接使用的對象內置鎖的阻塞隊列,線程獲取不到鎖自然被阻塞在該隊列上,而 wait/notify 則是我們手動的控制等待隊列的入隊和出隊操作。但本質上都是利用的對象內置鎖的兩個隊列。

這兩篇文章介紹的是利用 Java 提供給我們的對象中的內置鎖來完成基本的線程間同步操作,這部分知識是后續介紹的各種同步工具,集合類框架等實現的底層原理。


文章中的所有代碼、圖片、文件都雲存儲在我的 GitHub 上:

(https://github.com/SingleYam/overview_java)

歡迎關注微信公眾號:OneJavaCoder,所有文章都將同步在公眾號上。

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM