背景(Background)
如果從一開始就將lambda表達式(閉包)作為Java語言的一部分,那么我們的Collections API肯定會與今天的外觀有所不同。隨着Java語言獲得作為JSR 335一部分的lambda表達式,這具有使我們的Collections接口看起來更加過時的副作用。盡管可能很想從頭開始並構建替換的Collection框架(“ Collections II”),但是替換Collection框架將是一項主要任務,因為Collections接口遍布JDK庫。相反,我們將繼續增加擴展方法,現有的接口(如進化策略Collection,List或Iterable),或者以新的接口(如“流”)被改裝到現有的類,使許多希望的成語,而沒有讓人們買賣其值得信賴的ArrayListS和HashMap秒。(這並不是說Java永遠不會有一個新的Collections框架;很明顯,現有的Collections框架存在局限性,而不僅僅是為lambda設計的。除此以外,創建一個經過改進的Collections框架是一個很好的考慮因素。 JDK的未來版本。)
並行性是這項工作的重要推動力。因此,要鼓勵那些成語是非常重要的兩個 sequential-和並行友好。我們主要通過較少地關注就地突變而更多地關注產生新值的計算來實現這一目標。在使並行變得容易但不能使其變得不可見之間取得平衡也是重要的; 我們的目標是 為新舊系列提供明確但不干擾的並行性。
內部與外部迭代(Internal vs external iteration)
Collections框架依賴於外部迭代的概念,其中a Collection 提供了一種方法來為其客戶端枚舉其元素(Collectionextends Iterable),並且客戶端使用它來順序地遍歷一個集合的元素。例如,如果我們想將一組塊中的每個塊的顏色設置為紅色,則可以這樣寫:
for (Block b : blocks) {
b.setColor(RED);
}
這個例子說明了外部迭代。for-each循環調用的iterator() 方法blocks,然后一步一步地遍歷集合。外部迭代非常簡單,但是存在幾個問題:
- 它本質上是串行的(除非該語言提供了用於並行迭代的結構,而Java沒有提供),並且需要按集合指定的順序處理元素。
- 它使庫方法失去了管理控制流的機會,該方法可能能夠利用數據的重新排序,並行性,短路或惰性來提高性能。
有時需要嚴格指定for-each loop (sequential, in-order) ,但這有時會妨礙性能。
外部迭代的替代方法是內部迭代,它不是控制迭代,而是將其委托給庫,並傳遞代碼片段以在計算的各個點執行。
上一個示例的內部迭代等效項是:
blocks.forEach(b -> { b.setColor(RED); });
這種方法將控制流管理從客戶端代碼轉移到庫代碼,從而使庫不僅可以對通用控制流操作進行抽象,而且還可以使它們潛在地使用延遲,並行和無序執行來提高性能。 。(是否實現forEach這些操作實際上是由庫實現forEach者決定的,但是對於內部迭代,至少是可能的,而對於外部迭代,則不是。)
內部迭代使其具有一種編程樣式,其中可以將操作“管道化”在一起。例如,如果我們只想將藍色塊塗成紅色,我們可以說:
blocks.filter(b -> b.getColor() == BLUE)
.forEach(b -> { b.setColor(RED); });
該濾波器操作產生匹配所提供的條件值的流,並且所述濾波操作的結果被管道輸送到forEach。
如果我們想將藍色塊收集到一個新的中List,我們可以說:
List<Block> blue = blocks.filter(b -> b.getColor() == BLUE)
.into(new ArrayList<>());
如果每個方框都包含在一個Box中,並且我們想知道哪些方框至少包含一個藍色方框,我們可以說:
Set<Box> hasBlueBlock = blocks.filter(b -> b.getColor() == BLUE)
.map(b -> b.getContainingBox())
.into(new HashSet<>());
如果我們希望將藍色塊的總重量相加,則可以表示為:
int sum = blocks.filter(b -> b.getColor() == BLUE)
.map(b -> b.getWeight())
.sum();
到目前為止,我們還沒有寫下這些操作的簽名-這些將在后面顯示。此處的示例僅說明了內部迭代可以輕松解決的問題類型,並說明了我們希望在集合中公開的功能。
惰性作用(The role of laziness)
像過濾或映射這樣的操作,可以“急切地”執行(過濾在從過濾方法返回時完成),也可以“懶惰地”執行(過濾只在開始迭代過濾方法結果的元素時完成)。將自己應用於懶惰的實現,這通常會導致顯著的性能改進。我們可以將這些操作視為“自然懶惰”,不管它們是否被實現。另一方面,像積累這樣的操作,或者產生副作用,比如把結果傾注到一個集合中或者為每一個元素做一些事情(比如打印出來),都是“自然渴望的”。
基於對許多現有循環的檢查,可以從數據源(數組或集合)中繪制大量操作,進行一系列的惰性操作(過濾、映射等),然后進行一個急切操作(如過濾器映射累加),重述(通常在過程中明顯變小)。因此,大多數自然懶惰操作傾向於用來計算臨時中間結果,並且我們可以利用這個屬性來生成更高效的庫。(例如,一個延遲地進行過濾或映射的庫可以將filter map accumulate之類的管道融合到數據的一個傳遞中,而不是三個不同的傳遞中;一個急切地進行過濾或映射的庫不能。類似地,如果我們在尋找與某一特性匹配的第一個元素,那么一個懶惰的方法讓我們得到了檢查較少元素的答案。
這個觀察結果提供了一個關鍵的設計選擇:filter和map的返回值應該是什么?其中一個候選者是list.filter返回一個新的list,這將推動我們朝着一個全力以赴的方向前進。這是直截了當的,但最終可能做得比我們真正需要的更多。另一種方法是為顯式懶惰創建一個全新的抽象集——LaZyLIST、LaZySeT等(但請注意,懶惰的集合仍然具有觸發急切計算的操作——例如大小)。並且,這種方法有可能演變成像MutableSynchronizedLazySortedSet等類型的組合爆炸。
我們首選的方法是將自然懶惰操作當作返回一個流(例如迭代)而不是一個新的集合(無論如何它可能被下一個流水線階段丟棄)。將此應用於上面的示例,過濾器從源(可能是另一個流)中提取並生成與所提供的謂詞匹配的值流。在大多數潛在的懶惰操作被應用到聚合的情況下,這恰好是我們想要的——一個可以傳遞到流水線中的下一個階段的值流。目前,迭代是流的抽象,但這是一個明確的臨時選擇,我們將很快重新訪問,可能創建一個流抽象,它沒有迭代問題(固有檢查然后行為;假設底層源的可變異性;生活在Java.Lang.)中。
流方法的優點是,當用於源代碼惰性惰性渴望管道時,惰性通常是看不見的,因為管道兩端都是“密封的”,但是在不顯著增加庫的概念表面積的情況下,可以獲得良好的可用性和性能。
流(Streams)
下面顯示了一組stream操作。這些方法本質上是順序的,在上游迭代器返回的順序中處理元素(遇到順序)。在當前的實現中,我們使用Iterable作為這些方法的宿主。返回Iterable的方法是懶惰的;那些不急於返回的方法是懶惰的。所有這些操作都可以通過默認的方法單獨實現Iterator(),因此現有Collection實現不需要額外的工作來獲取新的功能。還請注意,Stream功能僅與集合成切線關系;如果備用收集框架想要獲取這些方法,則它們所需要做的只是實現 Iterable。
流(Stream)在幾個方面與集合(Collections )不同:
- 沒有存儲空間。 流沒有存儲值。它們通過一系列操作從數據結構中攜帶值。
- 本質上是功能性的。 對流的操作會產生結果,但不會修改其基礎數據源。可以將Collection用作流的源(取決於適當的無干擾要求,請參見下文)。
- 懶惰尋求。 許多流操作(例如過濾,映射,排序或重復刪除)都可以延遲實施,這意味着我們只需要檢查流中要查找所需答案的元素數量即可。例如,“查找第一個大於20個字符的字符串”不需要檢查所有輸入字符串。
- 邊界可選。 有很多問題可以明智地表達為無限流,讓客戶消費價值直到滿意為止。(如果我們要枚舉完美的數字,則可以很容易地將其表示為對所有整數流進行過濾的操作。)集合不允許您這樣做,但是流可以這樣做。
下面顯示了一組基本的流操作,表示為上的擴展方法Iterable。
public interface Iterable<T> {
// Abstract methods
Iterator<T> iterator();
// Lazy operations
Iterable<T> filter(Predicate<? super T> predicate) default ...
<U> Iterable<U> map(Mapper<? super T, ? extends U> mapper) default ...
<U> Iterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default ...
Iterable<T> cumulate(BinaryOperator<T> op) default ...
Iterable<T> sorted(Comparator<? super T> comparator) default ...
<U extends Comparable<? super U>> Iterable<T> sortedBy(Mapper<? super T, U> extractor) default ...
Iterable<T> uniqueElements() default ...
<U> Iterable<U> pipeline(Mapper<Iterable<T>, ? extends Iterable<U>> mapper) default ...
<U> BiStream<T, U> mapped(Mapper<? super T, ? extends U> mapper) default ...
<U> BiStream<U, Iterable<T>> groupBy(Mapper<? super T, ? extends U> mapper) default ...
<U> BiStream<U, Iterable<T>> groupByMulti(Mapper<? super T, ? extends Iterable<U>> mapper) default ...
// Eager operations
boolean isEmpty() default ...;
long count() default ...
T getFirst() default ...
T getOnly() default ...
T getAny() default ...
void forEach(Block<? super T> block) default ...
T reduce(T base, BinaryOperator<T> reducer) default ...
<A extends Fillable<? super T>> A into(A target) default ...
boolean anyMatch(Predicate<? super T> filter) default ...
boolean noneMatch(Predicate<? super T> filter) default ...
boolean allMatch(Predicate<? super T> filter) default ...
<U extends Comparable<? super U>> T maxBy(Mapper<? super T, U> extractor) default ...
<U extends Comparable<? super U>> T minBy(Mapper<? super T, U> extractor) default ...
}
懶惰和短路(Laziness and short-circuiting)
類似anyMatch的方法,雖然是急性的,但一旦可以確定最終結果,便可以使用short-circuiting來停止處理-它只需要對足夠多的元素進行謂詞評估就可以找到該謂詞為真的單個元素。
在像這樣的傳輸中:
int sum = blocks.filter(b -> b.getColor() == BLUE)
.map(b -> b.getWeight())
.sum();
在filter和map操作是惰性的。這意味着在sum步驟開始之前,我們不會從源頭開始繪制元素,從而最大程度地減少了管理中間元素所需的簿記成本。
另外,給定一個類似的傳輸方式:
Block firstBlue = blocks.filter(b -> b.getColor() == BLUE)
.getFirst();
由於篩選器步驟是惰性的,因此該getFirst步驟將僅在上游進行,Iterator直到獲得一個元素為止,這意味着我們只需要對元素上的謂詞求值,直到找到該謂詞為真的元素為止,而不是所有元素都為真。
請注意,用戶不必詢問懶惰,甚至不必考慮太多。正確的事情發生了,庫安排了盡可能少的計算。
用戶可以按以下方式調用:
Iterable<Block> it = blocks.filter(b -> b.getColor() == BLUE);
並從中獲得一個Iterator,盡管我們嘗試將功能集設計為不需要這種用法。在這種情況下,此操作只會創建一個Iterable,但除了保留對上游Iterable(blocks)及其Predicate過濾對象的引用之外,不會做任何其他工作。Iterator 從this獲得an以后,所有工作都將完成Iterable。
通用功能接口(Common functional interfaces)
Java中的Lambda表達式將轉換為一種方法接口(功能性接口)的實例。該軟件包java.util.functions包含功能接口的“入門套件”:
- Predicate -- 作為參數傳遞的對象的屬性
- Block -- 將對象作為參數傳遞時要執行的操作
- Mapper -- 將T轉換為U
- UnaryOperator -- 來自T-> T的一元運算符
- BinaryOperator -- 來自(T,T)-> T的二進制運算符
出於性能原因,可能需要提供這些核心接口的專門的原始版本。(在這種情況下,可能不需要完整的原始特征的補充;如果我們提供Integer、Long和Double,則可以通過轉換來容納其他原始類型。)。
不干擾假設(Non-interference assumptions)
因為Iterable可以描述一個可變的集合,所以如果在遍歷集合時修改它,就有可能產生干擾。Iterable上的新操作將在操作期間保持基礎源不變的情況下使用。(這種情況一般容易維持;如果集合僅限於當前線程,只需確保傳遞給filter、map等的lambda表達式不會改變底層集合。這個條件與當前迭代集合的限制沒有本質的不同;如果一個集合在迭代時被修改,大多數實現都會拋出ConcurrentModificationException。)在上面的示例中,我們通過過濾一個集合來創建一個Iterable,遍歷過濾后的Iterable時遇到的元素是基於底層集合的迭代器返回的元素。因此,對iterator()的重復調用將導致對上游迭代的重復遍歷;這里沒有緩存延遲計算的結果。(因為大多數管道看起來都是源代碼-延遲-延遲-等待,所以大多數時候底層集合只會被遍歷一次。)
實例(Examples)
下面是JDK類class (getEnclosingMethod方法)的一個例子,它遍歷所有聲明的方法、匹配的方法名、返回類型以及參數的數量和類型。原始代碼如下:
for (Method m : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
if (m.getName().equals(enclosingInfo.getName()) ) {
Class<?>[] candidateParamClasses = m.getParameterTypes();
if (candidateParamClasses.length == parameterClasses.length) {
boolean matches = true;
for(int i = 0; i < candidateParamClasses.length; i++) {
if (!candidateParamClasses[i].equals(parameterClasses[i])) {
matches = false;
break;
}
}
if (matches) { // finally, check return type
if (m.getReturnType().equals(returnType) )
return m;
}
}
}
}
throw new InternalError("Enclosing method not found");
使用filter和getFirst,我們可以消除所有臨時變量,並將控制邏輯移到庫中。我們從反射中獲取方法列表,將其轉換為一個可迭代的數組。asList(我們也可以向數組類型中注入類似流的接口),然后使用一系列過濾器來拒絕不匹配名稱、參數類型或返回類型的過濾器:
Method matching =
Arrays.asList(enclosingInfo.getEnclosingClass().getDeclaredMethods())
.filter(m -> Objects.equals(m.getName(), enclosingInfo.getName())
.filter(m -> Arrays.equals(m.getParameterTypes(), parameterClasses))
.filter(m -> Objects.equals(m.getReturnType(), returnType))
.getFirst();
if (matching == null)
throw new InternalError("Enclosing method not found");
return matching;
這個版本的代碼更緊湊,更不易出錯。
流操作對於集合上的特別查詢非常有效。考慮一個假設的“音樂庫”應用程序,其中一個庫有一個專輯列表,一個專輯有一個標題和一個曲目列表,一個曲目有一個名稱、歌手和評級。
考慮這樣的查詢“為我找到至少有一首排名在4或4以上的專輯的名字,按名字排序。”為了構造這個集合,我們可以這樣寫:
List<Album> favs = new ArrayList<>();
for (Album a : albums) {
boolean hasFavorite = false;
for (Track t : a.tracks) {
if (t.rating >= 4) {
hasFavorite = true;
break;
}
}
if (hasFavorite)
favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
public int compare(Album a1, Album a2) {
return a1.name.compareTo(a2.name);
}});
我們可以使用流操作來簡化三個主要步驟中的每一個——確定專輯中的任何曲目是否至少在(anyMatch)上有一個評級,排序,以及將符合我們標准的專輯集合放入一個列表:
List<Album> sortedFavs =
albums.filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4)))
.sortedBy(a -> a.name)
.into(new ArrayList<>());
非線性流(Nonlinear streams)
如上所述,“obvious”的流形狀是簡單的線性值流,例如可以由數組或Collection管理的值。我們可能還想表示其他常見的形狀,例如(鍵,值)對的流(可能限制了鍵的唯一性。)
將雙值流表示為值流可能會很方便Pair<X,Y>。這將很容易,並允許我們重用現有的流機制,但會產生一個新問題:如果我們可能想對鍵值流執行新操作(例如將其拆分為a keys或 values流),則擦除操作會進入方式-我們無法表達僅在類的類型變量滿足某些約束(例如a)的情況下存在的方法Pair。(這是C#靜態擴展方法的一個優點,它被注入實例化的泛型類型而不是類中,這是毫無價值的。)此外,將雙值流建模為的流。Pair對象可能會有大量的“裝箱”開銷。通常,每個不同的流“形狀”可能都需要其自己的流抽象,但這並不是不合理的,因為每個不同的形狀將具有在該形狀上有意義的自己的一組操作。
因此,我們使用一個單獨的抽象為雙值流建模,我們將其暫時稱為BiStream。因此我們的流庫具有兩個基本的流形狀:linear (Iterable) 和map-shaped (BiStream),就像Collections框架具有兩個基本形狀(Collection and Map)一樣。
雙值流可以對“ zip”運算的結果,地圖的內容或分組運算的結果(其中結果為BiStream<U, Stream
Map<Integer, Integer>
counts = document.words() // stream of strings
.groupBy(s -> s.length()) // bi-stream length -> stream of words with that length
.mapValues(stream -> stream.count()) // bi-stream length -> count of words
.into(new HashMap<>()); // Map length -> count
並行性(Parallelism)
雖然內部迭代的使用使得操作可以並行完成,但是我們不希望給用戶帶來任何“透明的並行性”。相反,用戶應該能夠以一種顯式但不顯眼的方式選擇並行性。我們通過允許客戶顯式地請求集合的“並行視圖”來實現這一點,集合的操作是並行執行的;這是通過parallel()方法在集合上公開的。如果我們想要並行計算我們的“藍色塊的權重和”查詢,我們只需要添加一個調用parallel():
int sum = blocks.parallel()
.filter(b -> b.getColor() == BLUE)
.map(b -> b.getWeight())
.sum();
這看起來與串行版本非常相似,但是被明確地標識為並行的,而沒有並行機制壓倒代碼。
有了Java SE 7中添加的Fork/Join框架,我們就有了實現並行操作的高效機制。然而,這項工作的目標之一是減少相同計算的串行和並行版本之間的差距,目前使用Fork/Join並行化計算與串行代碼看起來非常不同(而且比串行代碼大得多)——這是並行化的障礙。通過公開流操作的並行版本,並允許用戶顯式地在串行和並行執行之間進行選擇,我們可以極大地縮小這一差距。
使用Fork/Join實現並行計算所涉及的步驟是:將問題划分為子問題,按順序解決子問題,並組合子問題的結果。Fork/Join機制被設計成自動化這個過程。
我們對Fork/Join的結構需求進行了建模,並使用了一個稱為Splittable的分割抽象,它描述了可以進一步分割成更小塊的子聚合,或者其元素可以按順序迭代的子聚合。
public interface Splittable<T, S extends Splittable<T, S>> {
/** Return an {@link Iterator} for the elements of this split. In general, this method is only called
* at the leaves of a decomposition tree, though it can be called at any level. */
Iterator<T> iterator();
/** Decompose this split into two splits, and return the left split. If further splitting is impossible,
* {@code left} may return a {@code Splittable} representing the entire split, or an empty split.
*/
S left();
/** Decompose this split into two splits, and return the right split. If further splitting is impossible,
* {@code right} may return a {@code Splittable} representing the entire split, or an empty split.
*/
S right();
/**
* Produce an {@link Iterable} representing the contents of this {@code Splittable}. In general, this method is
* only called at the top of a decomposition tree, indicating that operations that produced the {@code Spliterable}
* can happen in parallel, but the results are assembled for sequential traversal. This is designed to support
* patterns like:
* collection.filter(t -> t.matches(k))
* .map(t -> t.getLabel())
* .sorted()
* .sequential()
* .forEach(e -> println(e));
* where the filter / map / sort operations can occur in parallel, and then the results can be traversed
* sequentially in a predicatable order.
*/
Iterable<T> sequential();
}
為常見的數據結構(如基於數組的列表、二叉樹和映射)實現Splittable非常簡單。
我們使用Iterable來描述順序集合,這意味着一個集合知道如何按順序分配它的成員。Iterable的並行模擬體現了可拆分的行為,以及類似於Iterable上的聚合操作。我們目前將其稱為ParallelIterable。
public interface ParallelIterable<T> extends Splittable<T, ParallelIterable<T>> {
// Lazy operations
ParallelIterable<T> filter(Predicate<? super T> predicate) default ...
<U> ParallelIterable<U> map(Mapper<? super T, ? extends U> mapper) default ...
<U> ParallelIterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default ...
ParallelIterable<T> cumulate(BinaryOperator<T> op) default ...
ParallelIterable<T> sorted(Comparator<? super T> comparator) default ...
<U extends Comparable<? super U>> ParallelIterable<T> sortedBy(Mapper<? super T, U> extractor) default ...
ParallelIterable<T> uniqueElements() default ...
// Eager operations
boolean isEmpty() default ...;
long count() default ...
T getFirst() default ...
T getOnly() default ...
T getAny() default ...
void forEach(Block<? super T> block) default ...
T reduce(T base, BinaryOperator<T> reducer) default ...
<A extends ParallelFillable<? super T>> A into(A target) default ...
<A extends Fillable<? super T>> A into(A target) default ...
boolean anyMatch(Predicate<? super T> filter) default ...
boolean noneMatch(Predicate<? super T> filter) default ...
boolean allMatch(Predicate<? super T> filter) default ...
<U extends Comparable<? super U>> T maxBy(Mapper<? super T, U> extractor) default ...
<U extends Comparable<? super U>> T minBy(Mapper<? super T, U> extractor) default ...
}
您將注意到ParallelIterable上的操作集與Iterable上的操作非常相似,只是延遲操作返回的是ParallelIterable而不是Iterable。這意味着順序集合上的操作管道也將以相同的方式(僅以並行方式)在並行集合上工作。
需要的最后一步是從(順序的)集合中獲得ParallelIterable的方法;這是新的parallel()方法在集合上返回的結果。
interface Collection<T> {
....
ParallelIterable<T> parallel();
}
我們在這里實現的是將遞歸分解的結構特性與可在可分解數據結構上並行執行的算法分離開來。數據結構的作者只需要實現Splittable方法,然后就可以立即訪問filter、map和friends的並行實現。類似地,向ParallelIterable添加新方法可以立即在任何知道如何分割自身的數據結構上使用。
變異運算(Mutative operations)
對集合進行批量操作的許多用例會產生新的值、集合或副作用。然而,有時我們確實希望對集合進行就地修改。我們打算在采集上添加的主要原位突變有:
- 刪除與謂詞(Collection)匹配的所有元素
- 用新元素(List)替換與謂詞匹配的所有元素
- 對列表進行排序(List)
這些將作為擴展方法添加到適當的接口上。