為了防止無良網站的爬蟲抓取文章,特此標識,轉載請注明文章出處。LaplaceDemon/ShiJiaqi。
http://www.cnblogs.com/shijiaqi1066/p/4631466.html
1. Fork/Join框架
fork操作的作用是把一個大的問題划分成若干個較小的問題。在這個划分過程一般是遞歸進行的。直到可以直接進行計算。需要恰當地選取子問題的大小。太大的子問題不利於通過並行方式來提高性能,而太小的子問題則會帶來較大的額外開銷。每個子問題計算完成后,可以得到關於整個問題的部分解。join操作的作用是把這些分解手機組織起來,得到完整解。
簡單的說,ForkJoin其核心思想就是分治。Fork分解任務,Join收集數據。
在fork/join框架中,若某個子問題由於等待另一個子問題的完成而無法繼續執行。那么處理該子問題的線程會主動尋找其他尚未運行完成的子問題來執行。這種方式減少了線程的等待時間,提高了性能。子問題中應該避免使用synchronized關鍵詞或其他方式方式的同步。也不應該是一阻塞IO或過多的訪問共享變量。在理想情況下,每個子問題的實現中都應該只進行CPU相關的計算,並且只適用每個問題的內部對象。唯一的同步應該只發生在子問題和創建它的父問題之間。
2. Fork/Join框架的主要類
一個fork/join框架之下的任務由ForkJoinTask類表示。ForkJoinTask實現了Future接口,可以按照Future接口的方式來使用。在ForkJoinTask類中之重要的兩個方法fork和join。fork方法用以一部方式啟動任務的執行,join方法則等待任務完成並返回指向結果。在創建自己的任務是,最好不要直接繼承自ForkJoinTask類,而要繼承自ForkJoinTask類的子類RecurisiveTask或RecurisiveAction類。兩種的區別在於RecurisiveTask類表示的任務可以返回結果,而RecurisiveAction類不行。
簡單總結:
ForkJoin主要提供了兩個主要的執行任務的接口。RecurisiveAction與RecurisiveTask 。
- RecurisiveAction :沒有返回值的接口。
- RecurisiveTask :帶有返回值的接口。
fork/join框架任務的執行由ForkJoinTask類的對象之外,還可以使用一般的Callable和Runnable接口來表示任務。
ForkJoin要利用線程池ForkJoinPool。每個線程池都有一個WorkQueue實例。ForkJoinPool推薦查看JDK8的源碼,比JDK7更利於理解。
在ForkJoinPool類的對象中執行的任務大支可以分為兩類,一類通過execute、invoke或submit提交的任務;另一類是ForkJoinTask類的對象在執行過程中產生的子任務,並通過fork方法來運行。一般的做法是表示整個問題的ForkJoinTask類的對象用第一類型是提交,而在執行過程中產生的子任務並不需要進行處理,ForkJoinPool類對象會負責子任務的執行。
ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。使用方法與Executor框架類似。ForkJoinPool提供如下兩個常用的構造器:
ForkJoinPool(int parallelism) 創建一個包含parallelism個並行線程的ForkJoinPool。
ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作為parallelism參數來創建ForkJoinPool。
ForkJoinPool有如下三個方法啟動線程:
使用ForkJoinPool的submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執行指定任務。其中ForkJoinTask代表一個可以並行、合並的任務。
客戶端非fork/join調用 | 內部調用fork/join | |
異步執行 | execute(ForkJoinTask) | ForkJoinTask.fork |
等待獲取結果 | invoke(ForkJoinTask) | ForkJoinTask.invoke |
執行,獲取Future | submit(ForkJoinTask) |
ForkJoinTask.fork(ForkJoinTask are Futures) |
ForkJoinTask是分支合並的執行任何,分支合並的業務邏輯使用者可以再繼承了這個抽先類之后,在抽象方法exec()中實現。其中exec()的返回結果和ForkJoinPool的執行調用方(execute(...),invoke(...),submit(...)),共同決定着線程是否阻塞,具體請看下面的測試用例。
ForkJoinTask 是一個抽象類,它還有兩個抽象子類:RecurisiveTask和RecurisiveAction。
RecurisiveTask代表有返回值的任務。RecursiveTask<T>是泛型類。T是返回值的類型。
RecurisiveAction代表沒有返回值的任務。
3. 異常處理
ForkJoinTask在執行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:
if(task.isCompletedAbnormally()) { System.out.println(task.getException()); }
getException方法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。
4. ForkJoinPool
ForkJoinPool是Java 1.7之后新添加的一個ExecutorService實現,在java.util.concurrent中。和其他的ExecutorService一樣,ForkJoinPool在提供自身特殊優勢的同時也可以作為一個普通的Executor框架來使用,通過submit等方法來提交Runnable任務。
ForkJoinPool最大的特殊之處就在於其實現了工作密取(work-stealing)。
工作竊取 (work-stealing)
所謂工作竊取,即當前線程的Task已經全被執行完畢,則自動取到其他線程的Task池中取出Task繼續執行。
ForkJoinPool中維護着多個線程(一般為CPU核數)在不斷地執行Task,每個線程除了執行自己職務內的Task之外,還會根據自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU利用率。
實現原理
ForkJoinPool的具體實現可參考Doug Lea的論文:A Java Fork/Join Framework
ForkJoinPool中的工作線程是由ForkJoinWorkerThread類實現的,其通過維護一個雙端隊列(ForkJoinPool.WorkQueue)來存放Task的,這里的Task一般是ForkJoinTask的子類。每一個工作線程簡單的通過以下兩條原則進行活動:
- 若隊列非空,則代表自己線程的Task還沒執行完畢,取出Task並執行。
- 若隊列為空,則隨機選取一個其他的工作線程的Task並執行。
那么為了減少在對Task雙端隊列進行操作時的Race Condition,這里的雙端隊列通過維護一個top變量和一個base變量來解決這個問題。top變量類似於棧幀,當ForkJoinTask fork出新的Task或者Client從外部提交一個新的Task的ForkJoinPool時,工作線程將Task以LIFO的方式push到雙端隊列的隊頭,top維護隊頭的位置,可以簡單理解為雙端隊列push的部分為一個棧。而base維護隊列的隊尾,當別的線程需要從本工作線程密取任務時,是從雙端隊列的隊尾出取出任務。工作隊列基於以下幾個保證對隊列進行操作:
- push和pop操作只會被owner線程調用。
- 只有非owner線程會調用take操作。
- pop和take操作只有在隊列將要變成空(當前只有一個元素)時才會需要處理同步問題。
也就是說這個實現的雙端隊列將整體的同步問題轉換為了一個two-party的同步問題,對於take而言我們只要提供一個簡單的entry lock來保證所以其他線程的take的一致性,而對於自己owner線程的pop和push幾乎不需要同步。
由於ForkJoinPool的這些特性,因此它除了適合用來實現分而治之的計算框架以外,還非常適合用來作為基於event的異步消息處理執行框架,而Akka正是將ForkJoinPool作為默認的底層ExcutorService。事實證明,ForkJoinPool在Akka這種基於消息傳遞的異步執行環境下能夠展現出非常高的性能優勢,前提是盡量減少在處理過程中的線程阻塞(如IO等待等等)。
為了防止無良網站的爬蟲抓取文章,特此標識,轉載請注明文章出處。LaplaceDemon/ShiJia
http://www.cnblogs.com/shijiaqi1066/p/4631466.html