描述
我們先看一段使用了並行流的代碼
@Test public void testStream() { List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } System.out.println(list.size()); List<Integer> streamList = new ArrayList<>(); list.parallelStream().forEach(streamList::add); System.out.println(streamList.size()); }
編譯結果:
觀察發現,原來集合中的數據有10000條,但是使用並行流遍歷數據插入到新集合streamList中后,新的集合中只有5746條數據。並且會在多次之后可能會出現數組下標越界異常,顯然這里的代碼是不合邏輯的。
分析
parallelStream中使用的是ForkJobTask。Fork/Join的框架是通過把一個大任務不斷fork成許多子任務,然后多線程執行這些子任務,最后再Join這些子任務得到最終結果。關於分支/合並框架的使用案例可以看我的這篇文章(用分支/合並框架執行並行求和)。從程序上看,就是先將list集合fork成多段,然后多線程添加到streamList的結合中,而streamList是ArrayList類型,它的add方法並不能保證原子性。
ArrayList的add方法源碼如下:
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { ensureCapacityInternal(size + 1); // Increments modCount!! elementData[size++] = e; return true; }
可以看到add方法可以概括為以下兩個步驟
- ensureCapacityInternal(),確認下當前ArrayList中的數組,是否還可以加入新的元素。如果不行,就會再申請一個:int newCapacity = oldCapacity + (oldCapacity >> 1) 大小的數組(這個容量相當於:1 + 1/2 = 1.5倍),然后將數據copy過去。
- elementData[size++] = e:添加元素到elementData數組中。
在並發情況下,如果同時有A、B兩個線程同時執行add,在第一步ensureCapacityInternal校驗數組容量時,A、B線程都發現當前容量還可以添加最有一個元素,不需擴容;因此進入第二步,此時,A線程先執行完,數組容量已滿,然后B線程再對elementData賦值時,就會拋出“ArrayIndexOutOfBoundsException”。
解決方案
第一種:將parallelStream改成stream,或者直接使用foreach處理。這可以通過判斷並發處理真實能帶來多大的好處,做取舍。
@Test public void testStream() { List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } System.out.println(list.size()); List<Integer> streamList = new ArrayList<>(); //list.stream().forEach(streamList::add); list.forEach(streamList::add); System.out.println(streamList.size()); }
第二種:使用resultList =new CopyOnWriteArrayList<>(); 這是個線程安全的類。從源碼上看,CopyOnWriteArrayList在add操作時,通過ReentrantLock進行加鎖,防止並發寫。不給過CopyOnWriteArrayList,每次add操作都是把原數組中的元素拷貝一份到新數組中,然后在新數組中添加新元素,最后再把引用指向新數組。這會導致頻繁的對象創建,況且數組還是需要一塊連續的內存空間,如果有大量add操作,慎用。
@Test public void testStream() { List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } System.out.println(list.size()); List<Integer> streamList = new CopyOnWriteArrayList<>(); list.parallelStream().forEach(streamList::add); System.out.println(streamList.size()); }
第三種:使用包裝類 resultList = Collections.synchronizedList(Arrays.asList());
@Test public void testStream() { List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } System.out.println(list.size()); List<Integer> streamList = Collections.synchronizedList(new ArrayList<>()); list.parallelStream().forEach(streamList::add); System.out.println(streamList.size()); }
總結
在從stream和parallelStream方法中進行選擇時,我們可以考慮以下幾個問題:
1.是否需要並行?
2.任務之間是否是獨立的?是否會引起任何競態條件?
3.結果是否取決於任務的調用順序?
對於問題1,在回答這個問題之前,你需要弄清楚你要解決的問題是什么,數據量有多大,計算的特點是什么?並不是所有的問題都適合使用並發程序來求解,比如當數據量不大時,順序執行往往比並行執行更快。畢竟,准備線程池和其它相關資源也是需要時間的。但是,當任務涉及到I/O操作並且任務之間不互相依賴時,那么並行化就是一個不錯的選擇。通常而言,將這類程序並行化之后,執行速度會提升好幾個等級。
對於問題2,如果任務之間是獨立的,並且代碼中不涉及到對同一個對象的某個狀態或者某個變量的更新操作,那么就表明代碼是可以被並行化的。
對於問題3,由於在並行環境中任務的執行順序是不確定的,因此對於依賴於順序的任務而言,並行化也許不能給出正確的結果。