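The difference between iterating with stream and parallelStream in Java 8
As we all know, Java 8 added a new abstraction to its API called Stream, which lets you process data declaratively. Much like querying a database with SQL, a stream gives you an intuitive, high-level way to express operations over Java collections. Used well, the Stream API can make Java programmers considerably more productive and help them write efficient, clean, concise code. In this style, the collection of elements to be processed is treated as a stream that flows through a pipeline, and the elements can be processed at each stage of the pipeline, for example by filtering, sorting, or aggregating. The elements pass through intermediate operations, and a terminal operation finally produces the result of the preceding processing.
Looking at the API, we can see that Java 8 collections offer two different methods, stream() and parallelStream(). Both produce a stream, so what is the difference between them? Let's start with the following code: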
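To make the pipeline idea concrete, here is a minimal sketch of intermediate operations followed by a terminal operation. The class and method names (StreamPipelineSketch, squaresOfEvens, sum) are made up for illustration and do not come from the original article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative names only; not part of the original article.
class StreamPipelineSketch {

    // filter, sorted and map are intermediate operations: they are lazy and
    // only describe the pipeline. collect is the terminal operation that
    // actually pulls the elements through it.
    static List<Integer> squaresOfEvens(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)      // keep the even numbers
                .sorted()                     // natural ascending order
                .map(n -> n * n)              // square each element
                .collect(Collectors.toList());
    }

    // reduce is a terminal operation that aggregates the stream to one value.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(5, 2, 8, 3, 6);
        System.out.println(squaresOfEvens(numbers)); // [4, 36, 64]
        System.out.println(sum(numbers));            // 24
    }
}
```

Nothing is computed until collect or reduce is called, which is why a pipeline without a terminal operation does no work.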
import java.util.Arrays;
import java.util.List;

// The class name is illustrative; the original snippet showed only main().
public class StreamOrderDemo {
    public static void main(String[] args) {
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println("Result:");
        // stream().forEach: sequential iteration
        numberList.stream().forEach(number -> {
            System.out.print(String.format("%d ", number));
        });
        System.out.println("\r");
        // parallelStream().forEach: no ordering guarantee
        numberList.parallelStream().forEach(number -> {
            System.out.print(String.format("%d ", number));
        });
        System.out.println("\r");
        // parallelStream().forEachOrdered: encounter order is preserved
        numberList.parallelStream().forEachOrdered(number -> {
            System.out.print(String.format("%d ", number));
        });
        System.out.println("\r");
    }
}
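Running this code several times shows that the order printed by parallelStream().forEach can change from run to run, whereas stream().forEach and parallelStream().forEachOrdered print the same result every time, in the order in which the elements are stored in the list.
So why do the results differ? Could it be that parallelStream runs on multiple threads in parallel? To test this guess, we modify the code a little further.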
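The same ordering distinction shows up when results are gathered rather than printed. The sketch below is an illustration, not code from the original article, and its class and method names are hypothetical. It contrasts collect(Collectors.toList()), which keeps the encounter order of an ordered source even on a parallel stream, with forEach, which adds elements in whatever order the threads reach them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative names only; not part of the original article.
class EncounterOrderSketch {

    // collect(toList()) respects encounter order, so the result is always
    // in list order even though the mapping may run on several threads.
    static List<Integer> doubledInOrder(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
    }

    // forEach on a parallel stream may run the action on any thread in any
    // order. ArrayList is not thread-safe, so the sink is wrapped in a
    // synchronized list; the element order of the result is unspecified.
    static List<Integer> doubledInArrivalOrder(List<Integer> numbers) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream()
                .map(n -> n * 2)
                .forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println("collect: " + doubledInOrder(numbers));
        System.out.println("forEach: " + doubledInArrivalOrder(numbers));
    }
}
```

Both methods return the same nine values; only the first one promises a stable order.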
import java.util.Arrays;
import java.util.List;

// The class name is illustrative; the original snippet showed only main().
public class StreamThreadDemo {
    public static void main(String[] args) {
        System.out.println("Result:");
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // stream().forEach: print the ID of the thread handling each element
        numberList.stream().forEach(number -> {
            System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ", Thread.currentThread().getId(), number));
        });
        System.out.println("\r");
        // parallelStream().forEach
        numberList.parallelStream().forEach(number -> {
            System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ", Thread.currentThread().getId(), number));
        });
        System.out.println("\r");
        // parallelStream().forEachOrdered
        numberList.parallelStream().forEachOrdered(number -> {
            System.out.println(String.format("ParallelStream forEach Ordered The Current Thread's ID is %d and output number %d ", Thread.currentThread().getId(), number));
        });
        System.out.println("\r");
    }
}
The modified code produces the following output:
Result:
Stream The Current Thread's ID is 1 and output number 1
Stream The Current Thread's ID is 1 and output number 2
Stream The Current Thread's ID is 1 and output number 3
Stream The Current Thread's ID is 1 and output number 4
Stream The Current Thread's ID is 1 and output number 5
Stream The Current Thread's ID is 1 and output number 6
Stream The Current Thread's ID is 1 and output number 7
Stream The Current Thread's ID is 1 and output number 8
Stream The Current Thread's ID is 1 and output number 9
ParallelStream The Current Thread's ID is 1 and output number 6
ParallelStream The Current Thread's ID is 19 and output number 9
ParallelStream The Current Thread's ID is 18 and output number 1
ParallelStream The Current Thread's ID is 15 and output number 2
ParallelStream The Current Thread's ID is 17 and output number 4
ParallelStream The Current Thread's ID is 14 and output number 8
ParallelStream The Current Thread's ID is 13 and output number 3
ParallelStream The Current Thread's ID is 16 and output number 7
ParallelStream The Current Thread's ID is 1 and output number 5
ParallelStream forEach Ordered The Current Thread's ID is 15 and output number 1
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 2
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 3
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 4
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 5
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 6
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 7
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 8
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 9
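The output shows that parallelStream().forEach runs on multiple threads: the nine elements are handled by eight different thread IDs, including the main thread (ID 1), which takes part in the work itself. parallelStream().forEachOrdered also runs on worker threads, but it still emits the elements in their original order. This confirms our guess that parallelStream is multithreaded. To check whether those threads really execute in parallel, we need to modify the code once more, which gives the following: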
import java.util.Arrays;
import java.util.List;

// The class name is illustrative; the original snippet showed only main().
public class StreamTimingDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Result:");
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // plain for loop
        long forBegin = System.currentTimeMillis();
        for (Integer number : numberList) {
            //System.out.println(String.format("For The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            Thread.sleep(1000);
        }
        System.out.println(String.format("For execute time cost %d ms", System.currentTimeMillis() - forBegin));
        System.out.println("\r");
        // stream().forEach
        long streamBegin = System.currentTimeMillis();
        numberList.stream().forEach(number -> {
            //System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("Stream execute time cost %d ms", System.currentTimeMillis() - streamBegin));
        System.out.println("\r");
        // parallelStream().forEach
        long parallelStreamBegin = System.currentTimeMillis();
        numberList.parallelStream().forEach(number -> {
            //System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("ParallelStream execute time cost %d ms", System.currentTimeMillis() - parallelStreamBegin));
        System.out.println("\r");
        // parallelStream().forEachOrdered
        long parallelStreamForEachOrderBegin = System.currentTimeMillis();
        numberList.parallelStream().forEachOrdered(number -> {
            //System.out.println(String.format("ParallelStream forEachOrdered The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("ParallelStream forEachOrdered execute time cost %d ms", System.currentTimeMillis() - parallelStreamForEachOrderBegin));
        System.out.println("\r");
    }
}
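Here we also added a traditional for loop to the comparison. To make the benefit of parallel threads visible, every iteration sleeps for 1 second. The output is as follows:
Result: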
For execute time cost 9032 ms
Stream execute time cost 9079 ms
ParallelStream execute time cost 2011 ms
ParallelStream forEachOrdered execute time cost 9037 ms
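The results show that parallelStream().forEach takes by far the least time, while the other three approaches all take about 9 seconds. We can therefore conclude that our guess was right: parallelStream().forEach executes our code on multiple threads in parallel. The roughly 2 seconds it needs, rather than 1, is consistent with the eight threads seen in the earlier output: nine elements spread over eight threads means one thread has to handle two of them. parallelStream().forEachOrdered also uses multiple threads, but because of its ordering constraint the action is performed on one element at a time, so here its running time is close to that of the single-threaded for loop and stream. In this test forEachOrdered therefore gains nothing from the extra threads, because the only work is the action itself. It can still make use of multiple CPU cores when the pipeline contains intermediate operations such as map or filter ahead of the terminal action, since those stages can be evaluated in parallel. If you are interested, you can check the number of processors available to the JVM with the following code, and use jstack to dump the thread stacks and inspect what the worker threads are doing.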
System.out.println("The system has " + Runtime.getRuntime().availableProcessors() + " available processors");
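As a follow-up to the timing test, the sketch below moves the slow work out of the terminal action and into a map stage, then finishes with forEachOrdered. It is an illustration under stated assumptions rather than code from the original article: slowSquare is a hypothetical stand-in for an expensive computation, and the class and method names are made up. On a multi-core machine the map stage can be expected to run on several threads while the output stays in list order, though the actual timing depends on the machine and the size of the common pool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

// Illustrative names only; not part of the original article.
class OrderedParallelSketch {

    // Hypothetical stand-in for an expensive computation.
    static int slowSquare(int n) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return n * n;
    }

    // The map stage may be evaluated by several threads at once, while
    // forEachOrdered hands the results to the action one at a time in
    // encounter order. Because forEachOrdered guarantees that the action for
    // one element happens-before the action for the next, a plain ArrayList
    // is sufficient as the sink here.
    static List<Integer> squaresInOrder(List<Integer> numbers) {
        List<Integer> out = new ArrayList<>();
        numbers.parallelStream()
                .map(OrderedParallelSketch::slowSquare)
                .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println("Available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());

        long begin = System.currentTimeMillis();
        List<Integer> result = squaresInOrder(numbers);
        long elapsed = System.currentTimeMillis() - begin;

        System.out.println(result);
        System.out.println("map + forEachOrdered took " + elapsed + " ms");
    }
}
```

Parallel streams run on the common ForkJoinPool by default, with the calling thread joining in, which is why the pool parallelism is printed next to the processor count.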