概要
現代的計算機已經向多CPU方向發展,即使是普通的PC,甚至現在的智能手機、多核處理器已被廣泛應用。在未來,處理器的核心數將會發展的越來越多。
雖然硬件上的多核CPU已經十分成熟,但是很多應用程序並未這種多核CPU做好准備,因此並不能很好地利用多核CPU的性能優勢。
為了充分利用多CPU、多核CPU的性能優勢,級軟基軟件系統應該可以充分“挖掘”每個CPU的計算能力,決不能讓某個CPU處於“空閑”狀態。為此,可以考慮把一個任務拆分成多個“小任務”,把多個"小任務"放到多個處理器核心上並行執行。當多個“小任務”執行完成之后,再將這些執行結果合並起來即可。
Java在JDK7之后加入了並行計算的框架Fork/Join,可以解決我們系統中大數據計算的性能問題。Fork/Join采用的是分治法,Fork是將一個大任務拆分成若干個子任務,子任務分別去計算,而Join是獲取到子任務的計算結果,然后合並,這個是遞歸的過程。子任務被分配到不同的核上執行時,效率最高。偽代碼如下:
- Result solve(Problem problem) {
- if (problem is small)
- directly solve problem
- else {
- split problem into independent parts
- fork new subtasks to solve each part
- join all subtasks
- compose result from subresults
- }
- }
Fork/Join框架的核心類是ForkJoinPool,它能夠接收一個ForkJoinTask,並得到計算結果。ForkJoinTask有兩個子類,RecursiveTask(有返回值)和RecursiveAction(無返回結果),我們自己定義任務時,只需選擇這兩個類繼承即可
package test_lock; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.RecursiveTask; public class SumTask extends RecursiveTask<Integer>{ private static final int THRESHOLD=20; private int[] array; private int low; private int high; public SumTask(int[] array_p,int low_p,int high_p){ this.array=array_p; this.low=low_p; this.high=high_p; } @Override protected Integer compute() { // TODO Auto-generated method stub int sum =0; if((high-low + 1)<=THRESHOLD){ System.out.println(low +"-"+high +" 計算"); for(int i=low;i<=high;i++){ sum+=array[i]; } }else{ System.out.println(low +"-"+high+" 切分"); //1. 一個大任務,分割成兩個子任務; int mid=(low+high)/2; SumTask left =new SumTask(array,low,mid); SumTask right=new SumTask(array,mid+1,high); //2.分別進行計算 invokeAll(left,right); //3.合並結果 sum =left.join()+right.join(); //另一種方法 try{ sum=left.get()+right.get(); }catch(Throwable e){ System.out.println(e.getMessage()); } } return sum; } public static void main(String[] args) { // TODO Auto-generated method stub // 1.6 泛型實例的創建可以通過類型推斷來簡化 可以去掉后面new部分的泛型類型,只用<>就可以了。 //使用泛型前 List strList = new ArrayList(); List<String> strList4 = new ArrayList<String>(); List<Map<String, List<String>>> strList5 = new ArrayList<Map<String, List<String>>>(); //編譯器使用尖括號 (<>) 推斷類型 List<String> strList0 = new ArrayList<String>(); List<Map<String, List<String>>> strList1 = new ArrayList<Map<String, List<String>>>(); List<String> strList2 = new ArrayList<>(); List<Map<String, List<String>>> strList3 = new ArrayList<>(); List<String> list = new ArrayList<>(); list.add("A"); // The following statement should fail since addAll expects // Collection<? extends String> //list.addAll(new ArrayList<>()); List<String> strList7 = new ArrayList<String>(); for(int i=0;i<10;i++){ strList7.add(String.valueOf(i)); } List<String> ll=new ArrayList<String>(); List<Map<String,List<String>>> ll1=new ArrayList<>(); strList7.forEach(o->{System.out.println(o);}); } }
package test_lock; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; public class fork_join { public static void main(String[] args) throws InterruptedException, ExecutionException { // TODO Auto-generated method stub /** * 下面以一個有返回值的大任務為例,介紹一下RecursiveTask的用法。 大任務是:計算隨機的100個數字的和。 小任務是:每次只能20個數值的和。 */ int[] array = genArray(); System.out.println(Arrays.toString(array)); int total=0; for(int i=0;i<array.length;i++){ System.out.println("目標和是:"+total); total+=array[i]; } System.out.println("目標和是:"+total); //1. 創建任務: SumTask sumTask=new SumTask(array,0,array.length-1); // 2。創建線程池,設置並行計算的個數 int processors=Runtime.getRuntime().availableProcessors(); System.out.println("processors="+processors); ForkJoinPool forkJoinPool =new ForkJoinPool(processors*2); // 3.提交任務到線程池 forkJoinPool.submit(sumTask); long begin=System.currentTimeMillis(); //4 獲取結果 Integer result =sumTask.get();//wait for long end =System.currentTimeMillis(); System.out.println(String.format("結果 %s,耗時 %sms",result,end-begin)); if(result==total){ System.out.println("測試成功!!"); }else{ System.out.println("fork join 調用失敗!!!"); } } private static int[] genArray() { // TODO Auto-generated method stub int[] array=new int[100]; for(int i=0;i<100;i++){ array[i]=new Random().nextInt(500); } return array; } }
結果為:
[412, 204, 449, 387, 245, 104, 73, 488, 42, 232, 84, 420, 101, 425, 3, 482, 8, 263, 492, 307, 312, 438, 29, 152, 467, 113, 265, 72, 429, 441, 199, 251, 416, 343, 386, 48, 403, 292, 232, 412, 469, 498, 139, 137, 181, 424, 52, 468, 260, 50, 164, 72, 259, 239, 448, 240, 415, 37, 186, 134, 147, 332, 172, 108, 205, 191, 194, 54, 359, 341, 348, 114, 405, 296, 14, 422, 275, 300, 413, 274, 279, 454, 213, 310, 96, 489, 96, 267, 250, 113, 252, 325, 163, 305, 206, 282, 145, 489, 253, 322] 目標和是:0.... 目標和是:25744 目標和是:26066 processors=4 0-99 切分 0-49 切分 0-24 切分 50-99 切分 75-99 切分 25-49 切分 25-37 計算 13-24 計算 0-12 計算 38-49 計算 88-99 計算 50-74 切分 63-74 計算 50-62 計算 75-87 計算 結果 26066,耗時 2ms 測試成功!!
上面的代碼是一個100個整數累加的任務,切分到小於20個數的時候直接進行累加,不再切分。
我們通過調整閾值(THRESHOLD),可以發現耗時是不一樣的。實際應用中,如果需要分割的任務大小是固定的,可以經過測試,得到最佳閾值;如果大小不是固定的,就需要設計一個可伸縮的算法,來動態計算出閾值。如果子任務很多,效率並不一定會高。
PS:類似的這種“分而治之”的需求場景,往往帶有遞歸性,實際中,我們可以考慮任務是否具有“遞歸性”來決定是否使用“Fork-Join”框架。