Java 8 (6) Stream 流 - 並行數據處理與性能


在Java 7之前,並行處理集合非常麻煩。首先你要明確的把包含數據的數據結構分成若干子部分,然后你要把每個子部分分配一個獨立的線程。然后,你需要在恰當的時候對他們進行同步來避免競爭,等待所有線程完成。最后,把這些部分結果合並起來。Java 7中引入了一個叫做 分支/合並的框架,讓這些操作更穩定,更不容易出錯。

並行流

  使用Stream接口可以方便的處理它的元素,可以通過對收集源調用parallelStream方法來把集合轉換為並行流。並行流就是一個把內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。這樣就可以把給定操作的工作負荷分配給多核處理器的所有內核,讓它們都忙起來。

例如求和:1到10000之間的和。

    //求和
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }

這段代碼等價於傳統Java:

    //求和
    public static long getSum(long n){
        long sum = 0;
        for(long i = 1L;i<=10000;i++){
            sum += i;
        }
        return sum;
    }

將順序流轉換為並行流

  只需要對順序流調用parallel方法即可轉換為並行流:

    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }

這段代碼在內部將Stream分成了幾塊,因此可以對不同的塊獨立進行歸納操作。最后,同一個歸納操作會將各個子流的部分歸納結果合並起來,得到整個原始流結果。

對順序流執行parallel方法並不意味着流本身有任何實際的變化,它內部就是一個布爾值,表示parallel之后進行的操作都並行執行,只需要對並行流調用sequential方法就可以變回順序流。這兩個方法可以結合起來,在需要並行的時候並行,需要串行的時候串行。

    Stream.parallel()
          .filter(...)
          .sequential()
          .map(...)
          .parallel()
          .reduce();

但是 最后一次parallel或sequential調用會影響整個流水線,上面的例子流水線會並行執行,因為最后調用的是它。

 

並行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().availableProcessors()得到的,可以通過系統屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池的大小,例如:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")。這是一個全局屬性,意味着所有的並行操作都會受影響。一般不建議修改它。

 

對這三個方法進行測量:

編寫一個測量方法,這個方法接受一個函數和一個long參數,他會對傳給方法的long應用函數10次,記錄每次執行的時間(毫秒),並返回最短的一次執行時間:

    public static long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

 

    //iterate
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }
    //iterate Parallel
    public static long getSumParallel(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }
    //Java Old
    public static long getSumOldJava(long n){
        long sum = 0;
        for(int i = 0;i<=n;i++){
            sum += i;
        }
        return sum;
    }
System.out.println(measureSumPerf(Main::getSum,10000000)); //105
System.out.println(measureSumPerf(Main::getSumParallel,10000000)); //147
System.out.println(measureSumPerf(Main::getSumOldJava,10000000)); //5

用傳統for循環的方式是最快的,因為它更為底層,更重要的是不需要對原始類型進行任何裝箱或拆箱操作。他才5毫秒即可完成。

順序化執行結果為105毫秒,

用並行化進行測試,結果居然是最慢的 147毫秒,因為iterate生成的是裝箱的對象,必須拆箱成數字才能求和,並且我們很難把iterate分成多個獨立塊來進行並行執行。

這意味着 並行化編程可能很復雜,如果用的不對,它甚至會讓程序的整體性能更差。

LongStream.rangeClosed方法與iterate相比有兩個優點:

1.LongStream.rangeClosed直接產生原始類型的long數字,沒有裝箱和拆箱。

2.LongStream.rangeClosed會生成數字范圍,很容易拆分為獨立的小塊。

    //5
    public static long GetRangeClosedSum(long n){
        return LongStream.rangeClosed(1,n).reduce(0L,Long::sum);
    }

順序化的LongStream.rangeClosed 只花費了5毫秒,他比iterate順序化要快得多,因為他沒有裝箱和拆箱。再來看看並行化:

    //1
    public static long GetRangeClosedSumParallel(long n){
        return LongStream.rangeClosed(1,n).parallel().reduce(0L,Long::sum);
    }

LongStream.rangeClosed 調用parallel方法后,執行只使用了1毫秒,終於可以像上面圖中一樣並行了,並行化過程本身需要對流做遞歸划分,把每個子流的歸納操作分配到不同的線程,然后把這些操作的結果合並成一個值。

 

正確使用並行流
  錯用並行流而產生錯誤的首要原因,就是使用的算法改變了某些共享狀態,例如 另一種實現對n個自然數求和的方法,但這會改變一個共享累加器:

public class Accumulator {
    public long total = 0;
    public void add(long value){
        total += value;
    }
}
    public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).forEach(Accumulator::add);
        return accumulator.total;
    }

這種代碼本質就是順序的,每次訪問total都會出現數據競爭。如果你嘗試用同步來修復,那就完全失去並行的意義了。我們試着在forEach前加入parallel方法使其並行化:

public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).parallel().forEach(Accumulator::add);
        return accumulator.total;
    }

調用上面的測試方法:

System.out.println(measureSumPerf(Main::sideEffectSum,10000000));
Result: 10140890625203
Result: 9544849565325
Result: 6438093946815
Result: 11805543046590
Result: 6658954367405
Result: 4642751863823
Result: 5948081550315
Result: 7219270279482
Result: 7258008360508
Result: 4898539133022
1

性能無關緊要了,因為結果都是錯誤的,每次執行都會返回不同的結果,都離正確值差很遠。這是由於多個線程在同時訪問累加器,執行total+=value;foreach中調用的方法會改變多個線程共享對象的可變狀態。 共享可變狀態會影響並行流以及並行計算。

 

如何使用並行流

  1.測量,把順序流轉換成並行流很容易,但不一定性能會提升。並行流不一定總是比順序流快,所以使用並行流時對其和順序流進行測量。

  2.留意裝箱。自動裝箱和拆箱操作會大大降低性能,Java 8中又原始類型流(IntStream、LongStream、DoubleStream)來避免這些操作。

  3.有些操作本身在並行流上的性能就比順序流差。特別是limit何findFirst等依賴於元素順序的操作,他們在並行流上執行的代價非常大。例如,findAny會比findFrist性能好,因為它不一定要按照順序來執行。

  4.對於小數據量,不建議使用並行流。

  5.要考慮流背后的數據結構是否易於分解。例如,ArrayList的拆分效率比LinkedList高的多,因為ArrayList用不着遍歷就可以拆分,而LinkedList必須遍歷。另外,用range方法創建的原始類型流也可以快速分解。

 

分支/合並框架

  分支/合並框架的目的是以遞歸的方式將可以並行的任務拆分成更小的任務,然后將每個子任務結果合並起來成為整體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(ForkJoinPool)中的工作線程。

1.使用RecursiveTask

  要把任務提交到這個池,必須創建RecursiveTask<R>的一個子類,其中R是並行化任務(以及所有子任務)產生的結果類型,或者如果任務不返回結果,則是RecursiveAction類型(它可能會更新其他非局部機構)。要定義RecursiveTask,只需實現它唯一的抽象方法compute,這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。類似以下偽代碼:

        if(任務足夠小或不可分) {
            順序計算該任務
        }else{
            將任務拆分為兩個子任務 
            遞歸調用本方法,拆分每個子任務,等待所有子任務完成 
            合並每個子任務的結果
        }

一般來說沒有確切的標准來絕對一個任務是否可以被拆分,但是有幾種試探方法可以查看是否可以拆分。

 以前面的求和例子為基礎,我們試着用這個框架為一個數字范圍long[]數組求和,首選需要為RecursiveTask類做一個實現,ForkJoinSumCalculator

public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {

    //要求和的數組
    private final long[] numbers;
    //子任務處理的數組的開始位置。
    private final int start;
    //子任務處理的數組的終止位置
    private final int end;
    //不可將任務分解為子任務的數組大小
    public static final long THRESHOLD = 10000;

    //共用構造函數用於創建主任務
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    //私有構造方法用於以遞歸方式為主任務創建子任務
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //該任務負責求和的部分的大小
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially(); //如果大小小於或等於閥值,順序計算結果
        }
        //創建一個子任務來為數組的前一半進行求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        //利用另一個ForkJoinPool線程異步執行新創建的子任務
        leftTask.fork();
        //創建一個任務為數組的后一半進行求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);

        //同步執行第二個子任務,有可能允許進一步遞歸划分
        Long rightResult = rightTask.compute();
        //讀取第一個子任務的結果,如果尚未完成就等待
        Long leftResult = leftTask.join();
        //合並兩個任務的結果
        return leftResult + rightResult;
    }

    //在子任務不可再分時計算結果
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

現在就可以通過調用構造函數來求和了:

        long[] numbers = LongStream.rangeClosed(1,10000000).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        System.out.println(new ForkJoinPool().invoke(task));

這里使用LongStream.rangeClosed生成了一個long 數組,然后創建了一個ForkJoinTask的父類,並把數組傳遞給ForkJoinSumCalculator的公共構造函數,最后創建愛你了一個新的ForkJoinPool,並把任務傳給它調用方法。在ForkJoinPool中執行時,最后一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。

在實際應用時,使用多個ForkJoinPool是沒有意義的,一般來說把它實例化一次,然后把實力保存在靜態字段中,使之成為單例。這樣就可以在任何地方方便的重用了。

運行ForkJoinSumCalculator

  當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個線程執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,如果不夠小則會把要求和的數組分成兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。因此,這一過程可以遞歸重復,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件。這時會順序計算每個任務的結果,然后由分支過程創建的任務二叉樹遍歷回到它的根。接下來回合並每個子任務的部分結果,從而得到總任務的結果。

System.out.println(measureSumPerf(Main::forkJoinSum,10000000)); // 79

執行速度為79毫秒,是因為必須先把整個數字流都放進一個long數組,之后才能在ForkJoinSumCalculator任務中使用它。

 

使用分支/合並框架的最佳做法

1.對一個任務調用join方法會阻塞調用方,直到該任務做出結果。因此,有必要在兩個字任務的計算都開始后再調用它。否則,你得到的版本會比原始的順序算法更慢更復雜,因為每個子任務都必須等待另一個子任務完成才能啟動。

2.不應該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啟動並行計算。

3.對於子任務調用fork方法可以把它排進ForkJoinPool。同時對左邊和右邊的子任務調用它似乎很自然,但這樣的效率要比直接對其中一個調用compute低。這樣做你可以為其中一個子任務重用同一個線程,從而避免在線程池中多分配一個任務造成的開銷。

4.對於分支/合並拆分策略你必須選擇一個標准,來決定任務是要進一步拆分還是已到可以順序求值。

 

工作竊取

  在ForkJoinSumCalculator的例子中,設置的閥值是10000,就是在數組為10000時就不會再創建子任務了。在測試案例中,我們先有了一個1000萬的數組,意味着ForkJoinSumCalculator至少會分出1000個子任務來。分出大量的小任務一般來說是一個好的選擇,理想的情況下,划分並行任務時,應該讓每個任務都用完全相同時間完成,讓所有的CPU都同樣繁忙,但在實際中,每個子任務所花費的時間可能天差地別,要么是因為划分策略效率低,要么是有不可預知的原因,比如磁盤訪問慢,或是需要和外部服務協調執行。

  分支/合並框架工程用一種稱為工作竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味着這些任務差不多被平均分配到ForkJoinPool中的所有線程上。每個線程都為分配給它的任務保存一個雙向鏈式隊列,沒完成一個任務,他就會從隊列頭上取出下一個任務開始執行。基於前面所述的原因,某個線程可能早早完成了分配給它的任務,也就是它的隊列已經空了,而其他的線程還很忙。這時,這個線程沒有閑下來,而是隨機選了一個別的線程,從隊列的尾巴上偷走一個任務。這個過程一直繼續下去,直到所有的任務都執行完畢,所有隊列都清空。這就是為什么划成許多小任務而不是少數幾個大任務,這樣有助於更好地在工作線程之間平衡負載。

現在應該清楚 流 是如何使用 分支/合並 框架來並行處理它的項目了。本例中我們明確指定了將數組拆分成多少個任務的邏輯。但是,使用並行流時就不用這么做了,有一種機制來為你拆分流。

 

Spliterator

  Spliterator是Java 8 中加入的另一個新接口,這個名字代表“可分迭代器”,和Iterator一樣,Spliterator也用於遍歷數據源中的元素,但它是為了並行執行而設計的。Java 8已經為集合框架中包含了所有數據結構提供了一個默認的Spliterator實現。 集合實現了Spliterator接口,接口提供了一個spliterator方法。這個接口定義了若干方法:

public interface Spliterator<T> {
    
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

}

T是Spliterator遍歷的元素類型,tryAdvance方法的行為類似於普通的Iterator,因為它會按順序一個個地使用Spliterator中的元素,並且如果還有其他元素要遍歷就返回true。但trySplit是專為Spliterator接口設計的,因為它可以把一些元素划出去分給第二個Spliterator,讓它們兩個並行處理。Spliterator還可通過estimateSize,讓它們兩個並行處理。Spliterator還可通過estimateSize方法估計還剩下多少元素要遍歷,因為即使不那么確切,能快速算出來是一個值也有助於讓拆分均勻一點。

拆分過程

  將Stream拆分成多個部分的算法是一個遞歸過程,如圖所示,第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trySplit直到它返回null,表明它處理的數據結構不能再分割,第三部,這個遞歸拆分過程到第四步就終止了,這時所有的Spliterator在調用trySplit時都返回了null。

這個拆分過程也受Spliterator本身特性影響,而特性是通過characteristics方法聲明的。

Spliterator的特性

Spliterator接口聲明的最后一個抽象方法是characteristics,它返回一個int,代表Spliterator本身特性集的編碼。

  ORDERED : 元素有既定的順序,因此Spliterator在遍歷和划分時也遵循這一點

  DISTINCT : 對於任意一對遍歷過的元素x和y,x.equals(y) 返回false

  SORTED : 遍歷的元素按照一個預定義的順序排序

  SIZED : 該Spliterator由一個已知大小的源建立,因此estimatedSize()返回的是 准確值

  NONNULL : 保證遍歷的元素不會為null

  IMMUTABLE: Spliterator的數據源不能修改,這意味着在遍歷時不能添加、刪除、修改任何元素

  CONCURRENT : 該Spliterator的數據源可以被其他線程同時修改而無需同步

  SUBSIZED : 該Spliterator和所有從它拆分出來的Spliterator都是SIZED

 

小結:

  1.內部迭代讓你可以並行處理一個流,而無需在代碼中顯示使用和協調不同的線程。

  2.雖然並行處理一個流很容易,卻不能保證程序在所有情況下都運行的更快。因此一定要測量,確保你並沒有把程序拖的更慢。

  3.像並行流那樣昂對一個數據集並行執行操作可以提升性能,特別是要處理的元素數量龐大,或處理單個元素特別耗時時。

  4.盡量使用原始特化流,來避免裝箱和拆箱操作。

  5.分支/合並框架讓你得以用遞歸方式將可以並行的任務拆分成功效的任務,在不同的線程上執行,然后將各個子任務的結果合並起來生成整體結果。

  6.Spliterator定義了並行流如何拆分它要遍歷的數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM