並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。穿行流則相反,並行流的底層其實就是ForkJoin框架的一個實現。
那么先了解一下ForkJoin框架吧。
Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join匯總。
Fork/Join與傳統線程池的區別!
Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!
我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有
package ForkJionP; import java.util.concurrent.RecursiveTask; public class ForkJoinWork extends RecursiveTask<Long> { private Long start;//起始值 private Long end;//結束值 public static final Long critical = 100000L;//臨界值 public ForkJoinWork(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { //判斷是否是拆分完畢 Long lenth = end - start; if(lenth<=critical){ //如果拆分完畢就相加 Long sum = 0L; for (Long i = start;i<=end;i++){ sum += i; } return sum; }else { //沒有拆分完畢就開始拆分 Long middle = (end + start)/2;//計算的兩個值的中間值 ForkJoinWork right = new ForkJoinWork(start,middle); right.fork();//拆分,並壓入線程隊列 ForkJoinWork left = new ForkJoinWork(middle+1,end); left.fork();//拆分,並壓入線程隊列 //合並 return right.join() + left.join(); } } }
測試:
package ForkJionP; import org.junit.Test; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class ForkJoinWorkDemo { public void test() {
//ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L,10000000000L);//參數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 76474 } public void test2(){
//普通線程實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x+=i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 160939 } @Test public void test3(){
//Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 15531 } }
我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當數據很大的時候ForkJoin比普通線程實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快
Java 8 中將並行流進行了優化,我們可以很容易的對數據進行並行流的操作,Stream API可以聲明性的通過parallel()與sequential()在並行流與穿行流中隨意切換!