java8新特性(六):Stream多線程並行數據處理


轉:http://blog.csdn.net/sunjin9418/article/details/53143588

將一個順序執行的流轉變成一個並發的流只要調用 parallel()方法
public static long parallelSum( long n){
     return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum);
}
並行流就是一個把內容分成多個數據塊,並用不不同的線程分別處理每個數據塊的流。最后合並每個數據塊的計算結果。
將一個並發流轉成順序的流只要調用sequential()方法
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
 
這兩個方法可以多次調用, 只有最后一個調用決定這個流是順序的還是並發的。
 
並發流使用的默認線程數等於你機器的處理器核心數。
 
通過這個方法可以修改這個值,這是全局屬性。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
 
並非使用多線程並行流處理數據的性能一定高於單線程順序流的性能,因為性能受到多種因素的影響。
如何高效使用並發流的一些建議:
1. 如果不確定, 就自己測試。
2. 盡量使用基本類型的流  IntStream, LongStream, and DoubleStream
3. 有些操作使用並發流的性能會比順序流的性能更差,比如limit,findFirst , 依賴元素順序的操作在並發流中是極其消耗性能的 。findAny的性能就會好很多,應為不依賴順序。
4. 考慮流中計算的性能(Q)和操作的性能(N)的對比, Q表示單個處理所需的時間, N表示需要處理的數量,如果Q的值越大, 使用並發流的性能就會越高。
5. 數據量不大時使用並發流,性能得不到提升。
6.考慮數據結構:並發流需要對數據進行分解,不同的數據結構被分解的性能時不一樣的。
 
流的數據源和可分解性
可分解性
ArrayList 非常好
LinkedList
IntStream.range 非常好
Stream.iterate
HashSet
TreeSet
 
 
7. 流的特性以及中間操作對流的修改都會對數據對分解性能造成影響。 比如固定大小的流在任務分解的時候就可以平均分配,但是如果有filter操作,那么流就不能預先知道在這個操作后還會剩余多少元素。
 
8. 考慮最終操作的性能:如果最終操作在合並並發流的計算結果時的性能消耗太大,那么使用並發流提升的性能就會得不償失。
 
9.需要理解並發流實現機制:
 
fork/join 框架
 
fork/join框架是jdk1.7引入的,java8的stream多線程並非流的正是以這個框架為基礎的,所以想要深入理解並發流就要學習fork/join框架。
fork/join框架的目的是以遞歸方式將可以並行的任務拆分成更小的任務,然后將每個子任務的結果合並起來生成整體結果。它是ExecutorService接口的一個實現,它把子任務分配線程池(ForkJoinPool)中的工作線程。要把任務提交到這個線程池,必須創建RecursiveTask<R>的一個子類,如果任務不返回結果則是RecursiveAction的子類。
 
fork/join框架流程示意圖:
 
 
廢話不多說,上代碼:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * Created by sunjin on 2016/7/5.
 * 繼承RecursiveTask來創建可以用於分支/合並的框架任務
 */
public class ForkJoinSumCalculator  extends RecursiveTask<Long> {
     //要求和的數組
     private final long[]  numbers;
     //子任務處理的數組開始和終止的位置
     private final int  start;
     private final int  end;
     //不在將任務分解成子任務的閥值大小
     public static final int  THRESHOLD = 10000;

     //用於創建組任務的構造函數
     public ForkJoinSumCalculator( long[] numbers){
         this(numbers, 0, numbers. length);
    }

     //用於遞歸創建子任務的構造函數
     public ForkJoinSumCalculator( long[] numbers, int start, int end){
         this. numbers = numbers;
         this. start = start;
         this. end = end;
    }

     //重寫接口的方法
    @Override
     protected Long compute() {
         //當前任務負責求和的部分的大小
         int length =  end start;
         //如果小於等於閥值就順序執行計算結果
         if(length <=  THRESHOLD){
             return computeSequentially();
        }
         //創建子任務來為數組的前一半求和
        ForkJoinSumCalculator leftTask =  new ForkJoinSumCalculator( numbersstartstart + length/2);
         //將子任務拆分出去,丟到ForkJoinPool線程池異步執行。
        leftTask.fork();
         //創建子任務來為數組的后一半求和
        ForkJoinSumCalculator rightTask =  new ForkJoinSumCalculator( numbersstart + length/2,  end);
         //第二個任務直接使用當前線程計算而不再開啟新的線程。
         long rightResult = rightTask.compute();
         //讀取第一個子任務的結果,如果沒有完成則等待。
         long leftResult = leftTask.join();
         //合並兩個子任務的計算結果
         return rightResult + leftResult;
    }

     //順序執行計算的簡單算法
     private long computeSequentially(){
         long sum = 0;
         for( int i = start; i<  end; i++){
            sum +=  numbers[i];
        }
         return sum;
    }
     //提供給外部使用的入口方法
     public static long forkJoinSum( long n) {
         long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task =  new ForkJoinSumCalculator(numbers);
         return new ForkJoinPool().invoke(task);
    }
}
注意事項:
1. 調用join 方法要等到調用這個方法的線程的自己的任務完成之后。
2. 不要直接去調用ForkJoinPool的invoke方法 ,只需要調用RecursiveTask的fork或者compute。
3. 拆解任務時只需要調用一次fork執行其中一個子任務, 另一個子任務直接利用當前線程計算。應為fork方法只是在ForkJoinPool中計划一個任務。
4.任務拆分的粒度不宜太細,不否得不償失。
 
 
工作盜取
由於各種因素,即便任務拆分是平均的,也不能保證所有子任務能同時執行結束, 大部分情況是某些子任務已經結束, 其他子任務還有很多, 在這個時候就會有很多資源空閑, 所以fork/join框架通過工作盜取機制來保證資源利用最大化, 讓空閑的線程去偷取正在忙碌的線程的任務。
在沒有任務線程中的任務存在一個隊列當中, 線程每次會從頭部獲取一個任務執行,執行完了再從queue的頭部獲取一個任務,直到隊列中的所有任務執行完,這個線程偷取別的線程隊列中的任務時會從隊列到尾部獲取任務,並且執行,直到所有任務執行結束。
從這個角度分析,任務的粒度越小, 資源利用越充分。
 
 
工作盜取示意圖
page222image11664
 
 
可拆分迭代器Spliterator
 
它和Iterator一樣也是用於遍歷數據源中的元素,但是他是為並行執行而設計的。 java8 所有數據結構都實現了 這個接口, 一般情況不需要自己寫實現代碼。但是了解它的實現方式會讓你對並行流的工作原理有更深的了解。(未完待續)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM