【JAVA並發第三篇】線程間通信


線程間的通信

JVM在運行時會將自己管理的內存區域,划分為不同的數據區,稱為運行時數據區。每個線程都有自己私有的內存空間,如下圖示:

在這里插入圖片描述

Java線程按照自己虛擬機棧中的方法代碼一步一步的執行下去,在這一過程中不可避免的會使用到線程共享的內存區域堆或方法區。為了防止多個線程在同一時刻訪問同一個內存地址,需要互相告知自己的狀態以避免資源爭奪。

線程的通信方式主要分為三種方式:①共享內存②消息傳遞③管道流

共享內存:線程之間通過對共享內存的讀-寫來實現隱式通信。Java中的具體實現是:volatile共享內存。

消息傳遞:線程之間通過明確的發送消息來實現顯示通信。Java中的具體實現是:等待/通知機制(wait/notify),join方法。

管道流:管道輸入/輸出流。

1、等待/通知機制

其過程是:線程A由於某些原因,自主調用了對象o的wait方法,進入WAITING狀態,釋放占有的鎖並等待通知。而線程B則調用對象o的notify方法或notifyall方法進行通知,線程A會收到通知,並從wait方法中返回,繼續執行后面的代碼。

可以發現,線程A和線程B就是通過對象o的wait方法和notify方法來發送消息,進行通信。

wait方法和notify方法是Object類的方法,而Object類是所有類的父類,因此所有對象都實現了Object類的方法。即所有的對象都具有wait方法和notify方法。

方法 作用 備注
wait 線程調用共享對象的wait()方法后會進入WAITING狀態,釋放占有的對象鎖並等待其他線程的通知或中斷才從該方法返回。 該方法可以傳參數,wait(long n):超時等待n毫秒,進入TIME-WAITING狀態,如果在n毫秒內沒有通知或中斷,則自行返回
notify 線程調用共享對象的notify()方法后會通知一個調用了wait方法並在此等待的線程返回。但由於在共享變量上等待的線程可能不止一個,故具體通知哪一個線程是隨機的。 notifyAll()方法與notify()方法作用一致,不過notify是隨機通知一個線程,而notifyAll則是通知所有在該共享變量上等待的線程

由於線程的等待/通知機制需要借助共享對象,所以在調用wait方法前,線程必須先獲得該對象的鎖,即只能在同步方法或同步塊(synchronized代碼塊)中調用wait方法,在調用wait方法后,線程釋放鎖。

同樣的notify方法在調用前也需要獲得對象的鎖,即也只能在同步方法或同步塊中調用notify方法。若有多個線程在等待,則線程調度器會隨機挑選一個線程來通知。需要注意的是,被通知的線程並不會在得到通知后就馬上從wait方法返回,而是需要等待獲得對象的鎖后才能從wait方法返回。而調用了notify方法的線程也並不會在調用時就馬上釋放對象的鎖,而是在執行完同步方法或同步塊(synchronized代碼塊)后,才釋放對象的鎖。因此,被通知的線程要等調用了notify的線程釋放鎖后,才能從wait方法中返回。

綜上所述,等待/通知機制的經典范式如下:

/**
 * 等待線程(調用wait方法的線程)
 */
synchronized(共享對象){ //同步代碼塊,進入條件是獲得鎖
    while(判斷條件){ //進行wait線程任務的條件不滿足時進入
        共享對象.wait()
    }
    線程任務代碼
}

/**
 * 通知線程(調用notify方法的線程)
 */
synchronized(共享對象){ //同步代碼塊,進入條件是獲得鎖
    線程任務代碼
    改變wait線程任務的條件
    共享對象.notify()
}

根據以上范式,有代碼如下:

public class WaitNotify {
    static boolean flag = true; //等待線程繼續執行往下執行的條件
    static Object lock = new Object(); //上鎖的對象

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new WaitRunnable(),"waitThread");   //以WaitRunnable為任務類的線程
        Thread notifyThread = new Thread(new NotifyRunnable(),"notifyThread");   //以NotifyRunnable為任務類的線程
        waitThread.start(); //wait線程啟動
        Thread.sleep(2000); //主線程休眠2s
        notifyThread.start(); //notify線程啟動
    }

    /**
     * Runnable等待實現類
     * synchronized關鍵字:可以修飾方法或者以同步塊的形式來使用
     */
    static class WaitRunnable implements Runnable{
        @Override
        public void run() {
            //對lock加鎖
            synchronized(lock){
                //判斷,若flag為true,則繼續等待(wait)
                while(flag){
                    try {
                        System.out.println(
                                Thread.currentThread().getName()+
                                "---flag為true,等待 @"+
                                new SimpleDateFormat("hh:mm:ss").format(new Date())
                        );
                        lock.wait(); //等待,並釋放鎖資源
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //若flag為false,則進行工作
                System.out.println(
                        Thread.currentThread().getName()+
                        "---flag為false,運行 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
            }
        }
    }

    /**
     * Runnable通知實現類
     */
    static class NotifyRunnable implements Runnable{
        @Override
        public void run(){
            //對lock加鎖
            synchronized(lock){
                //以NotifyRunnable為任務類的線程釋放lock鎖,並進行通知后,以Wait為任務類的線程才可以跳出循環
                System.out.println(
                        Thread.currentThread().getName()+ 
                        "---當前持有鎖,釋放 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
                lock.notifyAll(); //通知所有正在等待的線程從wait返回
                flag = false;
                try {
                    Thread.sleep(5000); //notifyThread線程休眠5s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //再次對lock加鎖,並休眠
            synchronized (lock){
                System.out.println(
                        Thread.currentThread().getName()+
                        "---再次持有鎖,休眠 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
                try {
                    Thread.sleep(2000); //再次讓notifyThread線程休眠2s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


//該代碼示例來自《Java並發編程的藝術》

其結果如下:

waitThread---flag為true,等待 @01:53:51
notifyThread---當前持有鎖,釋放 @01:53:53
waitThread---flag為false,運行 @01:53:58
notifyThread---再次持有鎖,休眠 @01:53:58

以上代碼根據等待/通知的經典范式,設置一個線程是否繼續往下執行的條件變量flag,以及一個共享對象lock,並使用synchronized關鍵字對lock上鎖。

waitThread線程是等待線程,在啟動時會嘗試獲得鎖,成功則進入synchronized代碼塊。在synchronized代碼塊中,如果條件不滿足(即flag為true),則waitThread線程會進入while循環,並在循環體中調用wait方法,進入WAITING狀態及釋放鎖資源。直到有其他線程調用notify方法通知才從wait方法返回。

notifyThread線程是通知線程,在啟動時也會嘗試獲得鎖,成功則同樣進入synchronized代碼塊。在synchronized代碼塊中,notifyThread線程會改變條件,使waitThread線程可以繼續往下執行(即令flag為false),同時notifyThread線程也會調用notyfiAll方法,讓waitThread線程收到通知。

但注意,notifyThread線程並不會在調用notyfiAll方法后就馬上釋放鎖,而是在執行完synchronized代碼塊的內容后才釋放鎖。我們在notifyThread線程調用notyfiAll后,將該線程休眠5s。可以從打印結果發現,在notifyThread線程休眠的5s中,即使waitThread線程得到了通知,且繼續運行的條件也已滿足(flag為flase),但waitThread線程在這5s中依然沒有得到執行。在notifyThread線程5s的休眠時間結束后,並從synchronized代碼塊退出,waitThread線程才繼續執行。所以,等待線程在得到通知后,仍然需要等待通知線程釋放鎖,並且在嘗試獲得鎖成功后才能真正從wait方法中返回,並繼續執行。

2、共享內存

有如下代碼,


/**
 * @Author Feng Jian
 * @Date 2021/1/20 13:18
 * @Version 1.0
 */
public class JMMTest {
    private static boolean run = true;

    public static void main(String[] args) throws InterruptedException {
        Thread My_Thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(run){
                    //...
                }
            }
        }, "My_Thread");

        My_Thread.start();  //啟動My_Thread線程

        System.out.println(Thread.currentThread().getName()+"正在休眠@"+new SimpleDateFormat("hh:mm:ss").format(new Date())+"--"+run);

        Thread.sleep(1000);  //主線程休眠1s
        run = false;  //改變My_Thread線程運行條件,但My_Thread線程並不會停下

        System.out.println(Thread.currentThread().getName()+"正在運行@"+new SimpleDateFormat("hh:mm:ss").format(new Date())+"--"+run);
    }
}

定義了一個變量run,並以此作為My_Thread線程中while循環執行的條件。在啟動My_Thread線程,並使主線程休眠1s后,改變變量run的值。其結果如下:

在這里插入圖片描述

可以看出,即使是run的值已經改變,但My_Thread線程依然不會停下來。為什么呢?這就需要了解Java的內存模型(JMM)。

我們知道,CPU要從內存中讀取出數據來進行計算,但實際上CPU並不總是直接從內存中讀取數據。由於CPU和內存間(常稱之為主存)的速度不匹配(CPU的速度比主存快得多),為了有效利用CPU,使用多級cache的機制,如圖

在這里插入圖片描述

因此,CPU讀取數據的順序是:寄存器-高速緩存-主存。主存中的部分數據,會先拷貝一份放到cache中,當CPU計算時,會直接從cache中讀取數據,計算完畢后再將計算結果放置到cache中,最后在主存中刷新計算結果。因此每個CPU都會擁有一份拷貝。

以上只是CPU訪問內存,進行計算的基本方式。實際上,不同的硬件,訪問過程會存在不同程度的差異。比如,不同的計算機,CPU和主存間可能會存在三級緩存、四級緩存、五級緩存等等的情況。

為了屏蔽掉各種硬件和操作系統的內存訪問差異,實現讓 Java 程序在各種平台下都能達到一致的內存訪問效果,定義了Java的內存模型(Java Memory Model,JMM)。

JMM 的主要目標是定義程序中各個變量的訪問規則,即在虛擬機中將變量存儲到主存和從主存中取出變量這樣的底層細節。這里的變量指的是能夠被多個線程共享的變量,它包括了實例字段、靜態字段和構成數組對象的元素,方法內的局部變量和方法的參數為線程私有,不受JMM的影響。

Java的內存模型如下,

在這里插入圖片描述
JMM定義了線程和主內存之間的關系:線程之間的共享變量存儲在主內存中,每個線程都有一個私有的本地內存,本地內存中存儲着主內存中的共享變量的副本。

JMM規定:將所有共享變量放到主內存中,當線程使用變量時,會把其中的變量復制到自己的本地內存,線程讀寫時操作的是本地內存中的變量副本。一個線程不能訪問其他線程的本地內存。

本地內存其實只是一個抽象的概念,它實際上並不真實存在,其包含了緩存、寫緩沖區、寄存器以及其他的硬件和編譯器的優化。

在多線程環境下,由於每個線程都有主內存中共享變量的副本,所以當線程運行時,讀取的是自己本地內存中的共享變量的副本,這就產生了線程的安全問題:比如主內存中的共享變量i為1,線程A和B從主內存取出變量i,放入自己的本地內存中成為共享變量i的副本。當線程A執行時,會直接從自己的本地內存中讀取副本變量i的值,進行加1計算,完成后更新本地內存中的副本i的值,再寫回到主內存中,此時主內存中的i的值為2。

而如果此時線程B也需要用到變量i的值,則它並不會去主內存中讀取i的值,而是直接在自己的本地內存中讀取i的副本,而此時線程B的本地內存中的副本i的值依然為1,而不是經過線程A修改后的,主內存中的值2。

這也是為什么在上述代碼中,main線程明明已經修改了變量run的值,但My_Thread線程依然在執行while循環的原因。如圖所示,

在這里插入圖片描述

這同樣是JMM所要處理的多線程可見性的問題:當一個共享變量在多個線程的工作內存中都有副本時,如果一個線程修改了這個共享變量的副本值,那么其他線程應該能夠看到這個被修改后的值。即如何保證指令不會受 cpu 緩存的影響。

回到上述的代碼,如何使My_Thread線程能接收到main線程已經修改run = false的信息?即My_Thread線程和main線程如何能夠通信。

根據Java的內存模型,這兩個線程如果需要通信,則必須經歷以下兩步:

①main線程把本地內存中修改過的共享變量run的值刷新到主內存中。
②My_Thread線程到主內存中去讀取main線程之前已經更新過的共享變量run的值。

這意味着,兩個線程的通信必須經過主內存。Java提供volitale關鍵字實現這一要求。

volitale關鍵字可以用來修飾字段(成員變量),告知Java程序任何對該變量的訪問都要從共享內存(主內存)中獲取,而對它的改變都必須同步刷新回共享內存。當一個變量被聲明為volitale時,線程在寫入變量時,不會把值緩存在寄存器或者高速緩存中(即本地內存),而是會把值刷新回主存,當要讀取該共享變量時,線程則會先清空本地內存中的副本值,從主存中重新獲取。故volitale關鍵字可以保證所有線程對變量訪問的可見性,即對共享變量的讀寫都需要經過主內存,因此達到線程通過共享內存進行通信的目的。

知道了線程之間如何通過共享內存進行通信,我們改寫一下上述代碼,使main線程修改完run = false后,My_Thread線程中的while循環即立即停止。

實際上只需要給共享變量run加上volitale關鍵字即可:

private static volatile boolean run = true;

修改后的運行結果如下:

在這里插入圖片描述
可見,在main線程修改共享變量run的值后,即刷新回主內存。而My_Thread線程讀取主內存中的run發現值為false后即停止了while循環。

實際上,也可以使用synchronized關鍵字來保證內存可見性問題,實現線程通信。其機制是:在synchronized修飾的同步塊中,如果對一個共享變量進行操作,將會清空線程本地內存中此變量的值,並在使用這個共享變量前重新在主內存中讀取這個變量的值。而在同步塊執行完畢,釋放鎖資源時,則必須先把此共享變量同步回主內存中。

3、管道流

由於還未學習使用到,先暫時略過。。。

以上內容為本人在學習過程中所做的筆記。參考的書籍、文章或博客如下:
[1]方騰飛,魏鵬,程曉明. Java並發編程的藝術[M].機械工業出版社.
[2]霍陸續,薛賓田. Java並發編程之美[M].電子工業出版社.
[3]Simen郎. 拜托,線程間的通信真的很簡單.知乎.https://zhuanlan.zhihu.com/p/138689342
[4]極樂君.Java線程內存模型,線程、工作內存、主內存.知乎.https://zhuanlan.zhihu.com/p/25474331


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM