流API--使用並行流


這篇博客一起來研究下使用並行流。借組多核處理器並行執行代碼可以顯著提高性能,但是並行編程可能十分復雜且容易出錯,流API提供的好處之一是能夠輕松可靠的並行執行一些操作。請求並行處理流,首先要獲得一個並行流。
獲取一個並行流有2個方法:
1,Collection定義的parallelStream()方法
2,對順序流調用parallel()方法。
一下代碼演示如果獲取一個並行流:
public static void main(String[] args) throws Exception
	{
		List<Integer> list = new ArrayList<>(4);
		list.add(1);
		list.add(2);
		list.add(3);
		list.add(4);
		//直接從集合中獲取並行流
		Stream<Integer> parallelStream = list.parallelStream();
		//先獲取一個順序流,然后在順序流的基礎上獲取一個並行流
		Stream<Integer> stream = list.stream();
		Stream<Integer> parallel = stream.parallel();
		
		
		//Stream中有一個方法可以判斷當前的流是不是並行流,一下代碼輸出全是true,也就是全部都是並行流
		System.out.println(parallelStream.isParallel());
		System.out.println(stream.isParallel());
		System.out.println(parallel.isParallel());
	}


獲取並行流,有2點要注意,
1,對於並行流,只有在環境支持的情況下才可以實現並行處理
2,在一個順序流的基礎上調用parallel()方法,原來的順序流也就變成了並行流了。如果調用該方法的流原來已經就是一個並行流了,那么就直接返回該調用流。
當然我們也可以將一個並行流轉換成一個順序流:在並行流上調用sequential()就可以啦。
public static void main(String[] args) throws Exception
	{
		List<Double> list = new ArrayList<>(4);
		list.add(1.0);
		list.add(2.0);
		list.add(3.0);
		list.add(4.0);


		//獲取一個順序流
		Stream<Double> stream = list.stream();
		System.out.println(stream.isParallel());
		//順序流上轉換成一個並行流
		Stream<Double> parallelStream = stream.parallel();
		Stream<Double> unordered = parallelStream.unordered();//將流里面的元素設置無序
		System.out.println(parallelStream.isParallel());
		//並行流上轉換成一個順序流
		Stream<Double> sequential = parallelStream.sequential();
		System.out.println(sequential.isParallel());
	}

  • 處理並行流
獲得並行流后,如果環境支持並行處理,那么在該流上發生的操作就可以並行執行。區別於順序流,並行流的相關操作發生在不同的線程上。一般來說,應用到並行流上的任何操作都必須是無狀態的,不干預的,並且具有關聯性的。這樣子可以確保在並行流上執行操作得到的結果,和在順序流上執行相同操作得到的結果相同。


上一篇博客中,我們整理到了縮減操作,reduce()方法的第3個方法,就是專門用來指定如何合並並行結果的。
reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)在這個版本中,第三個函數將第二個函數得到的2個值合並起來。


以下代碼使用並行流,計算一個集合中元素的積:
public static void main(String[] args) throws Exception
	{
		List<Integer> list = new ArrayList<>(4);
		list.add(1);
		list.add(2);
		list.add(3);
		list.add(4);
		//直接從集合中獲取並行流,然后執行縮減操作,下面的代碼輸出24
		System.out.println(list.parallelStream().reduce(1, (a, b) -> a * b, (a, b) -> a * b));
	}


處理並行流,有2點要注意:
1,在對並行流做縮減操作時,reduce()函數的第2個參數和第3個參數可以是做相同的操作,也可以是不同的操作,在有些情況下,這2個參數做的操作必須是不同的。來看下面這個例子:
public static void main(String[] args) throws Exception
	{
		List<Double> list = new ArrayList<>(4);
		list.add(1.0);
		list.add(2.0);
		list.add(3.0);
		list.add(4.0);
		//直接從集合中獲取並行流,然后執行縮減操作,下面的代碼輸出24
		//下面的代碼輸出4.898979485566357,這里是順序流
		System.out.println(list.stream().reduce(1.0, (a, b) -> a * Math.sqrt(b)));
		//下面的代碼輸出1.8612097182041991,這里是並行流,正確
		System.out.println(list.parallelStream().reduce(1.0, (a, b) -> a * Math.sqrt(b), (a, b) -> a * b));
		//下面的代碼輸出1.8612097182041991,這里是並行流,錯誤
		System.out.println(list.parallelStream().reduce(1.0, (a, b) -> a * Math.sqrt(b), (a, b) -> a * Math.sqrt(b)));


	}
上面的代碼中,累加器函數將2個元素的平方根相乘,但是合並器則將部分結果相乘,所以這2個函數是不同的,如果累加器函數和合並器函數是同一個函數,這將導致錯誤,因為當合並2個部分結果的時,相乘的是它們的平方根,而不是部分結果自身。值得注意的是,上面對reduce()方法的調用中,如果將流改成順序流,操作將肯定得到正確的結果,所以我們在測試的時候也可以取值順序流操作的結果來作為檢驗標准。


2,在使用並行操作時,關於流還有一點需要注意就是元素的位置。流可以時候有序的,也可以是無序的。一般來說,如果數據源是有序的,那么流也就是有序的。但是,在使用並行流的時候,有時候允許流是無序的這樣子可以死獲得性能上的提升。當並行流無序時,流的每個部分都可以被單獨操作,而不是與其他部分協調。當操作的順序不重要時,可以調用unordered()方法來指定無序行為。
其實有些api本身就是有序的或者說無序的,比如forEach()方法不一定保留並行流的順序,但是在對並行流的每個元素執行操作時希望保留順序的話,可以使用forEachOrdered()方法。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM