多線程 | 線程通信


線程之間的通信

前言

為什么要有線程通信?

​ 多個線程並發執行時, 在默認情況下CPU是隨機切換線程的,當我們需要多個線程來共同完成一件任務,當然如果我們沒有使用線程通信來使用多線程共同操作同一份數據的話,雖然可以實現,但是在很大程度會造成多線程之間對同一共享變量的爭奪,那樣的話勢必為造成很多錯誤和損失!
​ 所以,我們才引出了線程之間的通信,多線程之間的通信能夠避免對同一共享變量的爭奪。且我們希望他們有規律的執行, 那么多線程之間需要一些協調通信,以此來幫我們達到多線程共同操作一份數據

什么是線程通信?

​ 首先,線程間通信的方式有兩種:共享內存和消息傳遞,以下方式都是基本這兩種模型來實現的。我們來基本一道面試常見的題目來分析:

題目:有兩個線程A、B,A線程向一個集合里面依次添加元素"abc"字符串,一共添加十次,當添加到第五次的時候,希望B線程能夠收到A線程的通知,然后B線程執行相關的業務操作**

一種方式是使用 volatile 關鍵字

基於 volatile 關鍵字來實現線程間相互通信是使用共享內存的思想,大致意思就是多個線程同時監聽一個變量,當這個變量發生變化的時候 ,線程能夠感知並執行相應的業務。這也是最簡單的一種實現方式

public class TestSync {
    // 定義一個共享變量來實現通信,它需要是volatile修飾,否則線程不能及時感知
    static volatile boolean notice = false;

    public static void main(String[] args) {
        List<String>  list = new ArrayList<>();
        // 實現線程A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("線程A向list中添加一個元素,此時list中的元素個數為:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    notice = true;
            }
        });
        // 實現線程B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (notice) {
                    System.out.println("線程B收到通知,開始執行自己的業務...");
                    break;
                }
            }
        });
        // 需要先啟動線程B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 再啟動線程A
        threadA.start();
    }
}

運行結果為

此方法雖然可以解決問題,但會造成資源爭奪且易出錯,不推薦。

下面引出線程通信常用的方式——等待喚醒機制

等待喚醒機制

​ 就是在一個線程進行了規定操作后,就進入等待狀態(wait), 等待其他線程執行完他們的指定代碼過后 再將其喚醒(notify)。Object類提供了線程間通信的方法:wait()notify()notifyaAl(),它們是多線程通信的基礎,而這種實現方式的思想自然是線程間通信。

wait()

可以使當前執行的線程等待,暫停執行,直到接到通知或被中斷為止

注意

  • 要確保調用wait()方法的時候擁有鎖,即wait()方法只能在synchronized同步代碼塊中且只能由鎖對象調用,否則會拋出IlegalMonitorStateExeption異常
  • 調用 wait()方法,當前線程會釋放鎖

wait和 notify必須配合synchronized使用,wait方法釋放鎖,notify方法不釋放鎖

notify()/notifyAll()

notify():notify()方法會喚醒一個等待當前鎖的線程,若有多個等待的線程則會隨機喚醒一個。

notifAll(): notifyAll()方法會喚醒等待當前鎖的所有線程。

在同步代碼塊中調用 notify()方法后,並不會立即釋放鎖對象,需要等當前同步代碼塊執行完后才會釋放鎖對象,一般將 notify()方法放在同步代碼塊的最后

生產者消費者模型是我們學習多線程知識的一個經典案例,一個典型的生產者消費者模型如下:

    public void produce() {
        synchronized (this) {
            while (mBuf.isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            mBuf.add();
            notifyAll();
        }
    }

    public void consume() {
        synchronized (this) {
            while (mBuf.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            mBuf.remove();
            notifyAll();
        }

    }

這段代碼很容易引申出來兩個問題:一個是wait()方法外面為什么是while循環而不是if判斷,另一個是結尾處的為什么要用notifyAll()方法,用notify()行嗎。

很多人在回答第二個問題的時候會想當然的說notify()是喚醒一個線程,notifyAll()是喚醒全部線程,但是喚醒然后呢,不管是notify()還是notifyAll(),最終拿到鎖的只會有一個線程,那它們到底有什么區別呢?

其實這是一個對象內部鎖的調度問題,要回答這兩個問題,首先我們要明白java中對象鎖的模型,JVM會為一個使用內部鎖(synchronized)的對象維護兩個集合,Entry SetWait Set,也有人翻譯為鎖池和等待池,意思基本一致。

對於Entry Set:如果線程A已經持有了對象鎖,此時如果有其他線程也想獲得該對象鎖的話,它只能進入Entry Set,並且處於線程的BLOCKED狀態。

對於Wait Set:如果線程A調用了wait()方法,那么線程A會釋放該對象的鎖,進入到Wait Set,並且處於線程的WAITING狀態。

還有需要注意的是,某個線程B想要獲得對象鎖,一般情況下有兩個先決條件,一是對象鎖已經被釋放了(如曾經持有鎖的前任線程A執行完了synchronized代碼塊或者調用了wait()方法等等),二是線程B已處於RUNNABLE狀態。

那么這兩類集合中的線程都是在什么條件下可以轉變為RUNNABLE呢?

對於Entry Set中的線程,當對象鎖被釋放的時候,JVM會喚醒處於Entry Set中的某一個線程,這個線程的狀態就從BLOCKED轉變為RUNNABLE。

對於Wait Set中的線程,當對象的notify()方法被調用時,JVM會喚醒處於Wait Set中的某一個線程,這個線程的狀態就從WAITING轉變為RUNNABLE;或者當notifyAll()方法被調用時,Wait Set中的全部線程會轉變為RUNNABLE狀態。所有Wait Set中被喚醒的線程會被轉移到Entry Set中。

然后,每當對象的鎖被釋放后,那些所有處於RUNNABLE狀態的線程會共同去競爭獲取對象的鎖,最終會有一個線程(具體哪一個取決於JVM實現,隊列里的第一個?隨機的一個?)真正獲取到對象的鎖,而其他競爭失敗的線程繼續在Entry Set中等待下一次機會。

有了這些知識點作為基礎,上述的兩個問題就能解釋的清了。

首先來看第一個問題,我們在調用wait()方法的時候,心里想的肯定是因為當前方法不滿足我們指定的條件,因此執行這個方法的線程需要等待直到其他線程改變了這個條件並且做出了通知。那么為什么要把wait()方法放在循環而不是if判斷里呢,其實答案顯而易見,因為wait()的線程不能確定其他線程會在什么狀態下notify(),所以必須在被喚醒、搶占到鎖並且從wait()方法退出的之前再次進行指定條件的判斷,以決定是滿足條件往下執行呢還是不滿足條件再次wait()呢,歸根結底是if只執行一次,而while是循環,可判斷多次。

就像在本例中,如果只有一個生產者線程,一個消費者線程,那其實是可以用if代替while的,因為線程調度的行為是開發者可以預測的,生產者線程只有可能被消費者線程喚醒,反之亦然,因此被喚醒時條件始終滿足,程序不會出錯。但是這種情況只是多線程情況下極為簡單的一種,更普遍的是多個線程生產,多個線程消費,那么就極有可能出現喚醒生產者的是另一個生產者或者喚醒消費者的是另一個消費者,這樣的情況下用if就必然會現類似過度生產或者過度消費的情況了,典型如IndexOutOfBoundsException的異常。所以所有的java書籍都會建議開發者永遠都要把wait()放到循環語句里面

然后來看第二個問題,既然notify()和notifyAll()最終的結果都是只有一個線程能拿到鎖,那喚醒一個和喚醒多個有什么區別呢?

C1、C2(消費者線程)P1、P2 (生產者線程),假設初始時buffer是空的。

  1. C1、C2獲得執行,發現buffer為空,因此調用wait阻塞,此時等待隊列(WaitSet)里有兩個線程C1、C2。

  2. P1拿到了鎖,發現buffer為空,於是把它加滿。

  3. P2 此時過來想要搶鎖,發現鎖被P1持有,於是放在同步隊列里等待(_cxq/_EntryList),此時同步隊列里有一個線程P2。

  4. P1加滿buffer后調用notify將等待隊列里的線程挪動到同步隊列里,假設此處是C1被挪動到同步隊列里。此時等待隊列里有線程C2,同步隊列里有線程P2、C1。

  5. 當P1退出臨界區釋放鎖后,會喚醒同步隊列里的線程,假設喚醒的是P2。

  6. P2獲取鎖后發現buffer是滿的,於是調用wait釋放鎖並阻塞自己。此時同步隊列里有線程C1,等待隊列里有線程C2、P2。

  7. 同步隊列里只有C1,C1獲得鎖后消費buffer,buffer變空,然后調用notify將等待隊列里的線程挪到同步隊列里,假設挪動的是C2。此時同步隊列里有線C2,等待隊列里有線程P2。

  8. C2獲得鎖后發現buffer為空,於是調用wait釋放鎖並掛起自己。此時同步隊列為空,等待隊列里有P2。

  9. 因為同步隊列為空,所以C2並沒有喚醒任何線程,而等待隊列里的P2卻是在苦苦等待。。再也沒人喚醒它

但如果你把上述例子中的notify()換成notifyAll(),這樣的情況就不會再出現了,因為每次notifyAll()都會使其他等待的線程從Wait Set進入Entry Set,從而有機會獲得鎖。

其實說了這么多,一句話解釋就是之所以我們應該盡量使用notifyAll()的原因就是,notify()非常容易導致死鎖。當然notifyAll並不一定都是優點,畢竟一次性將Wait Set中的線程都喚醒是一筆不菲的開銷,如果你能handle你的線程調度,那么使用notify()也是有好處的。

還有notifyAll()是適用在公共的狀態,比如這個例子是公共的狀態,需要通過all 喚醒沒問題,但是實際項目中是不會靠一個公共值維持線程的,一般是單獨的對象對應一個同步鎖,比如我是多個用戶訪問同個方法,每個用戶都會獨立wait(),如果是all 喚醒 那會喚醒全部,是不對的,必須要維持單獨的對象鎖,即每個對象喚醒本身,並設置wait時間!前面聽all 多好多好,用了才發現,全醒是不行滴!請大家注意實際使用!

最后我把完整的測試代碼放出來,供大家參考:

import java.util.ArrayList;
import java.util.List;

public class Something {
    private Buffer mBuf = new Buffer();

    public void produce() {
        synchronized (this) {
            while (mBuf.isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            mBuf.add();
            notifyAll();
        }
    }

    public void consume() {
        synchronized (this) {
            while (mBuf.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            mBuf.remove();
            notifyAll();
        }
    }

    private class Buffer {
        private static final int MAX_CAPACITY = 1;
        private List innerList = new ArrayList<>(MAX_CAPACITY);

        void add() {
            if (isFull()) {
                throw new IndexOutOfBoundsException();
            } else {
                innerList.add(new Object());
            }
            System.out.println(Thread.currentThread().toString() + " add");

        }

        void remove() {
            if (isEmpty()) {
                throw new IndexOutOfBoundsException();
            } else {
                innerList.remove(MAX_CAPACITY - 1);
            }
            System.out.println(Thread.currentThread().toString() + " remove");
        }

        boolean isEmpty() {
            return innerList.isEmpty();
        }

        boolean isFull() {
            return innerList.size() == MAX_CAPACITY;
        }
    }

    public static void main(String[] args) {
        Something sth = new Something();
        Runnable runProduce = new Runnable() {
            int count = 4;

            @Override
            public void run() {
                while (count-- > 0) {
                    sth.produce();
                }
            }
        };
        Runnable runConsume = new Runnable() {
            int count = 4;

            @Override
            public void run() {
                while (count-- > 0) {
                    sth.consume();
                }
            }
        };
        for (int i = 0; i < 2; i++) {
            new Thread(runConsume).start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread(runProduce).start();
        }
    }
}
  • 上面的栗子是正確的使用方式,輸出的結果如下:
Thread[Thread-2,5,main] add
Thread[Thread-1,5,main] remove
Thread[Thread-3,5,main] add
Thread[Thread-0,5,main] remove
Thread[Thread-3,5,main] add
Thread[Thread-0,5,main] remove
Thread[Thread-2,5,main] add
Thread[Thread-1,5,main] remove

Process finished with exit code 0
  • 如果把while改成if,結果如下,程序可能產生運行時異常:
Thread[Thread-2,5,main] add
Thread[Thread-1,5,main] remove
Thread[Thread-3,5,main] add
Thread[Thread-1,5,main] remove
Thread[Thread-3,5,main] add
Thread[Thread-1,5,main] remove
Exception in thread "Thread-0" Exception in thread "Thread-2" java.lang.IndexOutOfBoundsException
    at Something$Buffer.add(Something.java:42)
    at Something.produce(Something.java:16)
    at Something$1.run(Something.java:76)
    at java.lang.Thread.run(Thread.java:748)
java.lang.IndexOutOfBoundsException
    at Something$Buffer.remove(Something.java:52)
    at Something.consume(Something.java:30)
    at Something$2.run(Something.java:86)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 0
  • 如果把notifyAll改為notify,結果如下,死鎖,程序沒有正常退出:
Thread[Thread-2,5,main] add
Thread[Thread-0,5,main] remove
Thread[Thread-3,5,main] add

常見問題

通知過早

線程wait()等待后,可以調用notify()喚醒線程,如果notify()喚醒的過早,在wait()等待之前就調用了notify()可能會打亂程序正常的運行邏輯。

所以若notify()通知過早,就不讓線程等待了

public class WaitTest {
    static boolean isFirst = true;      //定義靜態變量作為是否第一個運行的線程標志
    public static void main(String[] args) {
        final Object lock = new Object();   //定義鎖對象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    while (isFirst) {
                        try {
                            System.out.println("wait begin");
                            lock.wait();
                            System.out.println("wait end");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("notify begin");
                    lock.notify();
                    System.out.println("notify end");
                    isFirst = false;
                }
            }
        });

//        t2.start();
//        t1.start();  //notify begin  notify end

        t1.start();
        t2.start();
        /*
          wait begin
          notify begin
          notify end
          wait end
         */
    }
}

實際上,調用start()就是告訴線程調度器,當前線程准備就緒,線程調度器在什么時候開啟這個線程不確定,即調用start()方法的順序,並不一定就是線程實際開啟的順序.

wait 等待條件發生了變化

public class WaitTest {
    //1.定義list集合
    static List list = new ArrayList<>();

    //2.定義方法從集合中取數據
     static public void subtract(){
        synchronized (list){
            while (list.size() == 0) {
                try {
                    System.out.println(Thread.currentThread().getName()+" wait begin");
                    list.wait();
                    System.out.println(Thread.currentThread().getName()+" wait end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            list.remove(0);
            System.out.println("成功取出數據"+Thread.currentThread().getName()+" list.size="+list.size());
        }
    }

    //3)定義方法向集合中添加數據並喚醒等待的取數據的線程
    static public void add(){
        synchronized (list) {
            list.add("data");
            System.out.println(Thread.currentThread().getName()+"向集合中添加了數據");
            list.notifyAll();
        }
    }

    //4)定義線程類調用 add()取數據的方法
    static class ThreadAdd extends Thread{
        @Override
        public void run() {
            add();
        }
    }

    //定義線程類調用 subtract()方法
    static class ThreadSubtract extends Thread{
        @Override
        public void run() {
            subtract();
        }
    }

    public static void main(String[] args) {
        ThreadAdd threadAdd = new ThreadAdd();
        ThreadSubtract threadSubtract = new ThreadSubtract();
        threadSubtract.setName("subtract 1");

        //測試一: 先開啟添加數據的線程,再開啟一個取數據的線程,大多數情況下會正常取到數據
//        threadAdd.start();
//        threadSubtract.start();

        //測試二: 先開啟取數據的線程,再開啟添加數據的線程, 取數據的線程會先等待, 等到添加數據之后 ,再取數據
//        threadSubtract.start();
//        threadAdd.start();

        //測試三: 開啟兩個取數據的線程,再開啟添加數據的線程
        ThreadSubtract threadSubtract2 = new ThreadSubtract();
        threadSubtract2.setName("subtract 2");
        threadSubtract.start();
        threadSubtract2.start();
        threadAdd.start();
    }
}

//測試1運行結果
Thread-0向集合中添加了數據
成功取出數據,subtract--list.size=0
//測試2運行結果
subtract 1 wait begin
Thread-0向集合中添加了數據
subtract 1 wait end
成功取出數據,subtract--list.size=0
//測試3運行結果
subtract 1 wait begin
subtract 2 wait begin
Thread-0向集合中添加了數據
subtract 2 wait end
成功取出數據subtract 2 list.size=0
subtract 1 wait end
subtract 1 wait begin

參考文檔

https://blog.csdn.net/jisuanji12306/article/details/86363390

https://www.cnblogs.com/xiaowangbangzhu/p/10443103.html

https://www.jianshu.com/p/25e243850bd2?appinstall=0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM