本文的主要目的是介紹 ForkJoinPool 的適用場景,實現原理,以及示例代碼。
說在前面
可以說是說明,也可以說下面是結論:
ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。
ForkJoinPool 主要用於實現“分而治之”的算法,特別是分治之后遞歸調用的函數,例如 quick sort 等。
ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O,線程間同步,sleep() 等會造成線程長時間阻塞的情況時,最好配合使用 ManagedBlocker。
使用
首先介紹的是大家最關心的 Fork/Join Framework 的使用方法,用一個特別簡單的求整數數組所有元素之和來作為我們現在需要解決的問題吧。
問題:計算1至10000000的正整數之和。
方案一:最為普通的for循環解決
最簡單的,顯然是不使用任何並行編程的手段,只用最直白的 for-loop 來實現。下面就是具體的實現代碼。
為了面向接口編程,下面我們把計算的方法定義成接口,不同的方案書寫不同的實現即可
public interface Calculator {
/**
* 把傳進來的所有numbers 做求和處理
*
* @param numbers
* @return 總和
*/
long sumUp(long[] numbers);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
寫一個通過for loop的實現。這段代碼毫無出奇之處,也就不多解釋了
public class ForLoopCalculator implements Calculator {
@Override
public long sumUp(long[] numbers) {
long total = 0;
for (long i : numbers) {
total += i;
}
return total;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
寫一個main方法進行測試:
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
Instant start = Instant.now();
Calculator calculator = new ForLoopCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("結果為:" + result);
}
輸出:
耗時:10ms
結果為:50000005000000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
方案二:ExecutorService多線程方式實現
在 Java 1.5 引入 ExecutorService 之后,基本上已經不推薦直接創建 Thread 對象,而是統一使用 ExecutorService。畢竟從接口的易用程度上來說 ExecutorService 就遠勝於原始的 Thread,更不用提 java.util.concurrent 提供的數種線程池,Future 類,Lock 類等各種便利工具。
由於上面是面向接口的設計,因此我們只需要加一個使用 ExecutorService 的實現類:
public class ExecutorServiceCalculator implements Calculator {
private int parallism;
private ExecutorService pool;
public ExecutorServiceCalculator() {
parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心數 默認就用cpu核心數了
pool = Executors.newFixedThreadPool(parallism);
}
//處理計算任務的線程
private static class SumTask implements Callable<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
public Long call() {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
}
}
@Override
public long sumUp(long[] numbers) {
List<Future<Long>> results = new ArrayList<>();
// 把任務分解為 n 份,交給 n 個線程處理 4核心 就等分成4份唄
// 然后把每一份都扔個一個SumTask線程 進行處理
int part = numbers.length / parallism;
for (int i = 0; i < parallism; i++) {
int from = i * part; //開始位置
int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //結束位置
//扔給線程池計算
results.add(pool.submit(new SumTask(numbers, from, to)));
}
// 把每個線程的結果相加,得到最終結果 get()方法 是阻塞的
// 優化方案:可以采用CompletableFuture來優化 JDK1.8的新特性
long total = 0L;
for (Future<Long> f : results) {
try {
total += f.get();
} catch (Exception ignore) {
}
}
return total;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
main方法改為:
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
Instant start = Instant.now();
Calculator calculator = new ExecutorServiceCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("結果為:" + result); // 打印結果500500
}
輸出:
耗時:30ms
結果為:50000005000000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
方案三:采用ForkJoinPool(Fork/Join)
前面花了點時間講解了 ForkJoinPool 之前的實現方法,主要為了在代碼的編寫難度上進行一下對比。現在就列出本篇文章的重點——ForkJoinPool 的實現方法。
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
//執行任務RecursiveTask:有返回值 RecursiveAction:無返回值
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法為ForkJoin的核心方法:對任務進行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {
// 當需要計算的數字個數小於6時,直接采用for loop方式計算結果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else { // 否則,把任務一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}
輸出:
耗時:390ms
結果為:50000005000000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
可以看出,使用了 ForkJoinPool 的實現邏輯全部集中在了 compute() 這個函數里,僅用了14行就實現了完整的計算過程。特別是,在這段代碼里沒有顯式地“把任務分配給線程”,只是分解了任務,而把具體的任務到線程的映射交給了 ForkJoinPool 來完成。
方案四:采用並行流(JDK8以后的推薦做法)
public static void main(String[] args) {
Instant start = Instant.now();
long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("結果為:" + result); // 打印結果500500
}
輸出:
耗時:130ms
結果為:50000005000000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
並行流底層還是Fork/Join框架,只是任務拆分優化得很好。
耗時效率方面解釋:Fork/Join 並行流等當計算的數字非常大的時候,優勢才能體現出來。也就是說,如果你的計算比較小,或者不是CPU密集型的任務,不太建議使用並行處理
原理
**我一直以為,要理解一樣東西的原理,最好就是自己嘗試着去實現一遍。**根據上面的示例代碼,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的關鍵。我們可以根據函數名假設一下 fork() 和 join() 的作用:
fork():開啟一個新線程(或是重用線程池內的空閑線程),將任務交給該線程處理。
join():等待該任務的處理線程處理完畢,獲得返回值。
疑問:當任務分解得越來越細時,所需要的線程數就會越來越多,而且大部分線程處於等待狀態?
但是如果我們在上面的示例代碼加入以下代碼
System.out.println(pool.getPoolSize());
1
這會顯示當前線程池的大小,在我的機器上這個值是4,也就是說只有4個工作線程。甚至即使我們在初始化 pool 時指定所使用的線程數為1時,上述程序也沒有任何問題——除了變成了一個串行程序以外。
public ForkJoinCalculator() {
pool = new ForkJoinPool(1);
}
- 1
- 2
- 3
- 4
這個矛盾可以導出,我們的假設是錯誤的,並不是每個 fork() 都會促成一個新線程被創建,而每個 join() 也不是一定會造成線程被阻塞。Fork/Join Framework 的實現算法並不是那么“顯然”,而是一個更加復雜的算法——這個算法的名字就叫做work stealing 算法。
ForkJoinPool 的每個工作線程都維護着一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。
在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。
至於Fork和Join源碼級別的的細節,本文不做過多描述了~~
submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味着提交的任務真正開始進入執行階段。
ForkJoinPool的commonPool相關參數配置
commonPool是ForkJoinPool內置的一個線程池對象,JDK8里有些都是使用它的。他怎么來的呢?具體源碼為ForkJoinPool的靜態方法:makeCommonPool
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
參數解釋 以及自定義commonPool的參數
通過代碼指定,必須得在commonPool初始化之前(parallel的stream被調用之前,一般可在系統啟動后設置)注入進去,否則無法生效。
通過啟動參數指定無此限制,較為安全
parallelism(即配置線程池個數)
可以通過java.util.concurrent.ForkJoinPool.common.parallelism進行配置,最大值不能超過MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
1
如果沒有指定,則默認為Runtime.getRuntime().availableProcessors() - 1.
自定義:代碼指定(必須得在commonPool初始化之前注入進去,否則無法生效)
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “8”);
// 或者啟動參數指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
threadFactory:默認為defaultForkJoinWorkerThreadFactory,沒有securityManager的話。
exceptionHandler:如果沒有設置,默認為null
WorkQueue:控制是FIFO還是LIFO
ForkJoinPool 的每個工作線程都維護着一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
queue capacity:隊列容量
繼續介紹
創建了ForkJoinPool實例之后,就可以調用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法來執行指定任務了。
其中ForkJoinTask代表一個可以並行、合並的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。
其中RecusiveTask代表有返回值的任務,
而RecusiveAction代表沒有返回值的任務。
它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入希望的線程數量,那么當前計算機可用的CPU數量會被設置為線程數量作為默認值。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序算法。
這里的要點在於,ForkJoinPool需要使用相對少的線程來處理大量的任務。
比如要對1000萬個數據進行排序,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合並任務。以此類推,對於500萬的數據也會做出同樣的分割處理,到最后會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。
那么到最后,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之后,它才能夠被執行。
所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務隊列中再添加一個任務並且在等待該任務完成之后再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。
使用ThreadPoolExecutor或者ForkJoinPool,會有什么性能的差異呢?
使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,顯然這是不可行的。
這就是工作竊取模式的優點
總結
- 在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事項就可以從原理中找到原因。例如:為什么在 ForkJoinTask 里最好不要存在 I/O 等會阻塞線程的行為?,這個各位讀者可以思考思考了。
- 還有一些延伸閱讀的內容,在此僅提及一下:
ForkJoinPool 有一個 Async Mode ,效果是工作線程在處理本地任務時也使用 FIFO 順序。這種模式下的 ForkJoinPool 更接近於是一個消息隊列,而不是用來處理遞歸式的任務。
在需要阻塞工作線程時,可以使用 ManagedBlocker - Java 1.8 新增加的 CompletableFuture 類可以實現類似於 Javascript 的 promise-chain,內部就是使用 ForkJoinPool 來實現的