Fork/Join框架的介紹
第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小。
第二步執行任務並合並結果。分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。
Fork/Join使用兩個類來完成以上兩件事情:
- ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個子類:
-
RecursiveAction:用於沒有返回結果的任務。
-
RecursiveTask :用於有返回結果的任務。
-
- ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
package com.thread.test.thread; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * Created by windwant on 2016/6/3. */ public class MyForkJoin { public static void main(String[] args) { MyTask task = new MyTask(new File("D:\\MPS")); Integer sum = new ForkJoinPool().invoke(task); System.out.println(sum); } } class MyTask extends RecursiveTask<Integer>{ public Integer num = 0; private File file; MyTask(File file){ this.file = file; } @Override protected Integer compute() { List<MyTask> taskList = new ArrayList<MyTask>(); if(file.isDirectory()){ File[] list = file.listFiles(); for(File subf: list){ if(subf.isDirectory()){ MyTask mt = new MyTask(subf); taskList.add(mt); }else{ num++; } } }else{ num = 1; } if(!taskList.isEmpty()){ //同下 // for(MyTask mtask: taskList){ // mtask.fork(); // } // for(MyTask mtask: taskList){ // num += mtask.join(); // } for(MyTask mtask: invokeAll(taskList)){ num += mtask.join(); } } return num; } }