Java fork/join —— 拆分任務並行執行


概念

從JDK1.7開始,Java提供ForkJoin框架用於並行執行任務,它的思想就是講一個大任務分割成若干小任務,最終匯總每個小任務的結果得到這個大任務的結果。作為一個並發框架在jdk7的時候就加入到了我們的java並發包java.util.concurrent中,並且在java 8 的lambda並行流中充當着底層框架的角色。

思維導圖

核心類介紹

  • ForkJoinPool:充當fork/join框架里面的管理者,最原始的任務都要交給它才能處理。它負責控制整個fork/join有多少個workerThread,workerThread的創建,激活都是由它來掌控。它還負責workQueue隊列的創建和分配,每當創建一個workerThread,它負責分配相應的workQueue。然后它把接到的活都交給workerThread去處理,它可以說是整個frok/join的容器。

  • ForkJoinWorkerThread:fork/join里面真正干活的"工人",本質是一個線程。里面有一個ForkJoinPool.WorkQueue的隊列存放着它要干的活,接活之前它要向ForkJoinPool注冊(registerWorker),拿到相應的workQueue。然后就從workQueue里面拿任務出來處理。它是依附於ForkJoinPool而存活,如果ForkJoinPool的銷毀了,它也會跟着結束。

  • ForkJoinPool.WorkQueue: 雙端隊列就是它,它負責存儲接收的任務。

  • ForkJoinTask:代表fork/join里面任務類型,我們一般用它的兩個子類RecursiveTask、RecursiveAction。這兩個區別在於RecursiveTask任務是有返回值,RecursiveAction沒有返回值。任務的處理邏輯包括任務的切分都集中在compute()方法里面。

代碼實現

我們以將一個0到10000的List集合的每個元素進行打印為例:

實現類:

package com.forkjoin.test.task;

import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * forkjoin拆分任務實現
 */
public class ForkJoinTask extends RecursiveTask<List<Integer>> {
    
    private List<Integer> integerList;
    
    public ForkJoinTask(List<Integer> integerList) {
        this.integerList = integerList;
    }

    @Override
    protected List<Integer> compute() {
        System.out.println("數據條數:" + integerList.size());
        // 當數據條數大於100時,將任務拆分
        if(integerList.size() > 100) {
            int minSize = integerList.size() / 2;
            List<Integer> leftList = integerList.subList(0, minSize);
            List<Integer> rightList = integerList.subList(minSize, integerList.size());
            // 拆分出兩個任務 並行執行
            ForkJoinTask leftTask = new ForkJoinTask(leftList);
            ForkJoinTask rightTask = new ForkJoinTask(rightList);
            this.invokeAll(leftTask,rightTask);
        }else {
            // 一直到拆分出的數據不大於100條時,執行任務
            integerList.stream().forEach(x -> {
                System.out.println(x.toString());
            });
        }
        return null;
    }

}

調用類:

package com.forkjoin.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import com.forkjoin.test.task.ForkJoinTask;

public class ForkJoinTest {

    public static void main(String[] args) {
        // 生成一個0-10000的整數集合
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            list.add(i);
        }
        // forkjoin拆分任務調用
        ForkJoinTask task = new ForkJoinTask(list);
        ForkJoinPool pool=ForkJoinPool.commonPool();
        pool.submit(task);
        pool.shutdown();
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM