說說 Java 線程間通信


 序言


 

 正文


 一、Java線程間如何通信?

線程間通信的目標是使線程間能夠互相發送信號,包括如下幾種方式:

1、通過共享對象通信

線程間發送信號的一個簡單方式是在共享對象的變量里設置信號值;線程A在一個同步塊里設置boolean型成員變量hasDataToProcess為true,線程B也在同步塊里讀取hasDataToProcess這個成員變量;線程A和B必須獲得指向一個MySignal共享實例的引用,以便進行通信;如果它們持有的引用指向不同的MySingal實例,那么彼此將不能檢測到對方的信號;需要處理的數據可以存放在一個共享緩存區里,它和MySignal實例是分開存放的。示例如下:

public class MySignal{
  protected boolean hasDataToProcess = false;

  public synchronized boolean getHasDataToProcess(){
    return this.hasDataToProcess;
  }
  public synchronized void setHasDataToProcess(boolean hasData){
    this.hasDataToProcess = hasData;
  }
}
View Code

【場景展現】:

B同學去了圖書館,發現這本書被借走了(執行了例子中的hasDataToProcess),他回到宿舍,等了幾天,再去圖書館找這本書,發現這本書已經被還回,他順利借走了書。

2、忙等待

准備處理數據的線程B正在等待數據變為可用;換句話說,它在等待線程A的一個信號,這個信號使hasDataToProcess()返回true,線程B運行在一個循環里,以等待這個信號。示例如下:

protected MySignal sharedSignal = ...
...
while(!sharedSignal.hasDataToProcess()){
  //do nothing... busy waiting
}
View Code

【場景展現】:

假如A同學在B同學走后一會就把書還回去了,B同學卻是在幾天后再次去圖書館找的書,為了早點借到書(減少延遲),B同學可以就在圖書館等着,比如,每隔幾分鍾(while循環)他就去檢查這本書有沒有被還回,這樣只要A同學一還回書,B同學很快就會知道。

3、wait(),notify()和notifyAll()

忙等待沒有對運行等待線程的CPU進行有效的利用,除非平均等待時間非常短,否則,讓等待線程進入睡眠或者非運行狀態更為明智,直到它接收到它等待的信號。

一個線程一旦調用了任意對象的wait()方法,就會變為非運行狀態,直到另一個線程調用了同一個對象的notify()方法;為了調用wait()或者notify(),線程必須先獲得那個對象的鎖;也就是說,線程必須在同步塊里調用wait()或者notify()。示例如下:

public class MonitorObject{
}

public class MyWaitNotify{
  MonitorObject myMonitorObject = new MonitorObject();

  public void doWait(){
    synchronized(myMonitorObject){
      try{
        myMonitorObject.wait();
      } catch(InterruptedException e){...}
    }
  }
  public void doNotify(){
    synchronized(myMonitorObject){
      myMonitorObject.notify();
    }
  }
}
View Code

等待線程調用doWait(),而喚醒線程調用doNotify();當一個線程調用一個對象的notify()方法,正在等待該對象的所有線程中將有一個線程被喚醒並允許執行(這個將被喚醒的線程是隨機的,不可以指定喚醒哪個線程),可以使用notifyAll()方法來喚醒正在等待一個指定對象的所有線程。

【場景展現】:

檢查很多次后,B同學發現這樣做自己太累了,身體有點吃不消,不過很快,學校圖書館系統改進,加入了短信通知功能(notify()),只要A同學一還回書,立馬會短信通知B同學,這樣B同學就可以在家睡覺等短信了。

4、丟失的信號

notify()和notifyAll()方法不會保存調用它們的方法,因為當這兩個方法被調用時,有可能沒有線程處於等待狀態,通知信號過后便丟棄了;因此,如果一個線程先於被通知線程調用wait()前調用了notify(),等待的線程將錯過這個信號,在某些情況下,這可能使等待線程永遠在等待,不再醒來,因為線程錯過了喚醒信號。
為了避免丟失信號,必須把它們保存在信號類里。示例如下:

public class MyWaitNotify2{
  MonitorObject myMonitorObject = new MonitorObject();
  boolean wasSignalled = false;

  public void doWait(){
    synchronized(myMonitorObject){
      if(!wasSignalled){
        try{
          myMonitorObject.wait();
         } catch(InterruptedException e){...}
      }
      //clear signal and continue running.
      wasSignalled = false;
    }
  }

  public void doNotify(){
    synchronized(myMonitorObject){
      wasSignalled = true;
      myMonitorObject.notify();
    }
  }
}
View Code

【場景展現】:

學校圖書館系統是這么設計的:當一本書被還回來的時候,會給等待者發送短信,並且只會發一次,如果沒有等待者,他也會發(只不過沒有接收者),這樣問題就出現了,因為短信只會發一次,當書被還回來的時候,沒有人等待借書,他會發一條空短信,但是之后有等待借此本書的同學永遠也不會再收到短信,導致這些同學會無休止的等待;為了避免這個問題,我們在等待的時候先打個電話問問圖書館管理員是否繼續等待(if(!wasSignalled))。

5、假喚醒

由於某種原因,線程有可能在沒有調用過notify()和notifyAll()的情況下醒來,這就是所謂的假喚醒(spurious wakeups)。

如果在MyWaitNotify2的doWait()方法里發生了假喚醒,等待線程即使沒有收到正確的信號,也能夠執行后續的操作,這可能出現嚴重問題。

為了防止假喚醒,保存信號的成員變量將在一個while循環里接受檢查,而不是在if表達式里,這樣的一個while循環叫做自旋鎖(這種做法會消耗CPU,如果長時間不調用doNotify方法,doWait方法會一直自旋,CPU會有很大消耗),被喚醒的線程會自旋直到自旋鎖(while循環)里的條件變為false。示例如下:

public class MyWaitNotify3{
  MonitorObject myMonitorObject = new MonitorObject();
  boolean wasSignalled = false;

  public void doWait(){
    synchronized(myMonitorObject){
      while(!wasSignalled){
        try{
          myMonitorObject.wait();
         } catch(InterruptedException e){...}
      }
      //clear signal and continue running.
      wasSignalled = false;
    }
  }
  public void doNotify(){
    synchronized(myMonitorObject){
      wasSignalled = true;
      myMonitorObject.notify();
    }
  }
}
View Code

 【場景展現】:

圖書館系統還有一個bug:系統會偶爾給你發條錯誤短信,說書可以借了(其實書不可以借),我們之前已經給圖書館管理員打過電話了,他說讓我們等短信,我們很聽話,一等到短信(其實是bug引起的錯誤短信),就去借書了,到了圖書館后發現這書根本就沒還回來!我們很郁悶,但也沒辦法啊,學校不修復bug,我們得聰明點:每次在收到短信后,再打電話問問書到底能不能借(while(!wasSignalled))。

 

 二、多個線程如何按順序執行?

多個線程如何保證執行順序,是一個很高頻的面試題,實現方式很多,這里介紹四種實現方式:

1、使用Thread的join方法

Thread類中的join方法的主要作用就是同步,調用線程需等待join線程執行完或指定時間后執行,如:join(10),表示等待某線程執行10秒后再執行。示例如下:

public class ThreadChildJoin {
    public static void main(String[] args) {
        final Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("需求分析...");
            }
        });

        final Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    t1.join();
                    System.out.println("功能開發...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    t2.join();
                    System.out.println("功能測試...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t3.start();
        t1.start();
        t2.start();
    }
}
View Code

2、使用Condition(條件變量)

Condition是一個多線程間協調通信的工具類,使得某個或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。

Condition類主要方法包括:await方法(類似於Object類中的wait()方法)、signal方法(類似於Object類中的notify()方法)、signalAll方法(類似於Object類中的notifyAll()方法)。示例如下:

public class ThreadCondition {
    private static Lock lock = new ReentrantLock();
    private static Condition condition1 = lock.newCondition();
    private static Condition condition2 = lock.newCondition();

    /**
     * 為什么要加這兩個標識狀態?
     * 如果沒有狀態標識,當t1已經運行完了t2才運行,t2在等待t1喚醒導致t2永遠處於等待狀態
     */
    private static Boolean t1Run = false;
    private static Boolean t2Run = false;

    public static void main(String[] args) {

        final Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                System.out.println("需求分析...");
                t1Run = true;
                condition1.signal();
                lock.unlock();
            }
        });

        final Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    if(!t1Run){
                        condition1.await();
                    }
                    System.out.println("功能開發...");
                    t2Run = true;
                    condition2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();
            }
        });

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    if(!t2Run){
                        condition2.await();
                    }
                    System.out.println("功能測試...");
                    lock.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t3.start();
        t1.start();
        t2.start();
    }
}
View Code

3、使用CountDownLatch(倒計數)

顧名思義,使用CountDownLatch可以實現類似計數器的功能。示例如下:

public class ThreadCountDownLatch {
    private static CountDownLatch c1 = new CountDownLatch(1);

    /**
     * 用於判斷線程二是否執行,倒計時設置為1,執行后減1
     */
    private static CountDownLatch c2 = new CountDownLatch(1);

    public static void main(String[] args) {
        final Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("需求分析...");
                //對c1倒計時-1
                c1.countDown();
            }
        });

        final Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //等待c1倒計時,計時為0則往下運行
                    c1.await();
                    System.out.println("功能開發...");
                    //對c2倒計時-1
                    c2.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //等待c2倒計時,計時為0則往下運行
                    c2.await();
                    System.out.println("功能測試...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t3.start();
        t1.start();
        t2.start();
    }
}
View Code

4、使用CyclicBarrier(回環柵欄)

CyclicBarrier可以實現讓一組線程等待至某個狀態之后再全部同時執行,“回環”是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用,可以把這個狀態當做barrier,當調用await()方法之后,線程就處於barrier了。示例如下:

public class ThreadCyclicBarrier {
    static CyclicBarrier barrier1 = new CyclicBarrier(2);
    static CyclicBarrier barrier2 = new CyclicBarrier(2);

    public static void main(String[] args) {

        final Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("需求分析...");
                    //放開柵欄1
                    barrier1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

        final Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //放開柵欄1
                    barrier1.await();
                    System.out.println("功能開發...");
                    //放開柵欄2
                    barrier2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

        final Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //放開柵欄2
                    barrier2.await();
                    System.out.println("功能測試...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

        t3.start();
        t1.start();
        t2.start();
    }
}
View Code

 

參考:

[1] http://ifeve.com/thread-signaling/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM