java-11-Stream優化並行流


 
並行流    多線程    把一個內容分成多個數據塊  不同線程分別處理每個數據塊的流
 
串行流   單線程  一個線程處理所有數據
 
java8 對並行流優化  StreamAPI 通過parallel()     並行流 
                                                        sequential()   順序流  
 
注意:
使用並行流並不是一定會提高效率,因為jvm對數據進行切片和切換線程也是需要時間的。
所以數據量越小,串行操作越快;數據量越大,並行操作效果越好
 
 
StreamAPI 通過parallel()     並行流   底層是Fork/join 框架
 
Fork/join 框架   將任務分解成  若干小任務(分解到不可分解為止)    小任務的結果 join匯總
 
Fork/Join與傳統線程池的區別!
Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!
 
 
ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有
 
溫馨提示: 電腦性能不好 不要run  很消耗電腦壽命
 1 package com.wf.zhang.java8.stream;
 2 
 3 import java.util.concurrent.RecursiveTask;
 4 
 5 
 6 public class ForkJoinCalculate extends RecursiveTask<Long> {
 7 
 8     private static final long serialVersionUID = 13475679780L;
 9     
10     private long start;
11     private long end;
12     
13     private static final long THRESHOLD = 10000L; //臨界值
14 
15     //計算從start-end之和
16     public ForkJoinCalculate(long start, long end) {
17         this.start = start;
18         this.end = end;
19     }
20     
21     @Override
22     protected Long compute() {
23         long length = end - start;
24         
25         if(length <= THRESHOLD){
26             long sum = 0;
27             
28             for (long i = start; i <= end; i++) {
29                 sum += i;
30             }
31             return sum;
32         }else{
33             long middle = (start + end) / 2;
34             
35             ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
36             left.fork(); //拆分,並將該子任務壓入線程隊列
37             
38             ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end);
39             right.fork();
40             //匯總
41             return left.join() + right.join();
42         }
43         
44     }
45 
46 }
ForkJoinCalculate
 1 package com.wf.zhang.java8.stream;
 2 
 3 import org.junit.Test;
 4 
 5 import java.util.concurrent.ForkJoinPool;
 6 import java.util.concurrent.ForkJoinTask;
 7 import java.util.stream.LongStream;
 8 
 9 public class TestForkJoin {
10 
11     @Test
12     public void test1(){
13         long start = System.currentTimeMillis();
14 
15         ForkJoinPool pool = new ForkJoinPool();
16         ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
17 
18         long sum = pool.invoke(task);
19         System.out.println(sum);
20 
21         long end = System.currentTimeMillis();
22 
23         System.out.println("耗費的時間為: " + (end - start));
24     }
25 
26     //java8 優化並行流
27     @Test
28     public void test3(){
29         long start = System.currentTimeMillis();
30 
31         Long sum = LongStream.rangeClosed(0L, 10000000000L)
32                              .parallel()
33                              .sum();
34 
35         System.out.println(sum);
36 
37         long end = System.currentTimeMillis();
38 
39         System.out.println("耗費的時間為: " + (end - start));
40     }
41 }
TestForkJoin

 

使用java8優化並行流         0L, 10000000000L 相加   
      LongStream.rangeClosed ()    生成連續的Long類型的數   
               parellel()    並行流   底層是Fork/join 框架       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM