Java7 Fork-Join 框架:任務切分,並行處理


概要

現代的計算機已經向多CPU方向發展,即使是普通的PC,甚至現在的智能手機、多核處理器已被廣泛應用。在未來,處理器的核心數將會發展的越來越多。
雖然硬件上的多核CPU已經十分成熟,但是很多應用程序並未這種多核CPU做好准備,因此並不能很好地利用多核CPU的性能優勢。
為了充分利用多CPU、多核CPU的性能優勢,級軟基軟件系統應該可以充分“挖掘”每個CPU的計算能力,決不能讓某個CPU處於“空閑”狀態。為此,可以考慮把一個任務拆分成多個“小任務”,把多個"小任務"放到多個處理器核心上並行執行。當多個“小任務”執行完成之后,再將這些執行結果合並起來即可。

 

Java在JDK7之后加入了並行計算的框架Fork/Join,可以解決我們系統中大數據計算的性能問題。Fork/Join采用的是分治法,Fork是將一個大任務拆分成若干個子任務,子任務分別去計算,而Join是獲取到子任務的計算結果,然后合並,這個是遞歸的過程。子任務被分配到不同的核上執行時,效率最高。偽代碼如下:

 

[java]  view plain  copy
 
  1. Result solve(Problem problem) {  
  2.     if (problem is small)  
  3.         directly solve problem  
  4.     else {  
  5.         split problem into independent parts  
  6.         fork new subtasks to solve each part  
  7.         join all subtasks  
  8.         compose result from subresults  
  9.     }  
  10. }  


Fork/Join框架的核心類是ForkJoinPool,它能夠接收一個ForkJoinTask,並得到計算結果。ForkJoinTask有兩個子類,RecursiveTask(有返回值)和RecursiveAction(無返回結果),我們自己定義任務時,只需選擇這兩個類繼承即可

package test_lock;

 

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

public class SumTask  extends RecursiveTask<Integer>{
    
    private static final int THRESHOLD=20;
    
    private int[] array;
    private int low;
    private int high;
    
    public SumTask(int[] array_p,int low_p,int high_p){
        
        this.array=array_p;
        this.low=low_p;
        this.high=high_p;
    }
    
    @Override
    protected Integer compute() {
        // TODO Auto-generated method stub
        
        int sum =0;
        
        if((high-low + 1)<=THRESHOLD){
            
            System.out.println(low +"-"+high +" 計算");
            
            for(int i=low;i<=high;i++){
                sum+=array[i];
            }
        }else{
            System.out.println(low +"-"+high+" 切分");
            
            //1. 一個大任務,分割成兩個子任務;
            
            int mid=(low+high)/2;
            SumTask left =new SumTask(array,low,mid);
            SumTask right=new SumTask(array,mid+1,high);
            //2.分別進行計算
            invokeAll(left,right);
            
            //3.合並結果
            
            sum =left.join()+right.join();
            
            //另一種方法
            
            try{
                sum=left.get()+right.get();
            }catch(Throwable e){
                System.out.println(e.getMessage());
            }
            
            
        }
        return sum;
    }
     
        

    public static void main(String[] args) {
        // TODO Auto-generated method stub
//         1.6 泛型實例的創建可以通過類型推斷來簡化 可以去掉后面new部分的泛型類型,只用<>就可以了。
          //使用泛型前 
        List strList = new ArrayList(); 
        List<String> strList4 = new ArrayList<String>(); 
        List<Map<String, List<String>>> strList5 =  new ArrayList<Map<String, List<String>>>();
     
          
        //編譯器使用尖括號 (<>) 推斷類型 
        List<String> strList0 = new ArrayList<String>(); 
        List<Map<String, List<String>>> strList1 =  new ArrayList<Map<String, List<String>>>(); 
        List<String> strList2 = new ArrayList<>(); 
        List<Map<String, List<String>>> strList3 = new ArrayList<>();
        List<String> list = new ArrayList<>();
        list.add("A");
          // The following statement should fail since addAll expects
          // Collection<? extends String>
        //list.addAll(new ArrayList<>()); 
        
        List<String> strList7 = new ArrayList<String>(); 
        
        for(int i=0;i<10;i++){
            strList7.add(String.valueOf(i));
        }
        
        List<String> ll=new ArrayList<String>();
        
        List<Map<String,List<String>>> ll1=new ArrayList<>();
        
        strList7.forEach(o->{System.out.println(o);});
     

    }



}
package test_lock;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class fork_join {
    
     
        

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // TODO Auto-generated method stub
 
         /** 
         * 下面以一個有返回值的大任務為例,介紹一下RecursiveTask的用法。 
                 大任務是:計算隨機的100個數字的和。 
                 小任務是:每次只能20個數值的和。 
         */ 
        
        int[] array = genArray();
        
        System.out.println(Arrays.toString(array));
        
        int total=0;
        for(int i=0;i<array.length;i++){
             System.out.println("目標和是:"+total);
            total+=array[i];
        }

         System.out.println("目標和是:"+total);
         
         //1. 創建任務:
         
         SumTask sumTask=new SumTask(array,0,array.length-1);
         
         // 2。創建線程池,設置並行計算的個數
         
         int processors=Runtime.getRuntime().availableProcessors();
         System.out.println("processors="+processors);
         ForkJoinPool forkJoinPool =new ForkJoinPool(processors*2);
         
         // 3.提交任務到線程池
         
         forkJoinPool.submit(sumTask);
         
         long begin=System.currentTimeMillis();
         //4 獲取結果
         Integer result =sumTask.get();//wait for 
         
         long end =System.currentTimeMillis();
         
        System.out.println(String.format("結果 %s,耗時 %sms",result,end-begin)); 
        
        if(result==total){
            System.out.println("測試成功!!");
        }else{
            System.out.println("fork join 調用失敗!!!");
        }
    }

    private static int[] genArray() {
        // TODO Auto-generated method stub
        
        int[] array=new int[100];
        
        for(int i=0;i<100;i++){
            array[i]=new Random().nextInt(500);
        }
        return array;
    }

}

 

結果為:

[412, 204, 449, 387, 245, 104, 73, 488, 42, 232, 84, 420, 101, 425, 3, 482, 8, 263, 492, 307, 312, 438, 29, 152, 467, 113, 265, 72, 429, 441, 199, 251, 416, 343, 386, 48, 403, 292, 232, 412, 469, 498, 139, 137, 181, 424, 52, 468, 260, 50, 164, 72, 259, 239, 448, 240, 415, 37, 186, 134, 147, 332, 172, 108, 205, 191, 194, 54, 359, 341, 348, 114, 405, 296, 14, 422, 275, 300, 413, 274, 279, 454, 213, 310, 96, 489, 96, 267, 250, 113, 252, 325, 163, 305, 206, 282, 145, 489, 253, 322]
目標和是:0....
目標和是:25744
目標和是:26066
processors=4
0-99 切分
0-49 切分
0-24 切分
50-99 切分
75-99 切分
25-49 切分
25-37 計算
13-24 計算
0-12 計算
38-49 計算
88-99 計算
50-74 切分
63-74 計算
50-62 計算
75-87 計算
結果 26066,耗時 2ms
測試成功!!

 

 

上面的代碼是一個100個整數累加的任務,切分到小於20個數的時候直接進行累加,不再切分。
我們通過調整閾值(THRESHOLD),可以發現耗時是不一樣的。實際應用中,如果需要分割的任務大小是固定的,可以經過測試,得到最佳閾值;如果大小不是固定的,就需要設計一個可伸縮的算法,來動態計算出閾值。如果子任務很多,效率並不一定會高。 
PS:類似的這種“分而治之”的需求場景,往往帶有遞歸性,實際中,我們可以考慮任務是否具有“遞歸性”來決定是否使用“Fork-Join”框架。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM