iwehdio的博客園:https://www.cnblogs.com/iwehdio/
學習自:
1、FutureTask
-
無論是Runnable還是Callable,它們其實和線程沒半毛錢關系,它們是任務類,只有Thread是線程類。
-
JDK那么多類,有且僅有Thread類能通過start0()方法向操作系統申請線程資源(本地方法)。
-
並且,在JVM的設定中Java的線程和操作系統的線程是一一對應的:
-
而Runnable和Callable如果沒有線程或線程池去執行它們,就什么也不是,只是一坨普通的代碼。
public class AsyncAndWaitTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // 方式1:重寫Thread#run() Thread thread = new Thread() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "========>正在執行"); } }; thread.start(); // 方式2:構造方法傳入Runnable實例 new Thread(() -> { System.out.println(Thread.currentThread().getName() + "========>正在執行"); }).start(); // 方式3:線程池 + Callable ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> submit = executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "========>正在執行"); Thread.sleep(3 * 1000L); return "success"; }); String result = submit.get(); System.out.println("result=======>" + result); // 關閉線程池 executorService.shutdown(); } }
-
FutureTask = 任務 + 結果。
- 第四種方法:通過Thread的構造器傳入Runnable實例(FutureTask,內部包裝了Runnable/Callable)。
- 基本使用:
public class AsyncAndWaitTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // FutureTask實現了Runnable,可以看做是一個任務 FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + "========>正在執行"); try { Thread.sleep(3 * 1000L); } catch (InterruptedException e) { e.printStackTrace(); } return "success"; } }); System.out.println(Thread.currentThread().getName() + "========>啟動任務"); // 傳入futureTask,啟動線程執行任務 new Thread(futureTask).start(); // 但它同時又實現了Future,可以獲取異步結果(會阻塞3秒) String result = futureTask.get(); System.out.println("任務執行結束,result====>" + result); } }
-
FutureTask這個名字!它既是一個任務,又能存儲任務執行的結果。反映在程序上就是既能傳入Thread執行,又能通過futureTask.get()獲取任務執行結果。
-
FutureTask有以下2個特征:
- 能包裝Runnable和Callable(構造器傳入),但本身卻又實現了Runnable接口,即本質是Runnable。
- 既然是Runnable,所以FutureTask能作為任務被Thread執行,但詭異的是FutureTask#get()可以獲取結果。
-
FutureTask如何包裝Runnable/Callable:
-
使用:
-
通過FutureTask構造器傳入Runnable/Callable的,所以我們去看看FutureTask的構造器:
-
FutureTask內部維護Callable類型的成員變量,對於Callable任務,直接賦值即可:
-
而對於Runnable任務,需要先調用Executors#callable()把Runnable先包裝成Callable:
-
Executors#callable()用到了適配器模式:
-
而RunnableAdapter實現了Callable接口,所以包裝后的RunnableAdapter可以賦值給FutureTask.callable。
-
也就是說:
- Runnable --> Executors.callable() --> RunnableAdapter implements Callable --> FutureTask.callable
- Callable --> FutureTask.callable
-
-
Runnable和Callable的返回值問題:
-
Callable#call()是有返回值的,而Runnable#run()沒有。它們都包裝成FutureTask后,一個有返回值,一個沒返回值,怎么處理呢。
-
設計成有返回值的,畢竟Callable.call()明明有返回值,你總不能硬生生丟掉吧。至於Runnable.run()確實沒返回值,但也好辦,搞個假的返回即可。
-
等到Thread執行FutureTask時,會先取出FutureTask.callable,然后調用callable.call():
- 如果是真的Callable,調用Callable.call()會返回真實的result
- 如果是Runnable包裝的RunnableAdapter,會返回事先傳入的result
- 這也是上面的程序中,為什么Runnable要多傳一個參數的原因
-
-
FutureTask是如何被Thread執行的:
-
thread執行自己的run方法。這里的target是FutureTask,所以target.run()就是FutureTask#run()。
-
結果最終存哪呢?
-
也是FutureTask的一個成員變量:
-
進一步印證了說 FutureTask = 任務 + 結果。
-
-
為什么get()是阻塞的?
-
在FutureTask中定義了很多任務狀態:
- 剛創建
- 即將完成
- 完成
- 拋異常
- 任務取消
- 任務即將被打斷
- 任務被打斷
-
這些狀態的設置意義在哪?
- 一個任務,有時可能非常耗時。而當用戶使用futureTask.get()時,必然是希望獲取最終結果的。如果FutureTask不幫我們阻塞,就有可能獲取空結果。此時為了獲取最終結果,用戶不得不在外部自己寫阻塞程序。
- 所以,get()內部會判斷當前任務的狀態,只有當任務完成才返回。
-
線程從阻塞到獲取結果,中間必然經歷類似喚醒的操作,怎么做到的?
- 秘密就在awaitDone():核心的就是 for循環 + LockSupport。
LockSupport
是一個線程阻塞工具類,所有的方法都是靜態方法,可以讓線程在任意位置阻塞,當然也有喚醒的方法。LockSupport
主要有兩類方法:park
和unpark
。即讓線程停下和啟動。
-
類似於:
public class ParkTest { @Test public void testPark() throws InterruptedException { // 存儲線程 List<Thread> threadList = new ArrayList<>(); // 創建5個線程 for (int i = 0; i < 5; i++) { Thread thread = new Thread(() -> { System.out.println("我是" + Thread.currentThread().getName() + ", 我開始工作了~"); LockSupport.park(this); System.out.println("我是" + Thread.currentThread().getName() + ", 我又活過來了~"); }); thread.start(); threadList.add(thread); } Thread.sleep(3 * 1000L); System.out.println("====== 所有線程都阻塞了,3秒后全部恢復了 ======"); // unPark()所有線程 for (Thread thread : threadList) { LockSupport.unpark(thread); } // 等所有線程執行完畢 Thread.sleep(3 * 1000L); } }
-
也就是說,調用get()后,如果當前沒有結果,就會被park(),等有了結果再unpark()並往下走:
-
取出outcome返回:
-
-
FutureTask如何異步返回結果:
-
往線程池submit了一個Callable,結果馬上返回了result(FutureTask):
-
觀察:
- 返回的FutureTask里包含剛才丟進去的Callable
- result.outcome目前還是null
-
實際上,返回的futureTask並不是真正的結果,它內部持有outcome引用,它才指向真正的結果。而在任務完成之前,outcome引用指向的是null。
-
-
何時調用futureTask.get()?
-
用戶調用get()必然是想到得到最終結果的,所以為了保證一定能得到結果,JDK把FutureTask#get()設計成阻塞的。
-
建議不要立即調用get(),否則程序完全沒有發揮異步優勢,由異步阻塞變成同步阻塞。
-
開啟多線程,當然應該發揮多線程的優勢:
-
-
isDone() + get():
- 但是實際開發時,異步線程具體會耗時多久有時很難預估,受網絡、數據庫等各方面影響。所以很難做到在合適的地方get()然后一擊即中。
- FutureTask提供了isDone()方法:
- 當然,這種做法也不是很優雅。JDK1.8提供了CompletableFuture解決這個問題。
2、CompletableFuture
-
FutureTask#get()本身是阻塞的,假設當前有三個下載任務在執行:
- task1(預計耗時5秒)
- task2(預計耗時1秒)
- task3(預計耗時1秒)
-
如果阻塞獲取時不湊巧把task1.get()排在最前面,那么會造成一定的資源浪費,因為task2和task3早就已經准備好了,可以先拿出來處理,以獲得最佳的用戶體驗。
-
雖然可以結合輪詢+isDone()的方式改進,但仍存在以下問題:
- 輪詢間隔多少合適?
- 為了避免while(true)阻塞主線程邏輯,可能需要開啟單獨的線程輪詢,浪費一個線程。
- 仍然無法處理復雜的任務依賴關系。
-
CompletableFuture的簡單使用:
@Test public void testCallBack() throws InterruptedException, ExecutionException { // 提交一個任務,返回CompletableFuture CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("=============>異步線程開始..."); System.out.println("=============>異步線程為:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=============>異步線程結束..."); return "supplierResult"; } }); // 阻塞獲取結果 System.out.println("異步結果是:" + completableFuture.get()); System.out.println("main結束"); }
- 整個過程看起來和同步沒啥區別,因為我們在main線程中使用了CompletableFuture#get(),直接阻塞了。
-
CompletableFuture和FutureTask的異同點:
-
相同:都實現了Future接口,所以都可以使用諸如Future#get()、Future#isDone()、Future#cancel()等方法
-
不同:
-
- FutureTask實現了Runnable,所以它可以作為任務被執行,且內部維護outcome,可以存儲結果
- CompletableFuture沒有實現Runnable,無法作為任務被執行,所以你無法把它直接丟給線程池執行,相反地,你可以把Supplier#get()這樣的函數式接口實現類丟給它執行
- CompletableFuture實現了CompletionStage,支持異步回調。
-
-
FutureTask和CompletableFuture最大的區別在於,FutureTask需要我們主動阻塞獲取,而CompletableFuture支持異步回調。
-
CompletableFuture好像承擔的其實是線程池的角色,而Supplier#get()則對應Runnable#run()、Callable#call()。
-
CompletionStage的基本使用:
@Test public void testCallBack() throws InterruptedException, ExecutionException { // 提交一個任務,返回CompletableFuture(注意,並不是把CompletableFuture提交到線程池,它沒有實現Runnable) CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("=============>異步線程開始..."); System.out.println("=============>異步線程為:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=============>異步線程結束..."); return "supplierResult"; } }); // 異步回調:上面的Supplier#get()返回結果后,異步線程會回調BiConsumer#accept() completableFuture.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) { System.out.println("=============>異步任務結束回調..."); System.out.println("=============>回調線程為:" + Thread.currentThread().getName()); } }); // CompletableFuture的異步線程是守護線程,一旦main結束就沒了,為了看到打印結果,需要讓main休眠一會兒 System.out.println("main結束"); TimeUnit.SECONDS.sleep(15); }
-
結果:
=============>異步線程開始... =============>異步線程為:ForkJoinPool.commonPool-worker-9 main結束 =============>異步線程結束... =============>異步任務結束回調... =============>回調線程為:ForkJoinPool.commonPool-worker-9
-
-
主線程調用了CompletableFuture#whenComplete():
-
這個方法定義在CompletionStage接口中:
public interface CompletionStage<T> { public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action); // 省略其他方法... }
-
而CompletableFuture實現了whenComplete():
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { // 省略其他方法... public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = new CompletableFuture<T>(); if (e != null || !d.uniWhenComplete(this, f, null)) { UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } // 省略其他方法... }
-
CompletionStage是什么呢?
- 是一個“很簡單”的接口。完全獨立,沒有繼承任何其他接口,所有方法都是它自己定義的。
public interface CompletionStage<T> { // 定義了超級多類似whenComplete()的方法 }
- 是個不簡單的接口。因為CompletableFuture實現Future的同時,還實現了它。Future方法就6、7個,而CompletionStage的方法超級多,所以如果你打開CompletableFuture的源碼,目之所及幾乎都是它對CompletionStage的實現。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { // 一些字段 // 實現Future的方法 // 實現CompletionStage的方法 // 一些私有方法,配合CompletionStage // 一些內部類,配合CompletionStage }
- 異步回調其實和CompletionStage有着很大的關系。
-
總而言之,CompletionStage是一個接口,定義了一些方法,CompletableFuture實現了這些方法並設計出了異步回調的機制
-
-
異步線程會回調BiConsumer#accept(),而CompletableFuture#whenComplete()是主線程調用的。即CompletionStage中定義的諸如whenComplete()等方法雖然和異步回調有關系,但並不是最終被回調的方法,最終被回調的其實是whenComplete(BiConsumer)傳進去的BiConsumer#accept()。
-
異步線程哪來的,Supplier如何被執行?
-
跟隨主線程進入CompletableFuture#supplyAsync():
-
注釋:返回一個新的CompletableFuture,該future是由運行在{@link ForkJoinPool#commonPool()}中的任務異步完成的,其值是通過調用給定的Supplier獲得的。
- 即異步線程來自ForkJoinPool線程池。
- 通過CompletableFuture#supplyAsync(supplier)傳入Supplier,返回CompletableFuture對象,它包含一個未來的value,且這個value會在稍后由異步線程執行Supplier#get()產生。
-
CompletableFuture#supplyAsync(supplier)內部調用了asyncSupplyStage(asyncPool, supplier),此時傳入了一個線程池asyncPool,它是CompletableFuture的成員變量:
-
useCommonPool為true時會使用ForkJoinPool,而useCommonPool取決於運行當前程序的硬件是否支持多核CPU。
-
主線程傳進來的Supplier壓根沒有實現Runnable/Callable接口,怎么被異步線程執行呢?
-
和ExecutorService#submit()一樣的套路:包裝成Task再執行。只不過這次被包裝成了AsyncSupply,而不是FutureTask:
-
AsyncSupply和當初的FutureTask頗為相似,都實現了Future和Runnable,具備 任務+結果 雙重屬性:
-
最終就是把Supplier包裝好,傳入線程池的execute()中運行。等線程池分配出線程,最終會執行AsyncSupply#run()。
-
AsyncSupply#run()在方法內調用f.get(),也就是Supplier#get(),阻塞獲取結果並通過d.completeValue(v)把值設置到CompletableFuture中,而CompletableFuture d已經在上一步asyncSupplyStage()中被返回。最終效果和線程池+FutureTask是一樣的,先返回Future實例,再通過引用把值放進去。
- 從這個層面上來看,CompletableFuture相當於一個自帶線程池的Future,而CompletableFuture#supplyAsync(Supplier)倒像是ExecutorService#submit(Runnable/Callable),內部也會包裝任務,最終丟給Executor#execute(Task)。
- 只不過ExecutorService是把Runnable#run()/Callable#call()包裝成FutureTask,而CompletableFuture則把亂七八糟的Supplier#get()等函數式接口的方法包裝成ForkJoinTask。
-
-
回調機制的原理:
-
CompletableFuture的回調機制,其實本質上是對多個CompletableFuture內部函數的順序執行,只不過發起者是異步線程而不是主線程
-
CompletableFuture#thenApply(),與CompletableFuture#whenComplete()本質是一樣的(也是CompletableFuture對CompletionStage的實現):
@RunWith(SpringRunner.class) @SpringBootTest public class CompletableFutureTest { @Test public void testCallBack() throws InterruptedException { // 任務一:把第一個任務推進去,順便開啟異步線程 CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("=============>異步線程開始..."); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=============>completableFuture1任務結束..."); System.out.println("=============>執行completableFuture1的線程為:" + Thread.currentThread().getName()); return "supplierResult"; } }); System.out.println("completableFuture1:" + completableFuture1); // 任務二:把第二個任務推進去,等待異步回調 CompletableFuture<String> completableFuture2 = completableFuture1.thenApply(new Function<String, String>() { @Override public String apply(String s) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=============>completableFuture2任務結束 result=" + s); System.out.println("=============>執行completableFuture2的線程為:" + Thread.currentThread().getName()); return s; } }); System.out.println("completableFuture2:" + completableFuture2); // 任務三:把第三個任務推進去,等待異步回調 CompletableFuture<String> completableFuture3 = completableFuture2.thenApply(new Function<String, String>() { @Override public String apply(String s) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=============>completableFuture3任務結束 result=" + s); System.out.println("=============>執行completableFuture3的線程為:" + Thread.currentThread().getName()); return s; } }); System.out.println("completableFuture3:" + completableFuture3); System.out.println("主線程結束"); TimeUnit.SECONDS.sleep(40); } }
-
分析主線程的主干:
- CompletableFuture#supplyAsync(Supplier):包裝Supplier為AsyncSupply,調用executor.execute(),等待異步線程回調Supplier#get()
- CompletableFuture#thenApply(Function)
- CompletableFuture#thenApply(Function)
-
主線程在執行“任務一”的CompletableFuture#supplyAsync(Supplier)時,將Supplier包裝成AsyncSupply任務,並開啟了異步線程,此后異步線程會阻塞在Supplier#get():
-
Supplier#get()是異步線程開啟后執行的第一站!
-
與此同時,主線程繼續執行后面的“任務二”、“任務三”,並且都會到達uniApply(),且都返回false,因為a.result==null。
-
當主線程從任務二進來,調用thenApply()。最終會到達uniApply(),通過控制台的日志,我們發現a其實就是completableFuture1。因為uniApply()的上一步傳入的this:
-
也就是說:
- 主線程 ---> completableFuture1.thenApply(Function#apply) ---> !d.uniApply(this, f#apply, null)
- a.result就是completableFuture1.result,而completableFuture1的值來自Supplier#get(),此時確實還是null(異步線程阻塞設定的秒數秒后才會)。
-
所以此時d.uniApply(this, f, null) 為false,那么!d.uniApply(this, f, null) 為true,就會進入if語句:
-
主要做了3件事:
- 傳入Executor e、新建的CompletableFuture d、當前completableFuture1、Function f,構建UniApply
- push(uniApply)
- uniApply.tryFire(SYNC)
-
任務一做了兩件事:
- 開啟異步線程
- 等待回調
-
由於要開啟線程,自己也要作為任務被執行,所以Supplier#get()被包裝成AsyncSupply,是一個Task。而后續的幾個任務其實只做了一件事:等待回調。只要能通過實例執行方法即可,和任務一有所不同,所以只是被包裝成UniApply對象。
-
push(uniApply)姑且認為是把任務二的Function#apply()包裝后塞到任務棧中。
-
但uniApply.tryFire(SYNC)是干嘛的呢?里面又調了一次uniApply():
-
SYNC=0,所以最終判斷!d.uniApply(this, f, this) ==true,tryFire(SYNC)返回null,后面的d.postFire(a, mode)此時並不會執行,等后面異步線程復蘇后,帶着任務一的結果再次調用時,效果就截然不同了。
-
總結一下,“任務二”、“任務三”操作都是一樣的,都做了3件事:
- 主線程調用CompletableFuture#thenApply(Function f)傳入f,構建UniApply對象,包裝Function#apply()
- 把構建好的UniApply對象push到棧中
- 返回CompletableFuture d
-
等過了100秒,supplyAsync(Supplier)中的Supplier#get()返回結果后,異步線程繼續往下走:
- postComplete()也會走uniApply(),但這次已經有了異步結果result,所以流程不會被截斷,最終會調用Function#apply(s),而這個s是上一個函數的執行結果
- 也就是說,新的CompletableFuture對象調用Function#apply()處理了上一個CompletableFuture產生的結果。
-
-
CompletableFuture與FutureTask線程數對比:
-
CompletableFuture和FutureTask耗費的線程數是一致的,但對於FutureTask來說,無論是輪詢還是阻塞get,都會導致主線程無法繼續其他任務,又或者主線程可以繼續其他任務,但要時不時check FutureTask是否已經完成任務,比較糟心。而CompletableFuture則會根據我們編排的順序逐個回調,是按照既定路線執行的。
-
其實無論是哪種方式,異步線程其實都需要阻塞等待結果,期間不能處理其他任務。但對於FutureTask而言,在異步線程注定無法復用的前提下,如果想要獲取最終結果,需要主線程主動查詢或者額外開啟一個線程查詢,並且可能造成阻塞,而CompletableFuture的異步任務執行、任務結果獲取都是異步線程獨立完成。
-
即,1個異步線程阻塞執行任務 + 回調異步結果 > 1個異步線程阻塞執行任務 + 1個線程阻塞查詢任務。
-