一、並行流概念:
並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。
java8中將並行進行了優化,我們可以很容易的對數據進行並行操作。Stream API可以聲明性的通過parallel()與sequential()在並行流與順序流之間進行切換。
二、Fork/Join 框架
就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 匯總。
Fork/Join框架與傳統線程池的區別:
采用 “工作竊取”模式(work-stealing):當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨機線程的隊列中偷一個並把它放在自己的隊列中。
相對於一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由於某些原因無法繼續運行,那么該線程會處於等待狀態,
而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續運行.那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行.這種方式減少了線程的等待時間,提高了性能。
Fork/Join實現例子
1、使用傳統forkJoin實現
//計算從start-end之和 public class ForkJoinCalculate extends RecursiveTask<Long>{ /** * */ private static final long serialVersionUID = 13475679780L; private long start; private long end; private static final long THRESHOLD = 10000L; //臨界值 public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= THRESHOLD){ long sum = 0; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ long middle = (start + end) / 2; ForkJoinCalculate left = new ForkJoinCalculate(start, middle); left.fork(); //拆分,並將該子任務壓入線程隊列 ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end); right.fork(); //匯總 return left.join() + right.join(); } } }
public void test1(){ long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L); long sum = pool.invoke(task); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗費的時間為: " + (end - start)); //112-1953-1988-2654-2647-20663-113808 }
2、使用java8並行流實現
@Test public void test3(){ long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L, 10000000000L) .parallel() .sum(); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗費的時間為: " + (end - start)); //2061-2053-2086-18926 }