1. 使用场景: 对1000000000000000000000000求和
ForkJoinPool 使用时,先将任务 拆分 成 小任务 和 合并任务两部分
public class RecursiveActionTest extends RecursiveTask<Integer> { private static final long serialVersionUID = -3611254198265061729L; //阀值 public static final int threshold = 10; private int start; private int end; public RecursiveActionTest(int start, int end) { this.start = start; this.end = end; } //compute 是一个递归任务 @Override protected Integer compute() { int sum = 0; //如果任务足够小就计算任务 boolean canCompute = (end - start) <= threshold; if (canCompute) { //执行加法任务 for (int i = start; i <= end; i++) { // 每个任务为:sum+i( 此处添加任务) sum += i; } } else { // 如果任务大于阈值,就分裂成10子任务计算,step:每一份的任务数 int step = (start + end) / 10; ArrayList<RecursiveActionTest> subTaskList = new ArrayList<RecursiveActionTest>(); int pos = start; //设置每个小任务的始起始值和终止值 for (int i = 0; i < 10; i++) { int lastOne = pos + step; if (lastOne > end) lastOne = end; RecursiveActionTest subTask = new RecursiveActionTest(pos, lastOne); pos += step + 1; subTaskList.add(subTask); //把子任务推向线程池 subTask.fork(); } // 等待所有子任务完成,并计算值 for (RecursiveActionTest task : subTaskList) { sum += task.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一个计算任务,计算1+2+3+4 RecursiveActionTest task = new RecursiveActionTest(1, 1000000000000000000000000); //执行一个任务 Future<Integer> result = forkjoinPool.submit(task); try { System.out.println(result.get()); } catch (Exception e) { System.out.println(e); } } }
2. forkJoinPool使用步骤:
3. ForkJoinPool 使用场景是什么?
答: 比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
4. ForkJoinPool另外一个使用例子
public class MakeMoneyTask extends RecursiveTask<Integer>{ private static final int MIN_GOAL_MONEY = 100000; private int goalMoney; private String name; private static final AtomicLong employeeNo = new AtomicLong(); public MakeMoneyTask(int goalMoney){ this.goalMoney = goalMoney; this.name = "员工" + employeeNo.getAndIncrement() + "号"; } @Override protected Integer compute() { if (this.goalMoney < MIN_GOAL_MONEY){ System.out.println(name + ": 老板交代了,要赚 " + goalMoney + " 元,为了买车买房,加油吧...."); return makeMoney(); }else{ int subThreadCount = ThreadLocalRandom.current().nextInt(10) + 2; System.out.println(name + ": 上级要我赚 " + goalMoney + ", 有点小多,没事让我" + subThreadCount + "个手下去完成吧," + "每人赚个 " + Math.ceil(goalMoney * 1.0 / subThreadCount) + "元应该没问题..."); List<MakeMoneyTask> tasks = new ArrayList<>(); for (int i = 0; i < subThreadCount; i ++){ tasks.add(new MakeMoneyTask(goalMoney / subThreadCount)); } Collection<MakeMoneyTask> makeMoneyTasks = invokeAll(tasks); int sum = 0; for (MakeMoneyTask moneyTask : makeMoneyTasks){ try { sum += moneyTask.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(name + ": 嗯,不错,效率还可以,终于赚到 " + sum + "元,赶紧邀功去...."); return sum; } } private Integer makeMoney(){ int sum = 0; int day = 1; try { while (true){ Thread.sleep(ThreadLocalRandom.current().nextInt(500)); int money = ThreadLocalRandom.current().nextInt(MIN_GOAL_MONEY / 3); System.out.println(name + ": 在第 " + (day ++) + " 天赚了" + money); sum += money; if (sum >= goalMoney){ System.out.println(name + ": 终于赚到 " + sum + " 元, 可以交差了..."); break; } } } catch (InterruptedException e) { e.printStackTrace(); } return sum; } }