之前的兩篇文章中,我們介紹了異步編程,也介紹了線程池的基本概念。也說了,線程池的實現天生也實現了異步任務,允許直接向線程池中進行任務的提交與結果獲取。
但是,我們始終沒有去深入的了解下,異步任務框架對於任務執行的進度是如何監控的,任務執行的結果該如何獲取。
那么,本篇文章就來詳細地探討下異步框架中,關於任務執行過程中的一些狀態以及執行結果反饋的相關細節。
傳統的 Future 模式
我們說過,異步編程的一個好處是:
我只需要定義好任務,向 ExecutorService 中提交即可,而不用關心什么時候,什么線程在執行我們的任務。它會返回一個 Future 對象,我們通過他了解當前任務的執行細節。
Future 接口中定義了以下一些方法:
public interface Future<V> {
//取消執行當前任務
boolean cancel(boolean mayInterruptIfRunning);
//當前任務是否被取消了
boolean isCancelled();
//當前任務是否已經完成
boolean isDone();
//返回任務執行的返回結果,如果任務未完成
//將阻塞在 Future 內部隊列上等待
V get()
//新增超時限制
V get(long timeout, TimeUnit unit)
}
這五個方法,每一個都很重要,為我們監控任務的執行提供有力的支持。而我們的 ThreadPoolExecutor 使用的是 FutureTask 作為 Future 的實現類。
而我們也不妨看看這個 FutureTask 內部都有些哪些成員:
state 和它可取的這些值共同描述了當前任務的執行狀態,是剛開始執行,還是正在執行中,還是正常結束,還是異常結束,還是被取消了,都由這個 state 來體現。
callable 代表當前正在執行的工作內容,這里說一下為什么只有 Callable 類型的任務,因為所有的 Runnable 類型任務都會被事先轉換成 Callable 類型,我覺得主要是統一和抽象實現吧。
outcome 是任務執行結束的返回值,runner 是正在執行當前任務的線程,waiters 是一個簡單的單鏈表,維護的是所有在任務執行結束之前嘗試調用 get 方法獲取執行結果的線程集合。當任務執行結束自當喚醒隊列中所有的線程。
除此之外,還有一個稍顯重要的方法,就是 run 方法,這個方法會在任務開始時由 ExecutorService 調用,這是一個很核心的方法,雖然方法體有點長,但是邏輯簡單,我們大體上概括下。
- 如果任務已經開始將退出方法邏輯的執行
- 調度任務執行,調用 call 方法
- 調用成功將保存結果,異常則將保存異常信息
- 處理中斷
這里需要額外去說一下,第三步中的 set 方法除了會將任務執行的返回結果設置到 FutureTask 的 outcome 字段上,還會調用 finishCompletion 方法完成任務的調用,嘗試喚醒所有在等待任務執行結果的線程。
其他的方法就不去看了,也比較多,還算是簡單的,如果有所想法,也歡迎你和我探討交流。
那么,我們也來看一個最簡單的應用示例:
我們向線程池提交了一個任務,這個任務的工作量不大,就是睡覺然后返回執行結果。而我們可以直接調用 get 方法去獲取任務執行的結果,不過 get 方法是阻塞式的,一旦任務還未執行結束,當前線程將丟失 CPU 進而被阻塞到 Future 的內部隊列上。
所以,推薦大家在 get 返回結果之前,先判斷下目標任務是否已經執行結束,進而避免當前線程的阻塞喚醒所帶來的代價。
到這里,相信你也一定看出來了,FutureTask 實現的 Future 的弊端在 get 方法,這個方法非異步,如果沒有成功獲取到任務的執行結果就將直接阻塞當前線程,以等待任務的執行完成。
但是,有一種情境,當我們向線程池中提交了很多任務,但是不清楚各個任務的執行效率,也就是不知道誰先執行結束,如果直接 get 某個未完成的任務,將導致當前線程阻塞等待。
那么我們能不能阻塞,直接獲取已經執行結束的任務 Future,而未完成的任務不允許獲取它的 Future?
使用 CompletionService
分析 CompletionService 之前,我們搬出之前分析過的一張類圖:
左半邊的類我們已經在前面的文章中都涉獵了,唯獨落下了 CompletionService 這個接口,我們當時說以后會分析它的,現在我們來看看這個接口會給我們帶來哪些能力。
首先,從類的繼承體系上來看,CompletionService 並不與我們的 Executor 產生任何直接關系,線程池的實現也沒有繼承該接口。
實際上來說,CompletionService 只是利用了 Executor 乃至線程池為自己提供任務的提交與執行能力,而自己不過額外的維護一個隊列,保存着所有已經完成的任務的 Future,以至於我們可以直接在外部調用 take 方法直接獲取已完成的任務返回結果,無需阻塞。
廢話不多說,我們寫個小 demo,或許你會有更直接的體驗:
要求:使用多線程計算 1-10000 之間的總和
思路:分段計算,最后總和相加
實現:
相信你運行后一定和我是同樣的答案:50005000
可能很多人會有疑問,這段代碼其實也沒什么特別的地方啊,我使用基本的線程池不一樣也能實現嗎?
但是,實際上並沒有那么簡單,因為你不能確定哪個任務完成了,哪個還沒有,所以你至少需要寫五個循環自旋等待。
而如果你的運氣不好,第一個任務特別慢,即便后續的任務已經結束了,主線程也依然由於第一個任務的結果拿不到而阻塞,耽誤了對其他已完成任務的返回結果處理。
乍一看,你可能覺得差別不大,但仔細分析了才會發現,一旦任務量增大、增多,真的是「差之毫厘,謬以千里」。
其實,原理我也可以帶大家一起來看看,並不難:
先從大家最關心的 CompletionService 實現子類內部結構開始:
這里,至少可以看出來兩點,字段 executor 是一個任務調度器,completionQueue 是一個阻塞隊列。
也就是說,Completion 是完全依賴外部傳入的 Executor 來實現任務的提交與執行的。而這個阻塞隊列 completionQueue 就是保存的所有已經完成的任務 Future 對象。
除此之外,ExecutorCompletionService 還自定義了一個內部類 QueueingFuture,重寫了 FutureTask 的 done 方法。
可能大家對這個 done 沒什么印象,但是還記得我們說過的 finishCompletion 方法嗎?
FutureTask 抽象的描述了一個任務,當線程啟動后將調用 FutureTask 內部的 run 方法執行任務的核心邏輯,並在執行的最后調用 finishCompletion 喚醒所有阻塞在自己隊列上等待返回結果的線程。
而其中 finishCompletion 方法在結束前,會調用一個 done 方法,這個 done 方法在 FutureTask 中是空實現,沒有任何的代碼實現,表示並沒有什么用。
但是我們的 QueueingFuture 充分利用這一點,重寫了 done 方法,而邏輯就是將已結束的任務添加到我們在外部維護的一個新隊列 completionQueue 中,供外部獲取調用。
這些就是 CompletionService 的秘密。