並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。
Java8中將並行進行了優化,我們可以很容易的對數據進行並行操作。Stream API可以聲明性地通過parallel()和sequential()在並行流和順序流之間進行切換。
在了解並行流之前,我們首先需要了解Fork/Join框架
Fork/Join框架
Fork/Join框架:在必要的情況下,將一個大任務進行拆分(fork)成若干個小任務(拆到不可在拆時),在將一個個的小任務運算的結果進行匯總(join)。
Fork/Join 框架與傳統線程池的區別
采用“工作竊取”模式(work-stealing):
當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨機線程的隊列中偷一個並把它放在自己的隊列中。
相對於一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由於某些原因無法繼續運行,那么該線程會處於等待狀態.而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續運行.那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行.這種方式減少了線程的等待時間,提高了性能。
測試代碼
public class ForkJoinCalculate extends RecursiveTask<Long> {
public static void main(String[] args) {
// java8之前
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinCalculate(0, 1000000000L);
Long sum = pool.invoke(task);
// java8並行流
LongStream.rangeClosed(0, 1000000000L)
.parallel()
.reduce(0, Long::sum);
}
// 處理任務的起始值
private long start;
// 處理任務的終止值
private long end;
// 被拆分后的最小單位
private static final long THRESHOLD = 10000;
public ForkJoinCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
// 到達臨界值
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i ++) {
sum += i;
}
return sum;
}
// 沒有達到臨界值
else {
long middle = (start + end) / 2;
ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
// 拆分子任務,同時壓入線程隊列
left.fork();
ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
// 拆分子任務,同時壓入線程隊列
right.fork();
return left.join() + right.join();
}
}
}