Java並發之線程間的協作


     上篇文章我們介紹了synchronized關鍵字,使用它可以有效的解決我們多線程所帶來的一些常見問題。例如:競態條件,內存可見性等。並且,我們也說明了該關鍵字主要是一個加鎖和釋放鎖的集成,所有為能獲得鎖的線程都將被阻塞在某個對象的阻塞隊列上。而我們本篇將要介紹的線程間的協作則主要是對對象的另一個隊列的使用(條件隊列),所有因條件不滿足而無法繼續運行的線程都將在條件隊列上進行等待。主要涉及內容如下:

  • 理解wait/notify這兩個方法
  • 典型的生產者消費者問題
  • 理解join方法的實現原理

一、理解wait/notify這兩個方法
     這兩個方法是我們本篇文章的主角,它們被定義在根類Object中。

public final void wait()
public final native void wait(long timeout)

public final native void notify();
public final native void notifyAll();

兩個wait方法,無參的wait相當於wait(0)表示無限期等待,有參數的wait方法則指定該線程等待多長時間。notify方法用於釋放一個在條件隊列上等待的線程,而notifyall方法則是用於釋放所有在條件隊列上進行等待的線程。那么究竟什么時候調用wait方法讓線程到條件隊列上去等待,什么時候調用notify釋放條件隊列上的線程呢?

我們說過一個對象有一把鎖和兩個隊列,對於所有無法獲取到鎖的線程都將被阻塞在阻塞隊列上,而對於獲取到鎖以后,於運行過程中由於缺少某些條件而不得不終止程序的線程將被阻塞在條件隊列上並讓出CPU。而且需要注意一點的是,線程被阻塞在阻塞隊列上和條件隊列上,所表現出的狀態是不一樣的。例如:

/*定義一個線程類*/
public class MyThread extends Thread{

    @Override
    public void run(){
        try {
            System.out.println(Thread.currentThread().getState());
            synchronized (this){
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*啟動線程*/
public static void main(String[] args) throws InterruptedException {
        Thread thread = new MyThread();
        thread.start();

        Thread.sleep(1000);
        System.out.println(thread.getState());
        System.out.println("main is out");

    }

輸出結果顯示:

這里寫圖片描述

主函數中啟動一個線程,該線程內部運行的時候先輸出當前線程狀態,然后調用wait方法將自己掛在當前線程對象的條件隊列上並讓出CPU,而我們在主函數中對該線程的狀態進行再一次的輸出, 從結果截圖來看,程序並沒有結束----說明子線程並沒有正常結束,阻塞在條件隊列上的線程的狀態是waiting,這和阻塞在阻塞隊列上的線程狀態blocked是完全兩種不同的狀態。但是,當我們調用notify或者notifyall方法將某個線程從條件隊列中釋放的時候,該線程要和外面的其他線程一樣去競爭對象的鎖,如果不能獲取到對象的鎖,依然會被阻塞在該對象的阻塞隊列上。

二、使用wait/notify解決生產者消費者問題
     生產者消費者問題是我們操作系統中的一個經典的問題。生產者向倉庫中源源不斷的放入產品,消費者從倉庫中源源不斷的拿出產品,當倉庫滿的時候,生產者就不能繼續往里面放入產品,當倉庫空的時候,消費者就不能從倉庫里取出產品。如何協調好生產者線程和消費者線程對倉庫的操作就是這個問題的核心。

public class Repository {

    private ArrayDeque<String> list = null;
    private int limit;     //倉庫容量

    public Repository(int limit){
        this.limit = limit;
        list = new ArrayDeque<String>(this.limit);
    }

    //倉庫提供給生產者存入操作
    public synchronized void addGoods(String data) throws InterruptedException {
        while(list.size() == limit){
            //說明倉庫已經滿了
            wait();
        }
        list.add(data);
        System.out.println("i produce a product:"+data);
        notifyAll();
    }

    //倉庫提供給消費者取出操作
    public synchronized String getGoods() throws InterruptedException {
        while(list.isEmpty()){
            //說明倉庫已經空了
            wait();
        }
        String result = list.poll();
        System.out.println("i consume a product:"+ result);
        notifyAll();
        return result;
    }

}

我們定義一個倉庫類,該倉庫提供給生產者投放的方法,提供給消費者取出的方法。我們使用雙端隊列實現對倉庫的模擬,limit參數限定倉庫容量。

生產者的投放方法,當生產者想要向倉庫投放產品時,如果倉庫已經滿了,則將將當前線程阻塞在條件隊列上,等待倉庫有空余位置為止。而如果倉庫沒滿,則向其中投入一個產品並喚醒被阻塞在條件隊列上的所有線程(在本例中實際上就是消費者線程)。一旦消費者線程從條件隊列上被釋放,他將重新和生產者線程競爭對象鎖,在獲取到對象鎖之后將回到上次因條件不足而被阻塞的程序位置。消費者的取出方法和生產者的投放方法類似,此處不再贅述。

public class Producer extends Thread {

    //生產者線程不停的生產產品直到倉庫滿

    private Repository repository = null;

    public Producer(Repository r){
        this.repository = r;
    }

    int count = 0;
    @Override
    public void run(){
        while(true){
            try {
               repository.addGoods(String.valueOf(count));
                count++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定義一個生產者類,生產者始終不停的生產產品,我們用count來模擬產品代號。

public class Consumer extends Thread {

    //消費者線程不停的從倉庫中取出產品直到倉庫空

    private Repository repository = null;

    public Consumer(Repository r){
        this.repository = r;
    }

    @Override
    public void run(){
        while(true){
            try {
                String result = repository.getGoods();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定義一個消費者類,消費者不停的從倉庫中取出產品。

public static void main(String[] args) throws InterruptedException {

        Repository repository = new Repository(20);
        Thread producer = new Producer(repository);
        Thread consumer = new Consumer(repository);

        producer.start();
        consumer.start();

        System.out.println("main thread is out");
    }

最后我們定義一個倉庫並通過構造方法的傳入使得生產者和消費者共同使用相同的倉庫對象。分別啟動兩個線程,程序將死循環的輸出生產者和消費者的生產和消費操作,以下是程序運行的部分結果:

這里寫圖片描述

我們可以看到生產者和消費者這兩個線程交替的輸出,偶爾會出現消費者滯后生產者的情況,但是消費者絕對不會超前生產者,因為只有生產者生產出產品之后,消費者才能取出。以上便是經典的生產者消費者問題,通過對該問題的實現,我們能夠對wait/notify這兩個操作有了一個更加深刻的認識。

三、join方法的實現原理
     join方法的內部其實使用的還是我們上述介紹的wait/notify機制。

public final void join() throws InterruptedException {
        join(0);
    }
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

兩個方法,核心的還是這個帶有參數的join方法。該方法大體上分為三種情況,如果millis小於0,拋出異常。如果millis等於0,就無限期等待,這一段代碼不知道大家理解的如何:

if (millis == 0) {
    while (isAlive()) {
        wait(0);
    }
}
Thread thread = new MyThread();
thread.start();
        
thread.join();

兩小段代碼,第一段代碼是jdk中關於millis 等於0的一個實現,第二段代碼則是我們調用join方法的一個基本格式。我們可以看到,由於join這個方法被synchronized關鍵字修飾,那么我們主線程在調用thread對象的該方法時就需要首先獲得thread對象的鎖。

進入到join方法的內部,當millis 等於0的時候,判斷只要線程對象活着,也就是thread對象活着,就調用wait(0)方法將當前線程(main)線程掛起到thread對象的條件隊列上。一旦thread線程對象執行結束,Java系統將調用notifyall來釋放所有掛在該對象的條件隊列上的線程,此時main線程將會被喚醒,從而實現了main線程等待thread線程執行結束的一個過程。至於millis 大於0的情況,只不過內部調用了wait(long timeout)方法,其他的實現原理基本類似,此處不再贅述。

本篇文章,我們主要介紹線程間的一種協作機制,使用wait/notify兩個方法來協作不同的線程。通過實現經典的生產者消費者模型增加了對wait/notify這兩個方法的理解,最后從源代碼的角度對Thread下的join方法進行了學習,該方法的核心就是利用wait/notify協作主線程和分支線程來實現等待的一個操作。總結不到之處,望指出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM