上一節學習了Java8中比較常用的內置collector的用法。接下來就來理解下collector的組成。
Collector定義
Collector接口包含了一系列方法,為實現具體的歸約操作(即收集器)提供了范本。我們已經看過了Collector接口中實現的許多收集器,例如toList或groupingBy。這也意味着你可以為Collector接口提供自己的實現,從而自由創建自定義歸約操作。
要開始使用Collector接口,我們先來看看toList的實現方法,這個在日常中使用最頻繁的東西其實也簡單。
Collector接口定義了5個函數
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
}
- T是流中要收集的對象的泛型
- A是累加器的類型,累加器是在收集過程中用於累積部分結果的對象。
- R是收集操作得到的對象(通常但不一定是集合)的類型。
對於toList, 我們收集的對象是T, 累加器是List
public class ToListCollector<T> implements Collector<T, List<T>, List<T>>
理解Collector幾個函數
建立新的結果容器 supplier方法
supplier方法必須返回一個結果為空的Supplier,也就是一個無參數函數,在調用時,它會創建一個空的累加器實例,供數據收集過程使用。就個人通俗的理解來說,這個方法定義你如何收集數據,之所以提煉出來就是為了讓你可以傳lambda表達式來指定收集器。對於toList, 我們直接返回一個空list就好。
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
累加器執行累加的具體實現 accumulator方法
accumulator方法會返回執行歸約操作的函數,該函數將返回void。當遍歷到流中第n個元素時,這個函數就會執行。函數有兩個參數,第一個參數是累計值,第二參數是第n個元素。累加值與元素n如何做運算就是accumulator做的事情了。比如toList, 累加值就是一個List,對於元素n,當然就是add。
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
對結果容器應用最終轉換 finisher方法
當遍歷完流之后,我們需要對結果做一個處理,返回一個我們想要的結果。這就是finisher方法所定義的事情。finisher方法必須返回在累積過程的最后要調用的一個函數,以便將累加器對象轉換為整個集合操作的最終結果, 這個返回的函數在執行時,會有個參數,該參數就是累積值,會有一個返回值,返回值就是我們最終要返回的東西。對於toList, 我最后就只要拿到那個收集的List就好,所以直接返回List。
@Override
public Function<List<T>, List<T>> finisher() {
return (i) -> i;
}
對於接收一個參數,返回一個value,我們可以想到Function函數,正如finisher()的返回值。對於這個返回參數本身的做法,Function有個靜態方法
static <T> Function<T, T> identity() {
return t -> t;
}
可以用Function.identity()
代替上述lambda表達式。
合並兩個結果容器 combiner
上面看起來似乎已經可以工作了,這是針對順序執行的情況。我們知道Stream天然支持並行,但並行卻不是毫無代價的。想要並行首先就必然要把任務分段,然后才能並行執行,最后還要合並。雖然Stream底層對我們透明的執行了並行,但如何並行還是需要取決於我們自己。這就是combiner要做的事情。combiner方法會返回一個供歸約操作使用的函數,它定義了對流的各個子部分並行處理時,各個字部分歸約所得的累加器要如何合並。對於toList而言,Stream會把流自動的分成幾個並行的部分,每個部分都執行上述的歸約,匯集成一個List。當全部完成后再合並成一個List。
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}
這樣,就可以對流並行歸約了。它會用到Java7引入的分支/合並框架和Spliterator抽象。大概如下所示,
- 原始流會以遞歸方式拆分為子流,直到定義流是否進一步拆分的一個條件為非(如果分布式工作單位太小,並行計算往往比順序計算要慢,而且要是生成的並行任務比處理器內核數多很多的話就毫無意義了)。
- 現在,所有的子流都可以並行處理,即對每個子流應用順序歸約算法。
- 最后,使用收集器combiner方法返回的函數,將所有的部分結果兩兩合並。這時,會把原始流每次拆分得到的子流對應的結果合並起來。
characteristics方法
最后一個方法characteristics會返回一個不可變的Characteristics集合,它定義了收集器的行為--尤其是關於流是否可以並行歸約,以及可以使用哪些優化的提示。
Characteristics是一個包含三個項目的枚舉:
- UNORDERED--歸約結果不受流中項目的遍歷和累積順序的影響
- CONCURRENT--accumulator函數可以從多個線程同時調用,且該收集器可以並行歸約流。如果收集器沒有標為UNORDERED, 那它僅在用於無序數據源時才可以並行歸約。
- IDENTITY_FINISH--這表明完成器方法返回的函數是一個恆等函數,可以跳過。這種情況下,累加器對象將會直接用做歸約過程的最終結果。這也意味着,將累加器A不加檢查地轉換為結果R是安全的。
我們迄今為止ToListCollector是IDENTITY_FINISH的,因為用來累積流中元素的List已經是我們要的最終結果,用不着進一步轉換了,但它並不是UNORDERED,因為用在有序流上的時候,我們還是希望順序能夠保留在得到到List中。最后,他是CONCURRENT的,但我們剛才說過了,僅僅在背后的數據源無序時才會並行處理。
上面這段話說的有點繞口,大概是說像Set生成的stream是無序的,這時候toList就可以並行。而ArrayList這種隊列一樣的數據結構則生成有序的stream,不能並行。
使用
直接傳給collect方法就好。
List<Dish> rs = dishes
.stream()
.collect(new ToListCollector<>());
我們這樣費盡心思去創建一個toListCollector,一個是為了熟悉Collector接口的用法,一個是方便重用。當再遇到這樣的需求的時候就可以直接用這個自定義的函數了,所以才有toList()這個靜態方法。否則,其實collect提供了重載函數可以直接定義這幾個函數。比如,可以這樣實現toList
List<Dish> dishes = dishes
.stream()
.collect(
ArrayList::new, //supplier
List::add, //accumulator
List::addAll //combiner
);
這種方法雖然簡單,但可讀性較差,而且當再次遇到這個需求時還要重寫一遍,復用性差。
關於性能
對於stream提供的幾個收集器已經可以滿足絕大部分開發需求了,reduce提供了各種自定義。但有時候還是需要自定義collector才能實現。文中舉例還是質數枚舉算法。之前我們通過遍歷平方根之內的數字來求質數。這次提出要用得到的質數減少取模運算。然而,悲劇的是我本地測算的結果顯示,這個而所謂的優化版反而比原來的慢100倍。不過,還是把這個自定義收集器列出來。值得銘記的是,這個收集器是有序的,所以不能並行,那個這個combiner方法可以不要的,最好返回UnsupportedOperationException來警示此收集器的非並行性。
public class PrimeNumbersCollector implements
Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {
@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
return () -> {
Map<Boolean, List<Integer>> map = new HashMap<>();
map.put(true, new ArrayList<>());
map.put(false, new ArrayList<>());
return map;
};
}
@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
acc.get(isPrime(acc.get(true), candidate)).add(candidate);
};
}
/**
* 從質數列表里取出來,看看是不是candidate的約數.
*
* @param primes 質數列表
* @param candidate 判斷值
* @return true -> 質數; false->非質數。
*/
private static Boolean isPrime(
List<Integer> primes,
Integer candidate) {
int candidateRoot = (int) Math.sqrt((double) candidate);
return primes.stream().filter(p -> p<=candidateRoot).noneMatch(i -> candidate % i == 0);
}
@Override
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
map1.get(true).addAll(map2.get(true));
map1.get(false).addAll(map2.get(false));
return map1;
};
}
@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
}
}
參考
- Java8 in Action