Java8-理解Collector


上一節學習了Java8中比較常用的內置collector的用法。接下來就來理解下collector的組成。

Collector定義

Collector接口包含了一系列方法,為實現具體的歸約操作(即收集器)提供了范本。我們已經看過了Collector接口中實現的許多收集器,例如toList或groupingBy。這也意味着你可以為Collector接口提供自己的實現,從而自由創建自定義歸約操作。

要開始使用Collector接口,我們先來看看toList的實現方法,這個在日常中使用最頻繁的東西其實也簡單。

Collector接口定義了5個函數

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
}
  1. T是流中要收集的對象的泛型
  2. A是累加器的類型,累加器是在收集過程中用於累積部分結果的對象。
  3. R是收集操作得到的對象(通常但不一定是集合)的類型。

對於toList, 我們收集的對象是T, 累加器是List , 最終收集的結果也是一個List ,於是創建ToListCollector如下:

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> 

理解Collector幾個函數

建立新的結果容器 supplier方法

supplier方法必須返回一個結果為空的Supplier,也就是一個無參數函數,在調用時,它會創建一個空的累加器實例,供數據收集過程使用。就個人通俗的理解來說,這個方法定義你如何收集數據,之所以提煉出來就是為了讓你可以傳lambda表達式來指定收集器。對於toList, 我們直接返回一個空list就好。

@Override
public Supplier<List<T>> supplier() {
    return ArrayList::new;
}

累加器執行累加的具體實現 accumulator方法

accumulator方法會返回執行歸約操作的函數,該函數將返回void。當遍歷到流中第n個元素時,這個函數就會執行。函數有兩個參數,第一個參數是累計值,第二參數是第n個元素。累加值與元素n如何做運算就是accumulator做的事情了。比如toList, 累加值就是一個List,對於元素n,當然就是add。

@Override
public BiConsumer<List<T>, T> accumulator() {
    return List::add;
}

對結果容器應用最終轉換 finisher方法

當遍歷完流之后,我們需要對結果做一個處理,返回一個我們想要的結果。這就是finisher方法所定義的事情。finisher方法必須返回在累積過程的最后要調用的一個函數,以便將累加器對象轉換為整個集合操作的最終結果, 這個返回的函數在執行時,會有個參數,該參數就是累積值,會有一個返回值,返回值就是我們最終要返回的東西。對於toList, 我最后就只要拿到那個收集的List就好,所以直接返回List。

@Override
public Function<List<T>, List<T>> finisher() {
    return (i) -> i;
}

對於接收一個參數,返回一個value,我們可以想到Function函數,正如finisher()的返回值。對於這個返回參數本身的做法,Function有個靜態方法

static <T> Function<T, T> identity() {
    return t -> t;
}

可以用Function.identity()代替上述lambda表達式。

順序歸約

合並兩個結果容器 combiner

上面看起來似乎已經可以工作了,這是針對順序執行的情況。我們知道Stream天然支持並行,但並行卻不是毫無代價的。想要並行首先就必然要把任務分段,然后才能並行執行,最后還要合並。雖然Stream底層對我們透明的執行了並行,但如何並行還是需要取決於我們自己。這就是combiner要做的事情。combiner方法會返回一個供歸約操作使用的函數,它定義了對流的各個子部分並行處理時,各個字部分歸約所得的累加器要如何合並。對於toList而言,Stream會把流自動的分成幾個並行的部分,每個部分都執行上述的歸約,匯集成一個List。當全部完成后再合並成一個List。

@Override
public BinaryOperator<List<T>> combiner() {

    return (list1, list2) -> {
        list1.addAll(list2);
        return list1;
    };
}

這樣,就可以對流並行歸約了。它會用到Java7引入的分支/合並框架和Spliterator抽象。大概如下所示,

  1. 原始流會以遞歸方式拆分為子流,直到定義流是否進一步拆分的一個條件為非(如果分布式工作單位太小,並行計算往往比順序計算要慢,而且要是生成的並行任務比處理器內核數多很多的話就毫無意義了)。
  2. 現在,所有的子流都可以並行處理,即對每個子流應用順序歸約算法。
  3. 最后,使用收集器combiner方法返回的函數,將所有的部分結果兩兩合並。這時,會把原始流每次拆分得到的子流對應的結果合並起來。

characteristics方法

最后一個方法characteristics會返回一個不可變的Characteristics集合,它定義了收集器的行為--尤其是關於流是否可以並行歸約,以及可以使用哪些優化的提示。

Characteristics是一個包含三個項目的枚舉:

  1. UNORDERED--歸約結果不受流中項目的遍歷和累積順序的影響
  2. CONCURRENT--accumulator函數可以從多個線程同時調用,且該收集器可以並行歸約流。如果收集器沒有標為UNORDERED, 那它僅在用於無序數據源時才可以並行歸約。
  3. IDENTITY_FINISH--這表明完成器方法返回的函數是一個恆等函數,可以跳過。這種情況下,累加器對象將會直接用做歸約過程的最終結果。這也意味着,將累加器A不加檢查地轉換為結果R是安全的。

我們迄今為止ToListCollector是IDENTITY_FINISH的,因為用來累積流中元素的List已經是我們要的最終結果,用不着進一步轉換了,但它並不是UNORDERED,因為用在有序流上的時候,我們還是希望順序能夠保留在得到到List中。最后,他是CONCURRENT的,但我們剛才說過了,僅僅在背后的數據源無序時才會並行處理。

上面這段話說的有點繞口,大概是說像Set生成的stream是無序的,這時候toList就可以並行。而ArrayList這種隊列一樣的數據結構則生成有序的stream,不能並行。

使用

直接傳給collect方法就好。

List<Dish> rs = dishes
            .stream()
            .collect(new ToListCollector<>());

我們這樣費盡心思去創建一個toListCollector,一個是為了熟悉Collector接口的用法,一個是方便重用。當再遇到這樣的需求的時候就可以直接用這個自定義的函數了,所以才有toList()這個靜態方法。否則,其實collect提供了重載函數可以直接定義這幾個函數。比如,可以這樣實現toList

List<Dish> dishes = dishes
                    .stream()
                    .collect(
                        ArrayList::new, //supplier
                        List::add, //accumulator
                        List::addAll //combiner
                    );

這種方法雖然簡單,但可讀性較差,而且當再次遇到這個需求時還要重寫一遍,復用性差。

關於性能

對於stream提供的幾個收集器已經可以滿足絕大部分開發需求了,reduce提供了各種自定義。但有時候還是需要自定義collector才能實現。文中舉例還是質數枚舉算法。之前我們通過遍歷平方根之內的數字來求質數。這次提出要用得到的質數減少取模運算。然而,悲劇的是我本地測算的結果顯示,這個而所謂的優化版反而比原來的慢100倍。不過,還是把這個自定義收集器列出來。值得銘記的是,這個收集器是有序的,所以不能並行,那個這個combiner方法可以不要的,最好返回UnsupportedOperationException來警示此收集器的非並行性。

測試見 https://github.com/Ryan-Miao/l4Java/blob/master/src/test/java/com/test/java/stream/collect/PrimeNumbersCollectorTest.java

public class PrimeNumbersCollector implements
    Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {

    @Override
    public Supplier<Map<Boolean, List<Integer>>> supplier() {
        return () -> {
            Map<Boolean, List<Integer>> map = new HashMap<>();
            map.put(true, new ArrayList<>());
            map.put(false, new ArrayList<>());
            return map;
        };
    }

    @Override
    public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
        return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
            acc.get(isPrime(acc.get(true), candidate)).add(candidate);
        };
    }

    /**
     * 從質數列表里取出來,看看是不是candidate的約數.
     *
     * @param primes 質數列表
     * @param candidate 判斷值
     * @return true -> 質數; false->非質數。
     */
    private static Boolean isPrime(
        List<Integer> primes,
        Integer candidate) {
        int candidateRoot = (int) Math.sqrt((double) candidate);
        return primes.stream().filter(p -> p<=candidateRoot).noneMatch(i -> candidate % i == 0);
    }

    @Override
    public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
        return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
            map1.get(true).addAll(map2.get(true));
            map1.get(false).addAll(map2.get(false));
            return map1;
        };
    }

    @Override
    public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
    }
}

參考

  • Java8 in Action


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM