Having previously studied how to use the ThreadPoolExecutor thread pool, let's now look at the Fork/Join thread pool. The main class is ForkJoinPool, a sibling of ThreadPoolExecutor: both extend AbstractExecutorService.
1. Introduction
1. What Fork/Join is and when to use it
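Processors now ship with many cores, but running work only as separate concurrent tasks does not fully use them, because a typical application does not have that many concurrent tasks. The idea, therefore, is to split a single task into several units, execute each unit separately, and finally merge the units' results.
Fork/Join is a framework, introduced in Java 7, for executing tasks in parallel: it splits a large task into several smaller tasks and then combines the results of the small tasks into the result of the large task.
Put simply, it takes advantage of multiple processors by keeping all of them busy processing tasks in parallel.
To find the number of CPUs you can check Task Manager, or query it from Java code: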
System.out.println(Runtime.getRuntime().availableProcessors());
Result:
4
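2. Work-stealing algorithm
A large task is split into many subtasks. To reduce contention between threads, the subtasks are placed in separate queues, each with its own thread to execute them, one thread per queue. A situation can then arise where thread A has finished everything in its own queue while thread B's queue still holds many tasks. If A goes over to work on B's queue, both threads access the same queue and contend for it. The solution is to make the queue a double-ended queue (deque): A takes tasks from the tail of the deque while B takes them from the head, so the two normally work at opposite ends.
Note: each thread has its own queue (unlike ThreadPoolExecutor, where all workers share a single task queue). When a thread has finished the tasks in its own queue, it steals a task from another thread's queue, which keeps resources fully used.
Advantage: threads compute in parallel, and contention between threads is reduced.
Disadvantages:
(1) Contention still occurs when a deque holds only one task.
(2) The algorithm consumes more system resources, for example by creating multiple threads and multiple deques.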
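To make the two-ended access concrete, here is a minimal single-threaded sketch that uses java.util.concurrent.ConcurrentLinkedDeque as a stand-in for B's queue. It only illustrates the access pattern described above; it is not how ForkJoinPool is implemented internally (the pool uses its own array-based work queues), and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical illustration of the owner/thief access pattern on one deque.
// Not ForkJoinPool's real implementation.
public class WorkStealingDemo {

    /** Returns {task taken by owner B, task stolen by thread A}. */
    static int[] ownerAndThiefTake() {
        ConcurrentLinkedDeque<Integer> queueOfB = new ConcurrentLinkedDeque<>();
        // Owner B pushes its subtasks onto the head of its own deque.
        for (int task = 1; task <= 6; task++) {
            queueOfB.offerFirst(task);          // head -> tail: 6 5 4 3 2 1
        }
        // Owner B takes from the head: the most recently pushed task.
        int ownerTask = queueOfB.pollFirst();
        // Idle thread A steals from the tail: the oldest task, at the other end.
        int stolenTask = queueOfB.pollLast();
        return new int[] {ownerTask, stolenTask};
    }

    public static void main(String[] args) {
        int[] taken = ownerAndThiefTake();
        System.out.println("B (owner) took task " + taken[0]);   // 6
        System.out.println("A (thief) stole task " + taken[1]);  // 1
    }
}
```

A commonly cited reason for stealing from the opposite end is that, when tasks are split recursively, the oldest task in a deque tends to be the largest, so one steal hands the thief a big chunk of work.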
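3. Main classes involved
1. ForkJoinTask
It provides the mechanism for the fork and join operations during task execution. Usually you only need to extend one of its subclasses:
RecursiveTask: for tasks that return a result; you supply the result type as the generic parameter.
RecursiveAction: for tasks that return no result; it is simply a ForkJoinTask<Void>.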
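For comparison with the RecursiveTask example in the next section, here is a minimal RecursiveAction sketch that doubles every element of an array in place. The class name DoubleAction and the threshold of 1,000 elements are hypothetical choices; invokeAll is the static helper that ForkJoinTask provides for forking a group of tasks and waiting for all of them.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles each element of data[from, to) in place.
// RecursiveAction fits because the task returns nothing.
public class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // assumed split threshold
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    public DoubleAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            // Split into two halves, run both, and wait for both to finish.
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, mid),
                      new DoubleAction(data, mid, to));
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 21);
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
        System.out.println(data[0] + " " + data[data.length - 1]); // 42 42
    }
}
```

Each subtask writes only to its own index range, so no locking is needed.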
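2. ForkJoinPool
ForkJoinTask instances are executed in a ForkJoinPool. By default the pool's parallelism (its target number of worker threads) equals the number of CPUs (4 on my machine).
3. ForkJoinWorkerThread
The threads in a ForkJoinPool that execute the tasks.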
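Worker threads are created by a ForkJoinPool.ForkJoinWorkerThreadFactory. Below is a sketch of a factory that gives a private pool's workers recognizable names, assuming you want names like demo-worker-N (the prefix, the counter and the class name are hypothetical). It delegates to ForkJoinPool.defaultForkJoinWorkerThreadFactory and then renames the thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a pool whose ForkJoinWorkerThreads carry a custom (hypothetical) name prefix.
public class NamedWorkerPool {
    static final String PREFIX = "demo-worker-";

    static ForkJoinPool newNamedPool(int parallelism) {
        AtomicInteger counter = new AtomicInteger(); // numbers the workers of this pool
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            // Let the default factory build the worker, then rename it.
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName(PREFIX + counter.incrementAndGet());
            return worker;
        };
        // Arguments: parallelism, thread factory, uncaught-exception handler, asyncMode
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newNamedPool(2);
        String name = pool.submit(() -> Thread.currentThread().getName()).join();
        System.out.println(name); // e.g. demo-worker-1
        pool.shutdown();
    }
}
```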
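2. Basic usage
As a simple example, use Fork/Join to compute the sum of 0 to 100000000000.
The ForkJoinTask class:
While the range to be summed is larger than step, the task keeps splitting; otherwise it computes the sum directly.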
package forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * @author: 喬利強
 * @date: 2021/1/7 19:37
 * @description:
 */
public class SumTask extends RecursiveTask<Long> {
    private long start, end;        // inclusive range [start, end]
    private long step = 1000000;    // split threshold

    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if (end - start <= step) {
            // Small enough: sum the range directly
            for (long i = start; i <= end; i++)
                sum += i;
        } else {
            // Too large: split into two halves
            long mid = start + (end - start) / 2;
            SumTask lt = new SumTask(start, mid);
            SumTask rt = new SumTask(mid + 1, end);
            // Schedule both halves asynchronously
            lt.fork();
            rt.fork();
            // Wait for both results and combine them
            long leftsum = lt.join();
            long rightsum = rt.join();
            sum = leftsum + rightsum;
        }
        return sum;
    }
}
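SumTask forks both halves and then joins them. A common alternative, also used in the Fibonacci example in the RecursiveTask Javadoc, is to fork one half and compute the other half directly in the current thread, which saves one queue push and pop per split. A sketch under that approach (the class name SumTaskForkOne and the constant THRESHOLD are hypothetical):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of SumTask: fork one half, compute the other in place.
public class SumTaskForkOne extends RecursiveTask<Long> {
    private static final long THRESHOLD = 1_000_000;
    private final long start, end; // inclusive range

    public SumTaskForkOne(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = start + (end - start) / 2;
        SumTaskForkOne left = new SumTaskForkOne(start, mid);
        SumTaskForkOne right = new SumTaskForkOne(mid + 1, end);
        left.fork();                     // push the left half onto this worker's deque
        long rightSum = right.compute(); // work on the right half in this thread
        long leftSum = left.join();      // then wait for (or run) the left half
        return leftSum + rightSum;
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        long result = ForkJoinPool.commonPool().invoke(new SumTaskForkOne(0, n));
        System.out.println(result == n * (n + 1) / 2); // true
    }
}
```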
Test class: compares single-threaded sequential execution with execution in the Fork/Join framework
package forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
 * @author: 喬利強
 * @date: 2021/1/7 19:32
 * @description:
 */
public class SumClient {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Single-threaded test
        long startNum = 0;
        long endNum = 100000000000L;
        long sum = 0;
        long startTime = System.currentTimeMillis();
        for (; startNum <= endNum; startNum++) {
            sum += startNum;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("sum: " + sum + ", time: " + (endTime - startTime) + "ms");

        // Fork/Join test
        startTime = System.currentTimeMillis();
        // 1. Create the pool
        ForkJoinPool fp = new ForkJoinPool();
        // 2. Create the task
        SumTask task = new SumTask(0, endNum);
        // 3. Submit the task to the pool
        ForkJoinTask<Long> result = fp.submit(task);
        // 4. Get the result (blocks until the task completes)
        Long fkResult = result.get();
        endTime = System.currentTimeMillis();
        System.out.println("sum: " + fkResult + ", time: " + (endTime - startTime) + "ms");
    }
}
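Result: (On my own computer the computation took so long that I stopped it. On another computer the single-threaded version took 40-odd seconds and the Fork/Join version 20-odd seconds.) Note that the exact sum, about 5 × 10^21, exceeds Long.MAX_VALUE (about 9.22 × 10^18), so both versions overflow. Because long addition wraps around modulo 2^64, the two printed values still agree with each other, but neither is the true sum.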
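The overflow is easy to confirm with the closed form n(n+1)/2 evaluated in BigInteger. A small sketch (the class and method names are hypothetical):

```java
import java.math.BigInteger;

// Sketch: exact value of 0 + 1 + ... + n via n(n+1)/2, compared with Long.MAX_VALUE.
public class SumOverflowCheck {

    static BigInteger exactSum(long n) {
        BigInteger big = BigInteger.valueOf(n);
        return big.multiply(big.add(BigInteger.ONE)).divide(BigInteger.valueOf(2));
    }

    public static void main(String[] args) {
        BigInteger exact = exactSum(100_000_000_000L);
        System.out.println("exact sum:      " + exact); // 5000000000050000000000
        System.out.println("Long.MAX_VALUE: " + Long.MAX_VALUE);
        System.out.println("fits in long:   "
                + (exact.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0)); // false
        // longValue() keeps only the low 64 bits, which is what a wrapping long loop ends up with.
        System.out.println("wrapped value:  " + exact.longValue());
    }
}
```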
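Looking at ForkJoinPool's no-argument constructor, shown below, the pool it creates has a parallelism equal to the number of CPUs (capped at MAX_CAP):
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}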
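To check this on your own machine, a short sketch compares getParallelism() of a pool built with the no-argument constructor against availableProcessors(), and also shows the ForkJoinPool(int parallelism) constructor for choosing the size explicitly (the class name is hypothetical):

```java
import java.util.concurrent.ForkJoinPool;

// Sketch: default vs. explicit parallelism of a newly created ForkJoinPool.
public class DefaultParallelism {
    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();

        ForkJoinPool defaultPool = new ForkJoinPool();  // parallelism = #CPUs
        ForkJoinPool twoThreads = new ForkJoinPool(2);  // parallelism = 2

        System.out.println("CPUs:                " + cpus);
        System.out.println("new ForkJoinPool():  " + defaultPool.getParallelism());
        System.out.println("new ForkJoinPool(2): " + twoThreads.getParallelism());

        defaultPool.shutdown();
        twoThreads.shutdown();
    }
}
```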
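Supplement: parallel streams (parallelStream) are also built on Fork/Join. They suit relatively simple data-processing tasks that have no thread-safety issues.
parallelStream provides parallel processing of a stream, another important feature of Stream, and it is implemented on the Fork/Join framework. Put simply, it runs the stream's work on several threads in parallel; the calling thread takes part in the work and waits until all of it is finished.
Test: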
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(num -> {
    try {
        Thread.sleep(1 * 100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + ">>" + num);
});
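Result: (The output is out of order, which shows the elements really are processed in parallel. The thread names are ForkJoinPool.commonPool-worker-1 to 3 plus main, which shows that the common ForkJoinPool is used and that the calling main thread also does part of the work.)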
main>>6
ForkJoinPool.commonPool-worker-1>>8
ForkJoinPool.commonPool-worker-2>>3
ForkJoinPool.commonPool-worker-3>>7
main>>5
ForkJoinPool.commonPool-worker-2>>4
ForkJoinPool.commonPool-worker-1>>9
ForkJoinPool.commonPool-worker-3>>2
main>>1
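Because forEach runs the lambda on several threads at once, it is safest when the lambda does not mutate shared state (calling add on a shared ArrayList from it, for example, is not thread-safe). When a combined result is needed, letting the stream do the combining with sum or collect avoids the problem. A minimal sketch (class and method names are hypothetical):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

// Sketch: let the parallel stream merge per-thread partial results
// instead of mutating shared state from forEach.
public class ParallelStreamReduce {

    static long parallelSum(long n) {
        // Each worker sums its own chunk; the framework adds the chunks together.
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    static List<Integer> parallelSquares(int n) {
        // collect() gives each chunk its own container and merges them,
        // keeping encounter order for an ordered source like this range.
        return IntStream.rangeClosed(1, n).parallel()
                .map(x -> x * x)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(1_000_000)); // 500000500000
        System.out.println(parallelSquares(5));     // [1, 4, 9, 16, 25]
    }
}
```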
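Supplement: the number of workers can be set with the VM option -Djava.util.concurrent.ForkJoinPool.common.parallelism=N. Setting it with System.setProperty, as in the test below, only takes effect if it runs before the common pool is created, that is, before ForkJoinPool is first used in the JVM. Test: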
// Must run before the common pool is first used, otherwise it has no effect
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "6");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(num -> {
    try {
        Thread.sleep(1 * 100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + ">>" + num);
});
Result:
main>>6
ForkJoinPool.commonPool-worker-2>>3
ForkJoinPool.commonPool-worker-1>>8
ForkJoinPool.commonPool-worker-6>>5
ForkJoinPool.commonPool-worker-3>>7
ForkJoinPool.commonPool-worker-5>>9
ForkJoinPool.commonPool-worker-4>>2
main>>4
ForkJoinPool.commonPool-worker-2>>1
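The system property resizes the single common pool shared by the whole JVM. An alternative that is often used is to start the parallel stream from inside a dedicated ForkJoinPool, in which case the stream forks its subtasks into that pool. This relies on implementation behavior rather than anything the Stream API documents, so treat the sketch below as an assumption to verify on your JDK (class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

// Sketch: run a parallel stream in a dedicated pool instead of the common pool.
// Relies on undocumented behavior: a parallel stream started from a
// ForkJoinPool worker forks its subtasks into that worker's pool.
public class CustomPoolParallelStream {

    static Set<String> threadNamesUsed(int parallelism) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() -> numbers.parallelStream()
                    .forEach(n -> names.add(Thread.currentThread().getName())))
                .join(); // wait for the whole stream to finish
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // Expect names like ForkJoinPool-1-worker-1 rather than commonPool-worker-N or main.
        System.out.println(threadNamesUsed(6));
    }
}
```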
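Supplement: Fork/Join is not necessarily faster than a single thread; that depends on how long each task takes when processed serially. Fork/Join itself spends time creating threads and forking and joining tasks. For example: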
package forkjoin;

import java.util.Arrays;
import java.util.List;

public class SumClient2 {
    public static void main(String[] args) {
        // Change the default size of the common Fork/Join pool. It can be set higher than the
        // number of processors, but for CPU-bound work there is usually no need; matching the
        // processor count is generally enough.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
        // Simulated processing time per element, in ms
        int handleTime = 2;
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);

        // Parallel stream (common ForkJoinPool)
        long start = System.currentTimeMillis();
        numbers.parallelStream().forEach(num -> {
            try {
                Thread.sleep(1 * handleTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });
        long end = System.currentTimeMillis();
        System.out.println((end - start) + "ms ===");

        // Sequential stream (main thread only)
        start = System.currentTimeMillis();
        numbers.stream().forEach(num -> {
            try {
                Thread.sleep(1 * handleTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });
        end = System.currentTimeMillis();
        System.out.println((end - start) + "ms ---");
    }
}
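Result: here the ForkJoinPool version takes longer than the single thread; as handleTime increases, the ForkJoinPool version becomes faster than the single thread. Part of the gap in this run probably also comes from one-time start-up costs (class loading, lambda bootstrapping, starting the pool's threads), which all fall on the parallel version because it runs first.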
main>>11
ForkJoinPool.commonPool-worker-1>>1
ForkJoinPool.commonPool-worker-8>>13
ForkJoinPool.commonPool-worker-9>>6
ForkJoinPool.commonPool-worker-4>>14
ForkJoinPool.commonPool-worker-10>>5
ForkJoinPool.commonPool-worker-6>>2
ForkJoinPool.commonPool-worker-2>>15
ForkJoinPool.commonPool-worker-13>>16
ForkJoinPool.commonPool-worker-11>>3
ForkJoinPool.commonPool-worker-15>>8
ForkJoinPool.commonPool-worker-14>>7
ForkJoinPool.commonPool-worker-3>>12
ForkJoinPool.commonPool-worker-12>>4
ForkJoinPool.commonPool-worker-5>>10
ForkJoinPool.commonPool-worker-7>>9
103ms ===
main>>1
main>>2
main>>3
main>>4
main>>5
main>>6
main>>7
main>>8
main>>9
main>>10
main>>11
main>>12
main>>13
main>>14
main>>15
main>>16
49ms ---
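A sketch of a slightly fairer comparison warms both versions up before timing them, so one-time start-up work is not charged to whichever runs first. It is still a rough measurement (a benchmark harness such as JMH is the better tool), and the class and method names are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch: time parallel vs. sequential streams after a warm-up pass of each.
public class WarmedUpTiming {

    static void simulateWork(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    /** Returns the elapsed milliseconds for one pass over the list. */
    static long timeOnce(List<Integer> numbers, boolean parallel, int handleTime) {
        long start = System.nanoTime();
        (parallel ? numbers.parallelStream() : numbers.stream())
                .forEach(n -> simulateWork(handleTime));
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 16).boxed()
                .collect(Collectors.toList());
        int handleTime = 2;
        timeOnce(numbers, true, handleTime);  // warm-up, result discarded
        timeOnce(numbers, false, handleTime); // warm-up, result discarded
        System.out.println("parallel:   " + timeOnce(numbers, true, handleTime) + "ms");
        System.out.println("sequential: " + timeOnce(numbers, false, handleTime) + "ms");
    }
}
```

The printed times depend on the machine; no particular numbers are implied.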
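Supplement: parallel streams are built on Fork/Join, and they all share one global pool (the common pool), whose default parallelism is the number of CPUs minus one. This differs from new ForkJoinPool(), whose default is the full CPU count; in the common pool the thread that starts the stream also does work, as the main entries in the outputs above show.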
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PlainTest {
    public static void main(String[] args) {
        List<String> fruitsList = new ArrayList<String>(Arrays.asList("Apple", "Orange", "1", "3", "4", "5", "7"));
        List<String> fruitsUnmodifiableList = Collections.unmodifiableList(fruitsList);
        System.out.println(fruitsUnmodifiableList.getClass());
        System.out.println(Runtime.getRuntime().availableProcessors());
        fruitsList.parallelStream().forEach(c -> {
            System.out.println(Thread.currentThread().getName() + "\t" + c);
            try {
                // Sleep for a very long time (1,000,000 ms, about 16.7 minutes) so every thread
                // stays busy with its first element and exactly one line is printed per thread
                Thread.sleep(10 * 100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
Result:
class java.util.Collections$UnmodifiableRandomAccessList
4
main 4
ForkJoinPool.commonPool-worker-2 7
ForkJoinPool.commonPool-worker-3 Apple
ForkJoinPool.commonPool-worker-1 Orange
The pool used is java.util.concurrent.ForkJoinPool#common, which is created by java.util.concurrent.ForkJoinPool#makeCommonPool as follows (JDK 8 source):
/**
 * Creates and returns the common pool, respecting user settings
 * specified via system properties.
 */
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}
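The default computed at the end of makeCommonPool can be checked at runtime with ForkJoinPool.getCommonPoolParallelism(). A sketch, assuming the java.util.concurrent.ForkJoinPool.common.parallelism property has not been set (class and method names are hypothetical):

```java
import java.util.concurrent.ForkJoinPool;

// Sketch: compare the common pool's parallelism with max(1, #CPUs - 1),
// the default used when the system property is not set.
public class CommonPoolParallelism {

    static int expectedDefault() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    public static void main(String[] args) {
        String prop = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
        int actual = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("property:                 " + prop);
        System.out.println("getCommonPoolParallelism: " + actual);
        if (prop == null) {
            // Only meaningful when the parallelism was not overridden.
            System.out.println("matches #CPUs - 1 (min 1): " + (actual == expectedDefault()));
        }
    }
}
```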