Java8的新特性--並行流與串行流


寫在前面

我們都知道,在開發中有時候要想提高程序的效率,可以使用多線程去並行處理。而Java8的速度變快了,這個速度變快的原因中,很重要的一點就是Java8提供了並行方法,它使得我們的程序很容易就能切換成多線程,從而更好的利用CPU資源。

下面我們就來簡單學習一下java8中得並行流與串行流。

並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。

Java8中將並行進行了優化,我們可以很容易的對數據進行並行操作。Stream API 可以聲明性地通過parallel()與sequential()在並行流與順序(串行)流之間進行切換

Fork/Join框架

在說並行流之前呢,我們首先來來接一下這個Fork/Join框架框架。

Java 7開始引入了一種新的Fork/Join線程池,它可以執行一種特殊的任務:把一個大任務拆成多個小任務並行執行。即在必要的情況下,將一個大的任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行join匯總。

Fork/Join結構圖

Fork/Join框架與傳統線程池的區別

傳統的線程池

我們就多線程來說吧,所謂的多線程就是把我們的任務分配到CPU不同的核上(也就是CPU不同的線程上)進行執行,我們以4核CPU為例。如果是傳統線程的話,每個任務都有可能會阻塞,因為每個線程什么時候執行是由CPU時間片給他分配的執行權決定的,當這個時間片用完了以后,CPU會剝奪他的執行權,然后交給其他的線程去執行,這時就有可能出現阻塞的情況。即4核CPU我們可以看成4個線程,有可能其中倆線程中的一個任務阻塞造成后面的任務排隊得不到執行,而另外兩個沒有阻塞的線程,則順利執行完處於空閑狀態了,這種有的線程在阻塞線程里的任務得不到執行,而別的不阻塞的線程空閑沒有任務可以執行的狀態,就造成了CPU資源的浪費,這樣就會大大影響我們程序的執行效率。

Fork/Join框架

是把一個大任務拆分成若干個小任務,然后把這些小任務都壓入到對應的線程中,也就是把這些小任務都壓入到對應的CPU中(默認CPU有幾核就有幾個線程),然后形成一個個的線程隊列。

Fork/Join任務的原理:判斷一個任務是否足夠小,如果是,直接計算,否則,就分拆成幾個小任務分別計算。這個過程可以反復“裂變”成一系列小任務。

Fork/Join框架會將任務分發給線程池中的工作線程。Fork/Join框架的獨特之處在於它使用“工作竊取”(work-stealing)算法。完成自己的工作而處於空閑的工作線程,能夠從其他扔處於忙碌狀態的工作線程中竊取等待執行的任務,每個工作線程都有自己的工作隊列,這是使用雙端隊列(dequeue)來實現的。線程執行任務是從隊列頭部開始執行的,而處於空閑狀態的線程,在竊取別的線程的任務的時候,是從被竊取線程的等待隊列的隊尾開始竊取的。這種情況下,就不會出現空閑的線程浪費CPU資源,因為一旦空閑便會去竊取任務執行。沒有資源浪費,減少了線程的等待時間,所以效率就高,就提升了性能。

下面我們舉個例子:如果要計算一個超大數組的和,最簡單的做法是用一個循環在一個線程內完成。還有一種方法,可以把數組拆成兩部分,分別計算,最后加起來就是最終結果,這樣可以用兩個線程並行執行,如果拆成兩部分還是很大,我們還可以繼續拆,用4個線程並行執行,這種即使用Fork/Join對大數據進行並行求和。

Fork/Join框架的使用

下面我們來寫測試類演示一下:實現數的累加操作,比如說計算1到100億的和。

我們要編寫一個類繼承RecursiveTask類,並重寫compute()方法。

package com.cqq.java8.parallel;

import java.util.concurrent.RecursiveTask;

/**
 * @Description:遞歸進行拆分
 * @date 2021/3/14 7:55
 */
public class ForkJoinCalculate extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    public ForkJoinCalculate(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    //臨界值,當大於臨界值的時候就一直拆分,小於臨界值就不再進行拆分了
    private static final long THREASHOLD = 100000000L;

    //重寫compute方法
    @Override
    protected Long compute() {

        long length = end - start;
        if(length <= THREASHOLD){//到臨界值就不能再拆了
            long sum = 0;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else{//不到臨界值就進行拆分
            long middle = (end + start);
            ForkJoinCalculate left = new ForkJoinCalculate(start,middle);
            //拆分子任務,同時壓入線程隊列
            left.fork();
            ForkJoinCalculate right = new ForkJoinCalculate(middle+1,end);
            right.fork();
            //拆完之后,合並,把fork()之后的結果得一個個合並,即累加總和
            return left.join()+right.join();

        }
    }
}

測試方法

package com.cqq.java8;

import com.cqq.java8.parallel.ForkJoinCalculate;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
 * @Description:
 * @date 2021/3/14 8:17
 */
public class TestForkJoin {
    @Test
    public void test01(){

        //開始時間
        Instant start = Instant.now();
        //ForkJoin的執行需要一個ForkJoinPool的支持
        ForkJoinPool pool = new ForkJoinPool();

        ForkJoinTask<Long> task = new ForkJoinCalculate((long) 0,100000000L);
        Long invoke = pool.invoke(task);
        System.out.println(invoke);
        //結束時間
        Instant end = Instant.now();
        //計算一下時間用  耗時多少
        System.out.println(Duration.between(start,end).toMillis());

    }

    //一個普通for循環即傳統的單線程的測試類 與Fork/Join的執行結果做對比
    @Test
    public void test02(){

        Instant start = Instant.now();
        long sum = 0L;

        for (long i = 0; i < 100000000L; i++) {
            sum += i;
        }
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println(Duration.between(start,end).toMillis());

    }
}

測試結果

類加和 ForkJoin耗時 傳統單線程耗時
1-1億 521 85
1-10億 241 363
1-100億 1103 2431

從測試結果可以看出,當任務量不大時,傳統單線程耗時短,任務達到一定量時ForkJoin的性能就很好了,因為在任務量不大時,拆分任務也要耗時,所以總的執行時間就比較長。說明,多線程也是要在合適的時候用才能提升性能。

Java8中的並行流

在Java 8中我們用的是parallel()方法,對並行流進行了優化。但是實際上底層還是用的Fork/Join框架。

    @Test
    public void test03(){

        Instant start = Instant.now();

        //順序流
        long reduce = LongStream.rangeClosed(0, 100000000L)
                .reduce(0, Long::sum);

        //使用parallel()並行流
        OptionalLong reduce1 = LongStream.rangeClosed(0, 100000000L)
                .parallel()//並行
                .reduce(Long::sum);

        Instant end = Instant.now();
        System.out.println(Duration.between(start,end).toMillis());

    }

Java8 中不僅僅對代碼進行了優化,而且效率也大大提升。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM