[零]java8 函數式編程入門官方文檔中文版 java.util.stream 中文版 流處理的相關概念


前言

本文為java.util.stream 包文檔的譯文
極其個別部分可能為了更好理解,陳述略有改動,與原文幾乎一致
原文可參考在線API文檔
image_5b7910f0_2991

Package java.util.stream Description

一些用於支持流上函數式操作的類 ,例如在集合上的map-reduce轉換。例如
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
此處,我們使用widgets, 他是一個 Collection<Widget>, 作為一個流的源,
然后在流上執行一個filter-map-reduce 來獲得紅色widgets重量的總和。(總和是一個歸約(reduce)操作的例子)
 
這個包中引入的關鍵抽象是流。
類 Stream、IntStream、LongStream和DoubleStream分別是在對象Object和基本類型int、long和double類型上的流。
流與集合的不同有以下幾點:
  • 不存儲數據    流不是存儲元素的數據結構;相反,它通過一個哥哥計算操作組合而成的管道,從一個數據源,如數據結構、數組、生成器函數或i/o通道  來傳遞元素 
  • 函數特性       一個流上的操作產生一個結果,但是不會修改它的源。例如,過濾集合 獲得的流會產生一個沒有被過濾元素的新流,而不是從源集合中刪除元素
  • 延遲搜索        許多流操作,如過濾、映射或重復刪除,都可以延遲實現,從而提供出優化的機會。 
  •                      例如,“找到帶有三個連續元音的第一個字符串”不需要檢查所有的輸入字符串。
  •                      流操作分為中間(流生成)操作和終端(值或副作用生成)操作。許多的中間操作, 如filter,map等,都是延遲執行。
  •                      中間操作總是惰性的的。
  • Stream可能是無限的   雖然集合的大小是有限的,但流不需要。諸如limit(n)或findFirst()這樣的短路操作可以允許在有限時間內完成無限流的計算。 
  • 消耗的          流的元素只在流的生命周期中訪問一次。就像迭代器一樣,必須生成一個新的流來重新訪問源的相同元素 
 
流可以通過多種方式進行獲得,比如
  • Collection 提供的stream   parallelStream  
  • 從數組 Arrays.stream(Object[]) 靜態方法 
  • Stream類的靜態工廠方法 比如  Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator)   Stream.generate   
  • BufferedReader.lines(); 文件行
  • 獲取文件路徑的流: Files類的find(), lines(), list(), walk();
  • Random.ints()  隨機數流
  • JDK中的許多其他流載方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
 
還可以從第三方類庫的提供中創建其他一些流 ,詳見 Low-level stream construction

Stream operations and pipelines流操作以及管道

流操作被划分為中間和終端操作,通過流管道組合起來。
  1. 一條流管道由一個源(如一個集合、一個數組、一個生成器函數或一個i/o通道)組成;
  2. 然后是零個或更多的中間操作,例如stream.filter 或者stream.map;
  3. 還有一個終端操作,如stream.forEach或Stream.reduce
中間操作返回一條新流,他們總是惰性的;
執行諸如filter()之類的中間操作實際上並不會立即執行任何過濾操作,而是創建了一個新流,當遍歷時,它包含與給定謂詞相匹配的初始流的元素。直到管道的終端操作被執行,管道源的遍歷才會開始
 
終端操作,例如Stream.forEach 和 IntStream.sum,可以遍歷流以產生結果或副作用。
在執行終端操作之后,流管道被認為是被消耗掉的,並且不能再被使用;
如果您需要再次遍歷相同的數據源,您必須重新從數據源獲得一條新流
在幾乎所有情況下,終端操作都很迫切,在返回之前完成了數據源的遍歷和管道的處理。只有終端操作iterator() 和 spliterator() 不是;
這些都是作為一個“逃生艙口”提供的,以便在現有操作不足以完成任務的情況下,啟用任意客戶控制的管道遍歷
 
延遲處理流可以顯著提高效率;
在像上面的filer-map-sum例子這樣的管道中,過濾、映射和求和可以被融合到數據的單個傳遞中,並且具有最小的中間狀態。
惰性還允許在沒有必要的情況下避免檢查所有數據;對於諸如“查找第一個超過1000個字符的字符串”這樣的操作,只需要檢查足夠的字符串,就可以找到具有所需特征的字符串,而不需要檢查源的所有字符串。(當輸入流是無限的而不僅僅是大的時候,這種行為就變得更加重要了。)
 
中間操作被進一步划分為無狀態和有狀態操作。
無狀態操作,如filter和map,在處理新元素時不保留以前處理的元素的狀態——每個元素都可以獨立於其他元素的操作處理。
有狀態的操作,例如distinct和sorted,則需要考慮從先前看到處理的元素中合並狀態。
 
有狀態操作可能需要在產生結果之前處理整個輸入。
例如,直到一個人看到了流的所有元素之前  他沒辦法完成對流的排序
因此,在並行計算下,一些包含有狀態中間操作的管道可能需要對數據進行多次傳遞,或者可能需要緩沖重要數據。
包含完全無狀態的中間操作的管道可以在單次傳遞過程中進行處理,無論是順序的還是並行的,只有最少的數據緩沖
 
此外,一些操作被認為是短路操作。一個中間操作,如果在提供無限流輸入時,它可能會產生一個有限的流,那么他就是短路的。
如果在無限流作為輸入時,它可能在有限的時間內終止,這個終端操作是短路的。
在管道中進行短路操作是處理無限流在有限時間內正常終止的必要條件,但不是充分條件 

Parallelism並行

通過顯式的for循環處理元素本質上是串行的
流通過將計算重新定義為聚合操作的管道,而不是在每個單獨元素上立即執行操作,從而促進並行執行。
所有的流操作都可以串行或並行執行
JDK中流的實現創建的都是串行流, 除非顯式的設置為並行
例如,Collection有方法Collection.stream()和Collection.parallelstream(),它們分別產生串行和並行流;
其他的流方法比如  IntStream.range(int, int) 產生串行的流,但是可以通過調用BaseStream.parallel()方法設置為 並行化
想要計算所有 widgets的重量之和 只需要
image_5b7910f0_7b5e
 
 
這個例子的串行和並行版本的唯一區別是初始時創建流,使用parallelStream()而不是stream()
當啟動終端操作時,流管道是按順序或並行執行的,這取決於它被調用的流的策略模式。
一個流是否可以串行或並行執行,可以用isParallel()方法來獲得,
可以用BaseStream.sequential() 和 BaseStream.parallel() 操作修改。
當啟動終端操作時,流管道是按順序或並行執行的,這取決於它被調用的流的模式。
 
除了被確定為顯式非確定性的操作之外,如findAny(),無論是順序執行還是並行執行,都不應該改變計算的結果。
 
大多數流操作接受描述用戶指定行為的參數,這些參數通常是lambda表達式。
為了保持正確的行為,這些行為參數必須是不干涉non-interfering的,並且在大多數情況下必須是無狀態的。
這些參數始終是函數式接口的實例,例如Function,通常是lambda表達式或方法引用

Non-interference  無干擾的 非干涉的

Streams允許您在各種數據源上執行可能並行的聚合操作,甚至包括ArrayList之類的非線程安全集合。
只有當我們能夠在流管道的執行過程中防止對數據源的干擾時這才是可能的。
除了逃脫艙口iterator()和spliterator()之外,都是在調用終端操作時開始執行,並在終端操作完成時結束。
對於大多數數據源來說,防止干擾意味着確保在流管道的執行過程中根本沒有修改數據源。
這方面的一個顯著的例外是源是並發集合的流,它們是專門設計用來處理並發修改的。並發流源是那些Spliterator 設置了並發特性(CONCURRENT characteristic)
 
因此,在流管道中,源不是並發的行為參數,永遠不應該修改流的數據源。
一個行為參數將被稱之為干擾的(interfere) 如果對於一個非並發數據源來說如果它修改或導致被修改數據源被修改.
不僅僅是並行的管道需要,所有的管道都需要是非干擾的(non-interference)
除非流數據源是並發的,否則在執行流管道時修改stream的數據源可能會導致異常、錯誤的答案或不一致的行為。
對於表現良好的stream,數據源是可以修改的,只要是在終端操作開始之前,並且所有的修改都會包含在內
比如
image_5b7910f0_6a66
 
首先創建一個列表,由兩個字符串組成:“one”;和“two”。
然后,從該列表中創建一條stream。接下來,通過添加第三個字符串:“three”來修改列表。
最后,流的元素被collect 以及joining在一起。由於該列表在終端收集操作開始之前被修改,結果將是一串“one two three”。
從JDK集合返回的所有流,以及大多數其他JDK類,都像這樣表現良好;
對於其他庫生成的流,請參閱 Low-level stream construction,以滿足構建行為良好的流的需求。

Stateless behaviors無狀態行為

如果流操作的行為參數是有狀態的,那么流管道的結果可能是不確定的或不正確的。
有狀態的lambda(或實現適當的功函數接口的其他對象)是一個其結果依賴於任何可能在流水線執行過程中發生變化的狀態。
有狀態lambda的一個例子是map()的參數:
image_5b7910f0_374
在這里,如果映射操作是並行執行的,那么相同輸入的結果可能因線程調度差異而變化,而對於無狀態lambda表達式,結果總是相同的
 
還要注意的是,試圖從行為參數訪問可變狀態時,在安全性和性能方面是您一個錯誤的選擇;
如果你不同步訪問那個狀態,你就有了數據競爭,因此你的代碼可能出現問題,
但是如果你對那個狀態進行同步訪問,你就有可能會破壞你想要從並行性中得到的受益。
最好的方法是在流操作中完全地避免有狀態的行為參數; 通常總會有種方法可以重構流以避免狀態性

Side-effects副作用

 
一般來說,對流操作的行為參數的副作用是不鼓勵的,因為它們通常會導致不知情的違反無狀態要求的行為,以及其他線程安全隱患
 
如果行為參數確實有副作用,除非顯式地聲明,否則就無法保證這些副作用對其他線程的可見性,也不能保證在同一條管道內的“相同”元素上的不同操作在相同的線程中執行。此外,這些影響的排序可能出乎意料。即使管道被限制生成一個與stream源的處理順序一致的結果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() 必須生成0、2、4、6、8),對於將mapper函數應用於個別元素的順序,或者對於給定元素執行任何行為參數的順序,都沒有保證
 
對許多可能會被嘗試使用於副作用的計算中,可以替換為無副作用的,更安全更有效的表達,比如使用歸約而不是可變的累積器。
然而,使用println()來進行調試的副作用通常是無害的。少部分的流操作,如forEach()和peek(),用的就是他們的副作用;這些應該小心使用。
 
下面的例子演示,如何從一個使用副作用的計算轉變為不適用副作用
下面的代碼搜索一個字符串流,以匹配給定的正則表達式,並將匹配放在列表中
image_5b7910f0_b8a
 
這段代碼不必要地使用了副作用。如果並行執行,ArrayList的非線程安全將導致不正確的結果,並且添加所需的同步將導致競爭,從而破壞並行性的好處。
此外,在這里使用副作用是完全沒有必要的;forEach()可以簡單地被替換為更安全、更高效、更適合並行化的reduce操作。
image_5b7910f0_2879

Ordering 排序

 
流可能有也可能沒有定義好的順序。流是否有順序取決於源和中間操作。(所謂定義好的順序,就是說原始數據源是否有序)
某些流源(如列表或數組)本質上是有序的,而其他的(如HashSet)則不是。
一些中間操作,比如sorted(),可以在無序的流中強加一個順序,而其他的操作可能會使一個有序的流變成無序,例如BaseStream.unordered(). 
此外,一些終端操作可能會忽略順序,比如forEach()。
 
如果一個流有序,大多數操作都被限制在順序的元素上操作;
如果流的源是包含1、2、3的列表,那么執行map(x-x 2)的結果必須是2、4、6。
然而,如果源沒有定義的順序,那么值2、4、6的任何排列都將是一個有效的結果。
 
對於串行流,順序的存在與否並不影響性能,只影響確定性。
如果一個流是有序的,在相同的源上重復執行相同的流管道將產生相同的結果;
如果沒有排序,重復執行可能會產生不同的結果
 
對於並行流,放松排序的限制有時可以實現更高效的執行。
如果元素的排序不是很重要,那么可以更有效地實現某些聚合操作,如過濾重復元素(distinct()  )或分組歸約(Collectors.groupingBy())。
類似地,與順序相關的操作,如limit(),可能需要緩沖以確保正確的排序,從而破壞並行性的好處。
在流有順序的情況下,但是用戶並不特別關心這個順序,顯式地通過unordered()方法調用取消排序, 可能會改善一些有狀態或終端操作的並行性能。
然而,大多數的流管道,例如上面的“blocks的重量總和”,即使在排序約束下仍然有效地並行化。
 

Reduction operations歸約操作

 
一個歸約操作(也稱為折疊)接受一系列的輸入元素,並通過重復應用組合操作將它們組合成一個簡單的結果,例如查找一組數字的總和或最大值,或者將元素累積到一個列表中。streams類有多種形式的通用歸約reduce操作,稱為reduce()和collect(),以及多個專門化的簡化形式,如sum()、max()或count()
 
當然,這樣的操作可以很容易用簡單的順序循環來實現,如下所示
image_5b7910f0_6543
 
然而,我們有充分的理由傾向於reduce操作,而不是像上面這樣的迭代累計運算。
它不僅是一個“更抽象的”——它在流上把流作為一個整體運行而不是作用於單獨的元素——但是一個適當構造的reduce操作本質上是可並行的,只要用於處理元素的函數(s)是結合的associative和無狀態stateless的。舉個例子,給定一個數字流,我們想要找到和,我們可以寫:
image_5b7910f0_1a90
 
幾乎不需要怎么修改,就可以以並行的方式運行
image_5b7910f0_458
 
之所以歸約操作可以很好地並行,是因為實現可以並行地操作數據的子集,然后將中間結果組合在一起,得到最終的正確答案。(即使該語言有一個“"parallel for-each"”構造,迭代累計運算方法仍然需要開發人員提供對共享累積變量sum的線程安全更新以及所需的同步,這可能會消除並行性帶來的任何性能收益。)
使用reduce()代替了歸約操作的並行化的所有負擔,並且庫可以提供一個高效的並行實現,不需要額外的同步
 
前面展示的“widgets”示例展示了如何與其他操作相結合,以替換for循環。
如果widgets 是Widget 對象的集合,它有一個getWeight方法,我們可以找到最重的widget:
image_5b7910f0_3e8  
 
在更通用的形式中   對類型為T的元素,並且返回結果類型為U的reduce操作   需要三個參數:
image_5b7910f0_b3
在這里,identity不僅僅是歸約的初始化結果值或者如果沒有任何元素時的一個默認的返回值
迭代累計運算器接受部分結果和下一個元素,並產生一個新的中間結果。
組合函數結合了兩個部分結果,產生了一個新的中間結果。
(在並行減少的情況下,組合是必要的,在這個過程中,輸入被分區,每個分區都計算出部分的累積,然后將部分結果組合起來產生最終的結果。)
 
更准確地說,identity必須是組合函數的恆等式。這意味着對所有的u,combiner.apply(identity, u)等於u,
另外,組合函數必須是結合的,必須與累加器函數兼容:
對所有u和t,
combiner.apply(identity, u) 必須等於accumulator.apply(u, t).
三參數形式是雙參數形式的泛化,將映射步驟合並到累加步驟中。
我們可以用更一般的形式重新改寫這個簡單的widgets重量的例子
image_5b7910f0_414e
盡管顯式的map-reduce的形式更易於閱讀,因此通常應該優先考慮。
通用的形式是為了  通過將映射和減少到單個函數,以重要的工作進行優化 這種場景

Mutable reduction 可變的歸約

一個可變的歸約操作在處理流中的元素時,將輸入元素積累到一個可變的結果容器中,例如一個Collection或StringBuilder,
如果我們想要獲取一串字符串的流並將它們連接成一個長字符串,我們可以通過普通的reduce來實現這個目標:
image_5b7910f0_1e36
 
我們會得到想要的結果,它甚至可以並行工作,然而,但是我們可能對性能不滿意
這樣的實現將會進行大量的字符串復制  時間復雜度O(n^2)
一種更有效的方法是將結果累積到StringBuilder中,這是一個用於累積字符串的可變容器
就如同我們對普通的歸約操作處理一樣,我們可以使用相同的技術來處理可變的歸約
 
可變歸約操作稱為collect()當它將期望的結果收集到一個結果容器中,例如一個集合
收集操作需要三個功能:
一個supplier 功能來構造結果容器的新實例,
一個累計運算器函數將一個輸入元素合並到一個結果容器中,
一個組合函數將一個結果容器的內容合並到另一個結果容器中。
 
它的形式與普通歸約的一般形式非常相似
image_5b7910f0_2046
 
與reduce()相比,以這種抽象的方式表示收集的好處是它直接適合並行化:
我們可以並行地累計運算部分結果,然后將它們組合起來,只要積累和組合功能滿足適當的需求。
例如,為了收集流中的元素的字符串表示到ArrayList,我們可以編寫顯式的for循環
image_5b7910f0_30f2
或者我們可以使用一個可並行的collect形式
image_5b7910f0_56c8
 
或者,從累加器函數中提取出來map操作,我們可以更簡潔地表達它:
image_5b7910f0_4867
 
在這里,我們的supplier只是ArrayList的構造器,累加器將string   element元素添加到ArrayList中,組合器簡單地使用addAll將字符串從一個容器復制到另一個容器中
 
collect的三個部分——supplier, accumulator, 和combiner ——是緊密耦合的。
我們可以使用Collector來抽象的表達描述這三部分。
上面的例子可以將字符串collect到列表中,可以使用一個標准收集器來重寫:
image_5b7910f0_4a1c
 
將可變的歸約打包成收集器有另一個優點:可組合性。
類Collectors包含許多用於收集器的預定義工廠,包括將一個收集器轉換為另一個收集器的組合器。
例如,假設我們有一個Collector,它計算員工流的薪水之和,如下所列
 
image_5b7910f0_175
 
 
(對於第二個類型的參數  ?  ,僅僅表明我們不關心收集器所使用的中間類型。 )如果我們想要創建一個收集器來按部門計算工資的總和,我們可以使用groupingBy來重用summingSalaries 薪水:
image_5b7910f0_256d
就像常規的reduce操作一樣,只有滿足適當的條件collect()  操作才能夠並行化
對於任何部分累計運算的結果,將其與空結果容器相結合combiner  必須產生一個等效的結果
也就是說,對於任意一個部分累計運算的結果p,累計運算或者組合調用的結果,p必須等於   combiner.apply(p, supplier.get()).
 
而且,無論計算是否分割,它必須產生一個等價的結果。對於任何輸入元素t1和t2,下面計算的結果r1和r2必須是等價的
image_5b7910f0_1d27
在這里,等價通常指的是Object.equals(Object).。但在某些情況下,等價性的要求可能會降低
 

Reduction, concurrency, and ordering 歸約 並發與排序

 
通過一些復雜的reduce操作,例如生成map的collect(),例如
image_5b7910f0_604
並行執行操作可能實際上會產生反效果。這是因為組合步驟(通過鍵將一個Map合並到另一個Map)對於某些Map實現來說可能代價很大
 
然而,假設在這個reduce中使用的結果容器是一個可修改的集合——例如ConcurrentHashMap。在這種情況下,對迭代累計運算器的並行調用實際上可以將它們的結果並發地放到相同的共享結果容器中,從而將不再需要組合器合並不同的結果容器。這可能會促進並行執行性能的提升。我們稱之為並行reduce
 
支持並發reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性為標志。並發特性。然而,並發集合也有缺點。
如果多個線程將結果並發地存入一個共享容器,那么產生結果的順序是不確定的。
因此,只有在排序對正在處理的流不重要的情況下,才可能執行並發的reduce
下面這些條件下   Stream.collect(Collector) 的實現會並發reduce(歸約)
  • 流是並行的;
  • 收集器有Collector.Characteristics.CONCURRENT 特性
  • 要么是無序的流,要么收集器擁有Collector.Characteristics.UNORDERED 特性
 
您可以通過使用BaseStream.unordered()方法來確保流是無序的。例如:
image_5b7910f0_2a7e
 
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同於 groupingBy). 

Associativity 結合性

 
如果一個操作或者函數方法滿足下面的形式,那么他就是結合的
image_5b7910f0_724c
如果我們把這個問題擴大到四項,就可以看到這種結合性對於並行的重要性
image_5b7910f0_43e2
這樣我們就可以把(a op b)  和 (c op d) 進行並行計算  最后在對他們進行  op  運算
結合性操作的例子包括數字加法、min、max和字符串串聯

Low-level stream construction  低級流構造器

 
到目前為止,所有的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法來獲得一個stream。這些處理流的方法是如何實現的?
 
類StreamSupport提供了許多用於創建流的低級方法,所有這些方法都使用某種形式的Spliterator。
一個Spliterator是迭代器的一個並行版本;
它描述了一個(可能是無限的)元素集合,支持順序前進、批量遍歷,並將一部分輸入分割成另一個可並行處理的Spliterator。
在最低層,所有的流都由一個Spliterator驅動構造
 
在實現Spliterator時,有許多實現選擇,幾乎所有的實現都是在簡單的實現和運行時性能之間進行權衡。
創建Spliterator的最簡單、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)從一個iterator中創建spliterator  。
雖然這樣的spliterator 可以工作,但它可能會提供糟糕的並行性能,因為我們已經丟失了容量信息(底層數據集有多大),以及被限制為一個簡單的分割算法。
 
一個高質量的spliterator 將提供平衡的和已知大小的分割,精確的容量信息,以及一些可用於實現優化執行的spliterator 或數據的其他特征  (特征見spliterator characteristics)
 
可變數據源的Spliterators 有一個額外的挑戰;綁定到數據的時間,因為數據可能在創建Spliterators 后和開始執行流管道的期間,發生變化。
理想情況下,一個流的spliterator將報告一個IMMUTABLE or CONCURRENT;如果不是,應該是后期綁定(late-binding)。
如果一個源不能直接提供一個推薦的spliterator,它可能會通過Supplier 間接地提供一個spliterator,並通過接收Supplier作為參數的stream()版本構造一個stream。只有在流管道的終端操作之后,才從Supplier處獲得spliterator
 
這些要求極大地減少了流源的變化和流管道的執行之間的潛在的干擾。
基於具有所需特性的spliterators ,或者使用 Supplier-based 的工廠的形式的流,在終端操作開始之前對數據源的修改是不受影響的(如果流操作的行為參數滿足不干涉和無狀態的要求標准)。參見不干涉 Non-Interference的細節。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM