Java--8--新特性--串並行流與ForkJoin框架


並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。穿行流則相反,並行流的底層其實就是ForkJoin框架的一個實現。

那么先了解一下ForkJoin框架吧。

Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join匯總。

Fork/Join與傳統線程池的區別!

Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。

就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!

我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有

package ForkJionP;


import java.util.concurrent.RecursiveTask;

public class ForkJoinWork extends RecursiveTask<Long> {

    private Long start;//起始值
    private Long end;//結束值
    public static final  Long critical = 100000L;//臨界值

    public ForkJoinWork(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //判斷是否是拆分完畢
        Long lenth = end - start;
        if(lenth<=critical){
            //如果拆分完畢就相加
            Long sum = 0L;
            for (Long i = start;i<=end;i++){
                sum += i;
            }
            return sum;
        }else {
            //沒有拆分完畢就開始拆分
            Long middle = (end + start)/2;//計算的兩個值的中間值
            ForkJoinWork right = new ForkJoinWork(start,middle);
            right.fork();//拆分,並壓入線程隊列
            ForkJoinWork left = new ForkJoinWork(middle+1,end);
            left.fork();//拆分,並壓入線程隊列

            //合並
            return right.join() + left.join();
        }
    }
}

 

測試:

package ForkJionP;

import org.junit.Test;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class ForkJoinWorkDemo {

    public  void test() {
    //ForkJoin實現
long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L,10000000000L);//參數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 76474 } public void test2(){
  //普通線程實現 Long x
= 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x+=i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 160939 } @Test public void test3(){
   //Java 8 並行流的實現
long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce+" time: " + (l1-l)); //invoke = -5340232216128654848 time: 15531 } }

 我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當數據很大的時候ForkJoin比普通線程實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快

Java 8 中將並行流進行了優化,我們可以很容易的對數據進行並行流的操作,Stream API可以聲明性的通過parallel()與sequential()在並行流與穿行流中隨意切換!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM