Forkjoin線程池


  之前學習了ThreadPoolExecutor線程池的用法,現在學習下Forkjoin線程池的用法。主要也就是ForkJoinPool,該類和ThreadPoolExecutor是兄弟類,都繼承自AbstractExecutorService。

1.簡介

1.什么是Forkjoin以及適用場景  

  雖然目前處理器核心數已經發展到最大,但是按任務並發處理並不能完全充分的利用處理器資源,因為一般的應用程序沒有那么多的並發任務。基於這種現象,考慮將一個任務拆分成多個單元,每個單元分別得到執行,最后合並每個單元的結果。

  Fork/join是Java7提供的一個用於並發執行任務的框架,是一個把多個大任務分成若干個小任務,最終匯總每個小任務結果得到大任務結果的框架。

   說白了就是充分利用多處理器的優勢,讓處理器充分利用起來,並發處理任務。

  如果需要查詢CPU數量可以從任務管理器查看,也可以用java代碼查看,如下:

System.out.println(Runtime.getRuntime().availableProcessors());

結果:

4

2.工作竊取算法

  一個大任務拆分成許多小任務,為了減小線程間的競爭,把這些子任務分別放到不同的隊列中,並且每個隊列都有單獨的線程來執行任務,線程和隊列一一對應。但是會出現一種情況:A線程處理完了自己隊列的任務,但是B線程的隊列還有很多任務要處理。如果A過去處理B隊列的任務會訪問同一個隊列,造成競爭,解決辦法就是A從雙端隊列的尾部拿任務、B從雙端隊列的頭部拿任務。

 注意:每個線程都有自己的隊列(這點和TjreadPoolExecutor不一樣),當自己隊列的任務完成以后會從其他線程的隊列竊取一個任務執行,這樣可以充分利用資源。

優點:利用了線程並行計算,減少了線程間的競爭。

缺點:

(1)如果雙端隊列只有一個任務會產生競爭

(2)竊取算法消耗了更多的系統資源,如會創建多個線程和多個雙端隊列。

3.涉及的主要類

1.ForkJoinTask

  它提供任務執行中fork和join操作的機制。一般只需要繼承其子類:

RecursiveTask:用於執行有返回結果的任務,這個泛型需要自己傳。

RecursiveAction:用於執行沒有返回結果的任務,其實這個的泛型只不過是Void

2.ForkJoinPool

  ForkJoinTask任務需要在ForkJoinPool中執行。默認的線程池數量等價於CPU數量(我的機子是4)。

3.ForrkjoinWorkerThread

  ForkJoinPool中執行任務的線程。

2.簡單使用

比如一個簡單的使用,實現Forkjoin計算0-100000000000的和

ForkJoinTask類:

  當需要計算的數值大於step的時候繼續拆分任務,否則拆分任務。

package forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * @author: 喬利強
 * @date: 2021/1/7 19:37
 * @description:
 */
public class SumTask extends RecursiveTask<Long> {

    private long start, end;

    private long step = 1000000;

    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if (end - start <= step) {
            for (long i = start; i <= end; i++)
                sum += i;
        } else {
            long mid = start + (end - start) / 2;
            SumTask lt = new SumTask(start, mid);
            SumTask rt = new SumTask(mid + 1, end);
            lt.fork();
            rt.fork();
            long leftsum = lt.join();
            long rightsum = rt.join();
            sum = leftsum + rightsum;
        }
        
        return sum;
    }
}

測試類:測試單線程同步執行和Forkjoin框架執行

package forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
 * @author: 喬利強
 * @date: 2021/1/7 19:32
 * @description:
 */
public class SumClient {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 單線程測試
        long startNum = 0;
        long endNum = 100000000000L;
        long sum = 0;
        long startTime = System.currentTimeMillis();
        for (; startNum <= endNum; startNum++) {
            sum += startNum;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("sum: " + sum + ", 用時: " + (endTime - startTime) + "ms");

        // fork-join測試
        startTime = System.currentTimeMillis();
        // 1. 創建線程池
        ForkJoinPool fp = new ForkJoinPool();
        // 2.創建一個任務
        SumTask task = new SumTask(0, endNum);
        // 3. 任務交給線程
        ForkJoinTask<Long> result = fp.submit(task);
        // 4.得到結果
        Long fkResult = result.get();
        endTime = System.currentTimeMillis();
        System.out.println("sum: " + fkResult + ", 用時: " + (endTime - startTime) + "ms");
    }
}

結果:(我在自己的電腦測的時候計算太耗時了就中斷了。我在另一個電腦測的時候單線程大概是40多s、forkjoin大概是20多s)

查看ForkJoinPool的默認構造,如下:可以創建的線程池的大小是CPU數量

    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

補充:並行流parallelStream底層也是用了ForkJoin,並行流適合沒有線程安全問題、較單純的數據處理任務

  parallelStream提供了流的並行處理,它是Stream的另一重要特性,其底層使用Fork/Join框架實現。簡單理解就是多線程異步任務的一種實現。

測試:

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        numbers.parallelStream().forEach(num -> {
            try {
                Thread.sleep(1 * 100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });

結果: (可以看出打印的順序是亂序證明確實是異步並行執行;打印的線程名字是ForkJoinPool.commonPool-worker-1到3 也證明是采用的ForkJoinPool)

main>>6

ForkJoinPool.commonPool-worker-1>>8

ForkJoinPool.commonPool-worker-2>>3

ForkJoinPool.commonPool-worker-3>>7

main>>5

ForkJoinPool.commonPool-worker-2>>4

ForkJoinPool.commonPool-worker-1>>9

ForkJoinPool.commonPool-worker-3>>2

main>>1

補充:可以通過修改VM參數-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設置worker的數量,測試如下:

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "6");

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        numbers.parallelStream().forEach(num -> {
            try {
                Thread.sleep(1 * 100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });

結果:

main>>6

ForkJoinPool.commonPool-worker-2>>3

ForkJoinPool.commonPool-worker-1>>8

ForkJoinPool.commonPool-worker-6>>5

ForkJoinPool.commonPool-worker-3>>7

ForkJoinPool.commonPool-worker-5>>9

ForkJoinPool.commonPool-worker-4>>2

main>>4

ForkJoinPool.commonPool-worker-2>>1

補充:ForkJoin不一定就肯定比單線程處理塊,這還要看你單線程串行處理每個任務的耗時情況;ForkJoin本身創建線程以及Fork、Join任務都是需要花費時間的,比如: 

package forkjoin;

import java.util.Arrays;
import java.util.List;

public class SumClient2 {

    public static void main(String[] args) {
        // 修改forkjoin默認的線程池大小(可以修改為比處理器數量多,但是沒必要,一般和處理器數目相同就可以)
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
        // 模擬處理用時
        int handleTime = 2;

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        long start = System.currentTimeMillis();
        numbers.parallelStream().forEach(num -> {
            try {
                Thread.sleep(1 * handleTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });
        long end = System.currentTimeMillis();
        System.out.println((end - start) + "ms ===");

        start = System.currentTimeMillis();
        numbers.stream().forEach(num -> {
            try {
                Thread.sleep(1 * handleTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ">>" + num);
        });
        end = System.currentTimeMillis();
        System.out.println((end - start) + "ms ---");
    }
}

結果: 可以看出使用ForkJoinPool反而時間多於單線程,當 handleTime 增大的時候使用ForkJoinPool會快於單線程。

main>>11
ForkJoinPool.commonPool-worker-1>>1
ForkJoinPool.commonPool-worker-8>>13
ForkJoinPool.commonPool-worker-9>>6
ForkJoinPool.commonPool-worker-4>>14
ForkJoinPool.commonPool-worker-10>>5
ForkJoinPool.commonPool-worker-6>>2
ForkJoinPool.commonPool-worker-2>>15
ForkJoinPool.commonPool-worker-13>>16
ForkJoinPool.commonPool-worker-11>>3
ForkJoinPool.commonPool-worker-15>>8
ForkJoinPool.commonPool-worker-14>>7
ForkJoinPool.commonPool-worker-3>>12
ForkJoinPool.commonPool-worker-12>>4
ForkJoinPool.commonPool-worker-5>>10
ForkJoinPool.commonPool-worker-7>>9
103ms ===
main>>1
main>>2
main>>3
main>>4
main>>5
main>>6
main>>7
main>>8
main>>9
main>>10
main>>11
main>>12
main>>13
main>>14
main>>15
main>>16
49ms ---

補充: 並行流parallelStream底層也是用了ForkJoin,而且全局公用一個Pool, 默認大小是CPU數量減去一

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PlainTest {

    public static void main(String[] args) {
        List<String> fruitsList = new ArrayList<String>(Arrays.asList("Apple", "Orange", "1", "3", "4", "5", "7"));
        List<String> fruitsUnmodifiableList = Collections.unmodifiableList(fruitsList);
        System.out.println(fruitsUnmodifiableList.getClass());
        System.out.println(Runtime.getRuntime().availableProcessors());

        fruitsList.parallelStream().forEach(c -> {
            System.out.println(Thread.currentThread().getName() + "\t" + c);
            try {
                Thread.sleep(10 * 100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

結果:

class java.util.Collections$UnmodifiableRandomAccessList
4
main 4
ForkJoinPool.commonPool-worker-2 7
ForkJoinPool.commonPool-worker-3 Apple
ForkJoinPool.commonPool-worker-1 Orange

查看是用的java.util.concurrent.ForkJoinPool#common, 創建過程如下:java.util.concurrent.ForkJoinPool#makeCommonPool

    /**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM