在我們開發過程中,我們都知道想要提高程序效率,我們可以啟用多線程去並行處理,而java8中對數據處理也提供了它得並行方法,今天就來簡單學習一下java8中得並行流與順序流。
並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。
Java8中將並行流進行了優化,我們可以很容易的對數據進行並行操作。Stream API可以聲明性地通過parallel()與scqucntial()在並行流與順序流之間進行切換。
一、Fork—Join框架
Fork—Join框架:是java7提供得一個用於執行任務得框架,就是在必要得情況下,將一個大任務,進行拆分(Fork)成若干個小任務(拆分到不能再拆分),再將一個個的小任務運算得結果進行join匯總。
Fork—Join框架時ExecutorService接口得一種具體實現,目的是為了幫助更好地利用多處理器帶來得好處。它是為那些能夠被遞歸地拆分成子任務的工作類型量身設計的。起目的在於能夠使用所有有可用的運算能力來提升你的應用的性能。

Fork—Join框架會將任務分發給線程池中的工作線程。Fork—Join框架的獨特之處在與它使用工作竊取(work-stealing)算法。完成自己的工作而處於空閑的工作線程能夠從其他仍處於忙碌(busy)狀態的工作線程中竊取等待任務執行,每個工作線程都有自己的工作隊列,這是使用雙端隊列(deque)來實現的。當一個任務划分一個新線程時,它將自己推到deque的頭部。當線程的任務隊列為空,它將嘗試從另一個線程的deque的尾部竊取另一個任務。
下面,我們來寫一個簡單的實例來演示一下:
1 /** 2 * 要想使用Fark—Join,類必須繼承RecursiveAction(無返回值) 或者 3 * RecursiveTask(有返回值) 4 * @author Wuyouxin 5 * 6 */ 7 public class ForkJoin extends RecursiveTask<Long>{ 8 9 /** 10 * 序列化 11 */ 12 private static final long serialVersionUID = -645248615909548422L; 13 14 private long start; 15 private long end; 16 17 public ForkJoin(long start, long end) { 18 this.start = start; 19 this.end = end; 20 } 21 22 private static final long THRESHOLD = 10000L; 23 /** 24 * 重寫方法 25 */ 26 @Override 27 protected Long compute() { 28 if (end - start <= THRESHOLD) { 29 long sum = 0; 30 for (long i = start; i < end; i++) { 31 sum += i; 32 } 33 return sum; 34 } else { 35 long middle = (end -start)/2; 36 ForkJoin left = new ForkJoin(start, middle); 37 //拆分子任務,壓入線程隊列 38 left.fork(); 39 ForkJoin right = new ForkJoin(middle, end); 40 right.fork(); 41 42 //合並並返回 43 return left.join() + right.join(); 44 } 45 } 46 47 }
1 /** 2 * 實現數的累加 3 */ 4 @Test 5 public void test1() { 6 //開始時間 7 Instant start = Instant.now(); 8 9 //這里需要一個線程池的支持 10 ForkJoinPool pool = new ForkJoinPool(); 11 12 ForkJoinTask<Long> task = new ForkJoin(0, 10000000000L); 13 14 long sum = pool.invoke(task); 15 16 //結束時間 17 Instant end = Instant.now(); 18 System.out.println(Duration.between(start, end).getSeconds()); 19 }
二、java8 並行流
java8 中 對並行流做了優化,簡化了許多,我們繼續以累加來寫個例子。
1 /** 2 * java8 並行流 parallel() 3 */ 4 @Test 5 public void test2() { 6 //開始時間 7 Instant start = Instant.now(); 8 9 LongStream.rangeClosed(0, 10000000000L).parallel() 10 .reduce(0, Long :: sum); 11 12 //結束時間 13 Instant end = Instant.now(); 14 System.out.println(Duration.between(start, end).getSeconds()); 15 }
java8 中不僅僅對代碼進行了優化,而且效率也大大提升。
