[翻譯]Java 8 並行流 – 自定義線程池示例


本篇是簡單介紹如何自定義線程池並在Java 8 的Parallel Streams中使用線程池。並舉例介紹如何不使用普通線程池而是使用自定義線程池來調用Parallel streams API。

1. 介紹

在本篇教程中,您將學習如何使用強大的並行流API(在Java8中)創建用於批量數據處理的自定義線程池。

並行流可以在並發環境中運行,這是以多線程開銷為代價的streams性能的改進版本。
本文重點關注Stream API的最大限制並舉例說明如何將並行流與自定義線程池結合使用。

public class CustomPootParallelStreams {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        parallelStreamProcess();
    }

    private static void parallelstreamProcess() throws ExecutionException, InterruptedException {
        int start = 1;
        int end = 10000;
        List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
                .collect(Collectors.toList());
        System.out.println(intList.size());
        ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
        int actualTotal = newCustomThreadPool.submit(
                () -> {
                    int a = intList.stream().reduce(0, Integer::sum).intValue();
                    System.out.println("------" + a);
                    return a;
                }).get();
        System.out.println("actualTotal " + actualTotal);
    }
}

2. Java 8 並行流

首先,讓我們看看如何從一個集合中創建並行流。
為了使一個流可以在多核處理器中運行,你只需要調用parallelStream()方法來創建並行流。

package com.javaprogramto.java8.streams.parallel.streams;
 
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
 
public class ParallelStreamCreation {
    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);
        Stream<Integer> parallelStream = intList.parallelStream();
        parallelStream.forEach(value -> System.out.println(value));
    }
}

Output:
[30
40
50
20
10]

你可以觀察到輸出結果是多核處理器隨機打印出來的。在內部,並行流使用SplitIterator和StreamSupport類使其並行運行。
並行流的默認的處理過程是用 ForkJoinPool.commonPool()來創建的線程池,這樣創建的線程池會被整個應用所共享。如果你同時運行大量的並行流,則可能會看到處理時間的性能和延遲。

3. 使用自定義線程池

上面的操作並行流將會使用普通的ForkJoinPool 線程池。
如果你有許多並行流需要同時運行並且其中一些並行流可能會因為網絡延遲導致處理時長超出預期,並且這些任務可能會阻塞由公共線程池創建的進程。因此,它會導致任務的速度變慢,需要更長的時間來執行。

在這些情況下,最好使用並行流組合的自定義線程池。

看看下面的例子,我們使用ForkJoinPool 創建了5個線程並且在線程創建了一個並行流,以查找給定范圍內所有數字的總和。

package com.javaprogramto.java8.streams.parallel.streams;
 
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
 
public class CustomPoolParallelStreams {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        parallelStreamProcess();
    }
 
    private static void parallelStreamProcess() throws ExecutionException, InterruptedException {
 
        int start = 1;
        int end = 10000;
 
        List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
                .collect(Collectors.toList());
        System.out.println(intList.size());
 
        ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
        int actualTotal = newCustomThreadPool.submit(
                () -> {
                     int a = intList.stream().parallel().reduce(0, Integer::sum).intValue();
                     return a;
                }).get();
 
        System.out.println("actualTotal " + actualTotal);
 
    }
}

Output:
[10000
actualTotal 50005000]

實際上,上面的程序並沒有給出有效的解決方案,不過我看到很多網站都在討論這個解決方案。事實上,這段代碼在ForkJoinPool中創建了一個並行流,在線程內部再次使用ForkJoinPool區域的公共池中的線程。

因此,如果您正在運行多個並行流,那不要使用這個Steam API的並行方法,因為這可能會減慢其他流的速度,從而用更多的時間給出結果。

在這段程序中,我們將線程池計數設為5,當然你可以根據你的CPU配置進行更改。如果你有更多的任務,那么你可以根據其他任務進行微調。

如果你只有一個並行流,那么你可以使用一個固定線程個數的線程池。

如果上述都不能滿足,請等待Java的更新,並行流可以將ForkJoinPool作為輸入來限制並行進程的數量

4. 結論

在本文中,您已經看到了如何在JavaStreamAPI中創建並行流,並行流API使用來自ForkJoinPool的公共共享線程池。但是,這是所有其他並行線程共享的,因此最好避免使用並行流,但是您可以使用第二種方法限制線程的數量。而且你還必須考慮使用第二種方法也有一些缺點。

只需等待官方oracle的新並行流api。

本文中顯示的所有代碼都是通過GitHub實現的。

您可以直接下載項目,並且可以在本地運行,無任何錯誤。

在Github上查看

下載

如果你有其他的見解,請在評論區留言談論。

原文鏈接:Java 8 Parallel Streams - Custom Thread Pools Examples | Java Code Geeks - 2021


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM