本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接:http://item.jd.com/12299018.html
本節繼續上節的內容,探討如何使用wait/notify實現更多的協作場景。
同時開始
同時開始,類似於運動員比賽,在聽到比賽開始槍響后同時開始,下面,我們模擬下這個過程,這里,有一個主線程和N個子線程,每個子線程模擬一個運動員,主線程模擬裁判,它們協作的共享變量是一個開始信號。我們用一個類FireFlag來表示這個協作對象,代碼如下所示:
static class FireFlag { private volatile boolean fired = false; public synchronized void waitForFire() throws InterruptedException { while (!fired) { wait(); } } public synchronized void fire() { this.fired = true; notifyAll(); } }
子線程應該調用waitForFire()等待槍響,而主線程應該調用fire()發射比賽開始信號。
表示比賽運動員的類如下:
static class Racer extends Thread { FireFlag fireFlag; public Racer(FireFlag fireFlag) { this.fireFlag = fireFlag; } @Override public void run() { try { this.fireFlag.waitForFire(); System.out.println("start run " + Thread.currentThread().getName()); } catch (InterruptedException e) { } } }
主程序代碼如下所示:
public static void main(String[] args) throws InterruptedException { int num = 10; FireFlag fireFlag = new FireFlag(); Thread[] racers = new Thread[num]; for (int i = 0; i < num; i++) { racers[i] = new Racer(fireFlag); racers[i].start(); } Thread.sleep(1000); fireFlag.fire(); }
這里,啟動了10個子線程,每個子線程啟動后等待fire信號,主線程調用fire()后各個子線程才開始執行后續操作。
等待結束
理解join
在理解Synchronized一節中我們使用join方法讓主線程等待子線程結束,join實際上就是調用了wait,其主要代碼是:
while (isAlive()) { wait(0); }
只要線程是活着的,isAlive()返回true,join就一直等待。誰來通知它呢?當線程運行結束的時候,Java系統調用notifyAll來通知。
使用協作對象
使用join有時比較麻煩,需要主線程逐一等待每個子線程。這里,我們演示一種新的寫法。主線程與各個子線程協作的共享變量是一個數,這個數表示未完成的線程個數,初始值為子線程個數,主線程等待該值變為0,而每個子線程結束后都將該值減一,當減為0時調用notifyAll,我們用MyLatch來表示這個協作對象,示例代碼如下:
public class MyLatch { private int count; public MyLatch(int count) { this.count = count; } public synchronized void await() throws InterruptedException { while (count > 0) { wait(); } } public synchronized void countDown() { count--; if (count <= 0) { notifyAll(); } } }
這里,MyLatch構造方法的參數count應初始化為子線程的個數,主線程應該調用await(),而子線程在執行完后應該調用countDown()。
工作子線程的示例代碼如下:
static class Worker extends Thread { MyLatch latch; public Worker(MyLatch latch) { this.latch = latch; } @Override public void run() { try { // simulate working on task Thread.sleep((int) (Math.random() * 1000)); this.latch.countDown(); } catch (InterruptedException e) { } } }
主線程的示例代碼如下:
public static void main(String[] args) throws InterruptedException { int workerNum = 100; MyLatch latch = new MyLatch(workerNum); Worker[] workers = new Worker[workerNum]; for (int i = 0; i < workerNum; i++) { workers[i] = new Worker(latch); workers[i].start(); } latch.await(); System.out.println("collect worker results"); }
MyLatch是一個用於同步協作的工具類,主要用於演示基本原理,在Java中有一個專門的同步類CountDownLatch,在實際開發中應該使用它,關於CountDownLatch,我們會在后續章節介紹。
MyLatch的功能是比較通用的,它也可以應用於上面"同時開始"的場景,初始值設為1,Racer類調用await(),主線程調用countDown()即可,如下所示:
public class RacerWithLatchDemo { static class Racer extends Thread { MyLatch latch; public Racer(MyLatch latch) { this.latch = latch; } @Override public void run() { try { this.latch.await(); System.out.println("start run " + Thread.currentThread().getName()); } catch (InterruptedException e) { } } } public static void main(String[] args) throws InterruptedException { int num = 10; MyLatch latch = new MyLatch(1); Thread[] racers = new Thread[num]; for (int i = 0; i < num; i++) { racers[i] = new Racer(latch); racers[i].start(); } Thread.sleep(1000); latch.countDown(); } }
異步結果
在主從模式中,手工創建線程往往比較麻煩,一種常見的模式是異步調用,異步調用返回一個一般稱為Promise或Future的對象,通過它可以獲得最終的結果。在Java中,表示子任務的接口是Callable,聲明為:
public interface Callable<V> { V call() throws Exception; }
為表示異步調用的結果,我們定義一個接口MyFuture,如下所示:
public interface MyFuture <V> { V get() throws Exception ; }
這個接口的get方法返回真正的結果,如果結果還沒有計算完成,get會阻塞直到計算完成,如果調用過程發生異常,則get方法拋出調用過程中的異常。
為方便主線程調用子任務,我們定義一個類MyExecutor,其中定義一個public方法execute,表示執行子任務並返回異步結果,聲明如下:
public <V> MyFuture<V> execute(final Callable<V> task)
利用該方法,對於主線程,它就不需要創建並管理子線程了,並且可以方便地獲取異步調用的結果,比如,在主線程中,可以類似這樣啟動異步調用並獲取結果:
public static void main(String[] args) { MyExecutor executor = new MyExecutor(); // 子任務 Callable<Integer> subTask = new Callable<Integer>() { @Override public Integer call() throws Exception { // ... 執行異步任務 int millis = (int) (Math.random() * 1000); Thread.sleep(millis); return millis; } }; // 異步調用,返回一個MyFuture對象 MyFuture<Integer> future = executor.execute(subTask); // ... 執行其他操作 try { // 獲取異步調用的結果 Integer result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } }
MyExecutor的execute方法是怎么實現的呢?它封裝了創建子線程,同步獲取結果的過程,它會創建一個執行子線程,該子線程的代碼如下所示:
static class ExecuteThread<V> extends Thread { private V result = null; private Exception exception = null; private boolean done = false; private Callable<V> task; private Object lock; public ExecuteThread(Callable<V> task, Object lock) { this.task = task; this.lock = lock; } @Override public void run() { try { result = task.call(); } catch (Exception e) { exception = e; } finally { synchronized (lock) { done = true; lock.notifyAll(); } } } public V getResult() { return result; } public boolean isDone() { return done; } public Exception getException() { return exception; } }
這個子線程執行實際的子任務,記錄執行結果到result變量、異常到exception變量,執行結束后設置共享狀態變量done為true並調用notifyAll以喚醒可能在等待結果的主線程。
MyExecutor的execute的方法的代碼為:
public <V> MyFuture<V> execute(final Callable<V> task) { final Object lock = new Object(); final ExecuteThread<V> thread = new ExecuteThread<>(task, lock); thread.start(); MyFuture<V> future = new MyFuture<V>() { @Override public V get() throws Exception { synchronized (lock) { while (!thread.isDone()) { try { lock.wait(); } catch (InterruptedException e) { } } if (thread.getException() != null) { throw thread.getException(); } return thread.getResult(); } } }; return future; }
execute啟動一個線程,並返回MyFuture對象,MyFuture的get方法會阻塞等待直到線程運行結束。
以上的MyExecutore和MyFuture主要用於演示基本原理,實際上,Java中已經包含了一套完善的框架Executors,相關的部分接口和類有:
- 表示異步結果的接口Future和實現類FutureTask
- 用於執行異步任務的接口Executor、以及有更多功能的子接口ExecutorService
- 用於創建Executor和ExecutorService的工廠方法類Executors
后續章節,我們會詳細介紹這套框架。
集合點
各個線程先是分頭行動,然后各自到達一個集合點,在集合點需要集齊所有線程,交換數據,然后再進行下一步動作。怎么表示這種協作呢?協作的共享變量依然是一個數,這個數表示未到集合點的線程個數,初始值為子線程個數,每個線程到達集合點后將該值減一,如果不為0,表示還有別的線程未到,進行等待,如果變為0,表示自己是最后一個到的,調用notifyAll喚醒所有線程。我們用AssemblePoint類來表示這個協作對象,示例代碼如下:
public class AssemblePoint { private int n; public AssemblePoint(int n) { this.n = n; } public synchronized void await() throws InterruptedException { if (n > 0) { n--; if (n == 0) { notifyAll(); } else { while (n != 0) { wait(); } } } } }
多個游客線程,各自先獨立運行,然后使用該協作對象到達集合點進行同步的示例代碼如下:
public class AssemblePointDemo { static class Tourist extends Thread { AssemblePoint ap; public Tourist(AssemblePoint ap) { this.ap = ap; } @Override public void run() { try { // 模擬先各自獨立運行 Thread.sleep((int) (Math.random() * 1000)); // 集合 ap.await(); System.out.println("arrived"); // ... 集合后執行其他操作 } catch (InterruptedException e) { } } } public static void main(String[] args) { int num = 10; Tourist[] threads = new Tourist[num]; AssemblePoint ap = new AssemblePoint(num); for (int i = 0; i < num; i++) { threads[i] = new Tourist(ap); threads[i].start(); } } }
這里實現的是AssemblePoint主要用於演示基本原理,Java中有一個專門的同步工具類CyclicBarrier可以替代它,關於該類,我們后續章節介紹。
小結
上節和本節介紹了Java中線程間協作的基本機制wait/notify,協作關鍵要想清楚協作的共享變量和條件是什么,為進一步理解,針對多種協作場景,我們演示了wait/notify的用法及基本協作原理,Java中有專門為協作而建的阻塞隊列、同步工具類、以及Executors框架,我們會在后續章節介紹,在實際開發中,應該盡量使用這些現成的類,而非重新發明輪子。
之前,我們多次碰到了InterruptedException並選擇了忽略,現在是時候進一步了解它了。
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。