在日常的項目開發中,我們會經常遇到通過多線程執行程序並需要返回執行結果的場景,下面我們就對獲取多線程返回結果的幾種方式進行一下歸納,並進行簡要的分析與總結。
一、Thread.join
在一些簡單的應用場景中我們可以使用線程本身提供的join方法,我們知道join方法的目的是讓一個線程等待另一個線程結束后才能執行,利用此原理我們可以設置一個監控線程用來等待程序線程執行完畢后輸出返回結果,下面我們看下具體示例代碼
首先定義一個結果實體類
public class Result { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
定義工作線程,模擬程序執行並輸出線程執行結果
public class WorkThread extends Thread { private Result result ; public void init(Result result) { this.result = result; } public void run() { try { Thread.sleep(1000*10);//模擬程序執行 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } result.setValue("線程執行完畢,輸出結果"); } }
主線程等待工作線程執行,並獲取工作線程的執行成果
public class MainThread { public static void main(String[] args) throws InterruptedException { Result result = new Result(); WorkThread workThread = new WorkThread(); workThread.init(result); System.out.println("線程啟動"); workThread.start(); System.out.println("線程等待"); // 等待work線程運行完再繼續運行 workThread.join(); System.out.println("線程執行結果:"+result.getValue()); } }
輸出結果
線程啟動
線程等待
線程執行結果:線程執行完畢,輸出結果
以上代碼通過Thread.join的方式,模擬了一個最基本的獲取線程執行結果場景,采用Thread.join的方式雖然使用方便,但這種原生的方式只適用於一些簡單的應用場景中,其主要存在以下問題:
1、獲取多個線程返回結果時較為繁瑣,需要自己手動實現;
2、與線程池無法配合使用;
3、工作線程內部執行復雜度與耗時不確定,程序需要額外完善;
4、本質上還是同步返回結果,主線程阻塞;
二、CountDownLatch
CountDownLatch做為jdk提供的多線程同步工具,CountDownLatch其實本質上可以看做一個線程計數器,統計多個線程執行完成的情況,適用於控制一個或多個線程等待,直到所有線程都執行完畢的場景,因此我們可以利用其功能特點實現獲取多個線程的執行結果,一定程度上彌補了Thread.join的不足,代碼示例如下:
工作線程
public class WorkThread extends Thread { private Vector<Result> vectors ; private CountDownLatch countDownLatch; public WorkThread(CountDownLatch countDownLatch) { this.countDownLatch=countDownLatch; } public void init(Vector<Result> vectors) { this.vectors = vectors; } public void run() { try { Thread.sleep(1000*3);//模擬程序執行 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Result result = new Result(); result.setValue(Thread.currentThread().getName()+"線程執行完畢,輸出結果"); vectors.add(result);//結果放入Vector中 countDownLatch.countDown(); } }
主線程
public class MainThread { public static void main(String[] args) throws InterruptedException { Vector<Result> vectors = new Vector<Result>();//定義一個Vector做為存儲返回結果的容器; final CountDownLatch countDownLatch = new CountDownLatch(5); // 啟動多個工作線程 for (int i = 0; i < 5; i++) { WorkThread workThread = new WorkThread(countDownLatch); workThread.init(vectors); workThread.start(); } System.out.println("主線程等待工作線程執行"); countDownLatch.await(); for (int i=0; i<vectors.size(); i++) { System.out.println(vectors.get(i).getValue()); } } }
輸出結果
主線程等待工作線程執行 Thread-0線程執行完畢,輸出結果 Thread-1線程執行完畢,輸出結果 Thread-2線程執行完畢,輸出結果 Thread-4線程執行完畢,輸出結果 Thread-3線程執行完畢,輸出結果
通過利用jdk的多線程工具類CountDownLatch,我們可以等待多個線程執行完畢后獲取結果,但這種方式局限性較多,如果你的應用場景中啟動的線程次數是固定的且需要等待執行結果全部返回后統一處理,使用CountDownLatch是一個不錯的選擇。
三、Future
1、Future與FutureTask
使用Future,包括 FutureTask、CompletionService、CompletableFuture等
首先我們使用Future配合線程池,獲取線程池執行線程的返回結果
定義一個實現Callable接口的工作線程
public class WorkThread implements Callable<Result> { public Result call() throws Exception { Thread.sleep(5000); Result result = new Result(); result.setValue(Thread.currentThread().getName()+"線程執行完畢,輸出結果"); return result; } }
主線程
public class MainThread { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { ExecutorService taskPool = new ThreadPoolExecutor(5, 15, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy()); Future<Result> future = taskPool.submit(new WorkThread()); System.out.println("線程池執行工作線程"); Result result = future.get();//注意這里get操作是阻塞,future仍屬於同步返回,主線程需要阻塞等待結果返回 //result = future.get(3,TimeUnit.SECONDS);//設置阻塞超時時間 System.out.println(result.getValue()); } }
Future與FutureTask實現方式基本類似,FutureTask其實是對Futue的進一步封裝,通過上面的代碼我們可以看到Future能夠配合ExecutorService 線程池來獲取線程執行的結果,使用起來也較為方便,同時可以設置獲取結果的超時時間,避免長時間阻塞帶來的問題,基本上能夠滿足大部分應用場景下的要求, 但Future獲取結果的get方法是阻塞,本質上是個同步返回,如果希望獲取結果所在線程不阻塞,需要引入其他模式相互配合,這個我們下面會說到。
2、CompletionService
CompletionService可以看作FutureTask的一個進階版,通過FutureTask+阻塞隊列的方式能夠按照線程執行完畢的順序獲取線程執行結果,起到聚合的目的,這個其實跟CountDownLatch差不多,如果你需要執行的線程次數是固定的且需要等待執行結果全部返回后統一處理,可以使用CompletionService,下面我們通過示例代碼進行演示
同上先實現一個工作線程,這次我們為了能體現出結果輸出的順序,在工作線程內部定義一個編號,編號為偶數的線程阻塞一定時間
public class WorkThread implements Callable<Result>{ int num;//線程編號 public WorkThread(int num) { this.num=num; } public Result call() throws InterruptedException { int count = num; if(count%2==0) {//編號為偶數的線程阻塞3秒鍾 Thread.sleep(3*1000); } Result result = new Result(); result.setValue(num+"號線程執行完畢,輸出結果"); return result; } }
主線程中啟動十個線程
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); //定義一個阻塞隊列 BlockingQueue<Future<Result>> futureQueue = new LinkedBlockingQueue<Future<Result>>(); //傳入ExecutorService與阻塞隊列,構造一個completionService CompletionService<Result> completionService = new ExecutorCompletionService<Result>( exec,futureQueue); for(int i=0;i<10;i++) { completionService.submit(new WorkThread(i)); } for(int i=0;i<10;i++) { Result res = completionService.take().get();//注意阻塞隊列take操作,如果獲取不到數據時處於阻塞狀態的 System.out.println(new Date()+ "--"+res.getValue()); } } }
輸出結果如下,可以看到奇數編號的線程結果優先返回,偶數編號的線程由於阻塞3秒后才輸出返回結果,符合程序預期;
Sun Apr 11 18:38:46 CST 2021--3號線程執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--1號線程執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--7號線程執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--9號線程執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--5號線程執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--2號線程執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--4號線程執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--0號線程執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--8號線程執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--6號線程執行完畢,輸出結果
上面主線程代碼中的completionService.take().get()操作,當獲取不到數據也就是當偶數編號線程休眠時仍然會產生阻塞, 其實我們只要對上面代碼進行稍微改造就能避免主線程的阻塞,這也就引出了我們下面要說的生產者與消費者模式;
四、生產者消費者模式
上面我們列舉的幾種獲取多線程執行結果的方式,都是通過不同技術方法來實現的,而生產者消費者模式本身跟你運用的技術實現沒有太多關系,接觸過多線程開發的同學應該都有所了解;
生產者消費者模式如下圖所示
生產者消費者模式是一種能夠解耦與同步生產線程、消費線程、數據集合的多線程設計模式,一個或一組生產者線程負責向數據隊列中生產數據,也就是線程執行結果;另外一個或一組消費者線程負責消費處理數據隊列中的數據,生產者線程與消費者線程相互之間並沒有直接的關聯,數據的交互都是通過數據隊列,通過這種模式能夠很好的在一定程度上解決多線程開發中存在線程同步與安全的問題,同時程序也會看起來更加清晰與方便理解;
當然一個完善的生產者消費者模式我們需要考慮很多其他方面, 但最關鍵的還是以下兩個要素:
1、線程安全,生產者與消費者分別執行讀寫操作,特別是在多個生產線程與消費線程時,一定會存在數據讀寫的並發操作,所以數據隊列一定要保證線程安全;
2、生產與消費的協調,數據隊列滿時生產線程是否停止寫入,數據隊列空時消費線程是否停止消費,這里一方面需要結合你的應用場景,同時也是需要考慮不必要的性能浪費;
下面看下基本的代碼實現
首先定義一個全局的數據隊列,這里我用的JDK提供的阻塞隊列ArrayBlockingQueue,這里同樣也直接可以上面講到的completionService,當然也可以用其他線程安全的數據集合或者自己定義實現,但要注意無論使用哪種都要注意上面的兩個關鍵要素,平常使用中JDK封裝的阻塞隊列已經基本滿足要求;
public class Container { public static ArrayBlockingQueue<Result> arrayBlockingQueue = new ArrayBlockingQueue<>(100);//這里最好根據系統負載量評估一個閾值,避免OOM問題 }
生產者線程實現,隊列數據插入時是采用put還是offer結合應用場景調整
public class ProducerThread extends Thread { public void run() { try { Thread.sleep(1000*3);//模擬程序執行 Result result = new Result(); result.setValue(Thread.currentThread().getName()+"線程執行完畢,輸出結果"); Container.arrayBlockingQueue.put(result);//超過阻塞隊列最大閾值時阻塞,一直阻塞 // if(!Container.arrayBlockingQueue.offer(result, 5, TimeUnit.SECONDS)) {//規定時間內數據入隊失敗 // System.err.println("數據入隊失敗"); // } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消費者線程實現,消費者線程是常駐線程,隊列中沒有數據時就線程阻塞
public class ConsumerThread extends Thread { public void run() { while (!this.isInterrupted()) { try { Result result = Container.arrayBlockingQueue.take();//有數據就消費,沒有就阻塞等待 System.out.println(result.getValue()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
主線程中同時啟動生產線程與消費線程
public class MainThread { public static void main(String[] args) { ExecutorService exec = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for(int i=0;i<100;i++) {//使用線程池模擬生成者生產數據 exec.execute(new ProducerThread()); } for(int i=0;i<2;i++) {//啟動兩個消費者線程 new ConsumerThread().start(); } } }
消費者線程中會輪詢獲取生產者線程執行並放到阻塞隊列中的結果
pool-1-thread-13線程執行完畢,輸出結果 pool-1-thread-2線程執行完畢,輸出結果 pool-1-thread-1線程執行完畢,輸出結果 pool-1-thread-10線程執行完畢,輸出結果 pool-1-thread-9線程執行完畢,輸出結果 pool-1-thread-15線程執行完畢,輸出結果 pool-1-thread-4線程執行完畢,輸出結果 pool-1-thread-5線程執行完畢,輸出結果 pool-1-thread-8線程執行完畢,輸出結果 pool-1-thread-12線程執行完畢,輸出結果 pool-1-thread-16線程執行完畢,輸出結果 ..................................................... .....................................................
生產者消費者模式是程序開發當中一種十分常見且易於理解與掌握的開發設計模式,且適用場景廣泛,希望大家都能夠深入理解與掌握
五、異步回調
上面列舉的獲取線程執行結果的方法都存在一個共性的問題,就是在等待結果的返回過程中,主線程或者消費者線程都是需要阻塞或輪詢等待的,但在一些應用場景下我們是希望線程執行的過程中,程序該干嘛干嘛,繼續向下執行,等到結果返回了再通過回調來通知,這就是異步回調的必要性。實現異步回調思路我這里列舉兩種,一種是多線程與回調,第二種JDK1.8中新加入了一個實現類CompletableFuture,通過這兩種都能夠實現異步獲取線程執行結果的目標
1、多線程與回調
這里其實是在多線程中通過回調的方式把結果返回的方式,我們看下具體實現
首先聲明一個回調接口
public interface CallBack { void notice(Result result); }
定義工作線程,在構造函數中傳入回調接口的實現對象
public class WorkThread implements Runnable{ int num;//線程編號 CallBack callBack; public WorkThread(CallBack callBack, int num) { this.num=num; this.callBack = callBack; } @Override public void run() { // TODO Auto-generated method stub try { Thread.sleep((10-num)*1000);//模擬程序運行時間,倒序輸出 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Result result = new Result(); result.setValue(num+"號線程執行完畢,輸出結果"); callBack.notice(result); } }
調用方及回調方法具體實現
public class MainThread implements CallBack { public void run(int num) { WorkThread workThread = new WorkThread(this,num); new Thread(workThread).start(); } @Override public void notice(Result result) { System.out.println("返回結果:"+result.getValue()); } }
程序執行及輸出
public class App { public static void main(String[] args) { MainThread mainThread = new MainThread(); for(int i=0;i<10;i++) { mainThread.run(i); } System.out.println("繼續執行,表示異步操作"); } }
輸出結果
繼續執行,表示異步操作
返回結果:9號線程執行完畢,輸出結果
返回結果:8號線程執行完畢,輸出結果
返回結果:7號線程執行完畢,輸出結果
返回結果:6號線程執行完畢,輸出結果
返回結果:5號線程執行完畢,輸出結果
返回結果:4號線程執行完畢,輸出結果
返回結果:3號線程執行完畢,輸出結果
返回結果:2號線程執行完畢,輸出結果
返回結果:1號線程執行完畢,輸出結果
返回結果:0號線程執行完畢,輸出結果
多線程+回調也是一種常見的異步回調實現方式,但需要注意的是我們自己手動實現異步回調時還是有很多細節需要考慮完善的,如異常、超時、線程開辟與管理等,這里就不再過多的展開。
2、CompletableFuture
JDK1.8中新增的CompletableFuture中通過函數式的編程方法提供了等同於異步回調的能力,下面我們進行具體實現
工作線程
public class WorkThread { public static Result call(int num) throws InterruptedException { Thread.sleep(5*1000);//模擬程序執行時間 Result result = new Result(); result.setValue(String.valueOf(num)); return result; } }
主線程
public class MainThread {
public static void main(String[] args) { List<String> reslist = new ArrayList<String>(); ExecutorService exs = Executors.newFixedThreadPool(10);//定義一個線程池 List<CompletableFuture<Result>> futureList = new ArrayList<>(); try { for(int i=0;i<10;i++) { final int k = i; CompletableFuture<Result> future=CompletableFuture.supplyAsync(()->{ try { return WorkThread.call(k); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } return null; },exs).thenApply(e->mul(e)).whenComplete((v, e) -> {//thenApply 里面執行就是回調函數CallBack System.out.println("線程"+k+"完成! 結果:"+v.getValue()+",異常 :"+e+","+new Date()); reslist.add(v.getValue()); }); futureList.add(future);//聚合返回結果 } System.out.println("繼續執行,表示異步操作"); }catch (Exception e) { // TODO: handle exception } } public static Result mul(Result result){ try { Integer val = Integer.valueOf(result.getValue())*2; result.setValue(val.toString()); } catch (Exception e) { e.printStackTrace(); } return result; } }
輸出結果如下,可以看到主線程沒有等待線程執行結果返回,繼續向下執行
直接輸出,標識異步操作 線程9完成! 結果:18,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程2完成! 結果:4,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程5完成! 結果:10,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程1完成! 結果:2,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程6完成! 結果:12,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程3完成! 結果:6,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程0完成! 結果:0,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程4完成! 結果:8,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程8完成! 結果:16,異常 :null,Sun Apr 18 17:27:29 CST 2021 線程7完成! 結果:14,異常 :null,Sun Apr 18 17:27:29 CST 2021
CompletableFuture中提供了豐富的API實現,提供了諸如聚合計算等一整套功能,這里就不做太多表述,有興趣的小伙伴可以去多做了解。
六、總結
以上就是針對如何獲取多線程執行結果進行的方法匯總與簡要分析,雖然方法手段多樣,但本質上都還是圍繞線程同步、數據共享、異步回調等幾個思路來進行實現的。在實際的日常開發中的應用,大家還是要結合業務場景具體問題具體分析,一方面固然是要注意程序性能的高效與實現方式的優雅,但另一方面也要注意避免簡單的問題復雜化,反而過猶不及。特別是在多線程的開發過程中,多線程的使用並不就等同於處理效率的提高,要不斷的深入學習與理解,結合應用場景,多分析,多總結,在實踐與積累的過程中逐步提高,真正領會。在此我也希望與大家相互勉勵,共同進步。希望本文對大家能有所幫助,其中如有不足與不正確的地方還望指正與海涵,十分感謝。
關注微信公眾號,查看更多技術文章。