引子
將行為作為數據傳遞
怎樣在一行代碼里同時計算一個列表的和、最大值、最小值、平均值、元素個數、奇偶分組、指數、排序呢?
答案是思維反轉!將行為作為數據傳遞。 文藝青年的代碼如下所示:
public class FunctionUtil { public static <T,R> List<R> multiGetResult(List<Function<List<T>, R>> functions, List<T> list) { return functions.stream().map(f -> f.apply(list)).collect(Collectors.toList()); } public static void main(String[] args) { System.out.println(multiGetResult( Arrays.asList( list -> list.stream().collect(Collectors.summarizingInt(x->x)), list -> list.stream().filter(x -> x < 50).sorted().collect(Collectors.toList()), list -> list.stream().collect(Collectors.groupingBy(x->(x%2==0? "even": "odd"))), list -> list.stream().sorted().collect(Collectors.toList()), list -> list.stream().sorted().map(Math::sqrt).collect(Collectors.toMap(x->x, y->Math.pow(2,y)))), Arrays.asList(64,49,25,16,9,4,1,81,36))); } }
呃,有點賣弄小聰明。 不過要是能將行為作為數據自由傳遞和施加於數據集產生結果,那么其代碼表達能力將如庄子之言,恣意瀟灑而無所極限。病毒,不就是將行為匿身於數據中的一種表現么?
行為就是數據。
Java8函數框架解讀
函數編程的最直接的表現,莫過於將函數作為數據自由傳遞,結合泛型推導能力,使代碼表達能力獲得飛一般的提升。那么,Java8是怎么支持函數編程的呢?主要有三個核心概念:
- 函數接口(Function)
- 流(Stream)
- 聚合器(Collector)
函數接口
關於函數接口,需要記住的就是兩件事:
- 函數接口是行為的抽象;
- 函數接口是數據轉換器。
最直接的支持就是 java.util.Function 包。定義了四個最基礎的函數接口:
- Supplier<T>: 數據提供器,可以提供 T 類型對象;無參的構造器,提供了 get 方法;
- Function<T,R>: 數據轉換器,接收一個 T 類型的對象,返回一個 R類型的對象; 單參數單返回值的行為接口;提供了 apply, compose, andThen, identity 方法;
- Consumer<T>: 數據消費器, 接收一個 T類型的對象,無返回值,通常用於設置T對象的值; 單參數無返回值的行為接口;提供了 accept, andThen 方法;
- Predicate<T>: 條件測試器,接收一個 T 類型的對象,返回布爾值,通常用於傳遞條件函數; 單參數布爾值的條件性接口。提供了 test (條件測試) , and-or- negate(與或非) 方法。
其中, compose, andThen, and, or, negate 用來組合函數接口而得到更強大的函數接口。
其它的函數接口都是通過這四個擴展而來。
- 在參數個數上擴展: 比如接收雙參數的,有 Bi 前綴, 比如 BiConsumer<T,U>, BiFunction<T,U,R> ;
- 在類型上擴展: 比如接收原子類型參數的,有 [Int|Double|Long][Function|Consumer|Supplier|Predicate]
- 特殊常用的變形: 比如 BinaryOperator , 是同類型的雙參數 BiFunction<T,T,T> ,二元操作符 ; UnaryOperator 是 Function<T,T> 一元操作符。
那么,這些函數接口可以接收哪些值呢?
- 類/對象的靜態方法引用、實例方法引用。引用符號為雙冒號 ::
- 類的構造器引用,比如 Class::new
- lambda表達式
在博文“使用函數接口和枚舉實現配置式編程(Java與Scala實現)”, “精練代碼:一次Java函數式編程的重構之旅” 給出了基本的例子。后面還有更多例子。重在自己練習和嘗試。
聚合器
先說聚合器。每一個流式計算的末尾總有一個類似 collect(Collectors.toList()) 的方法調用。collect 是 Stream 的方法,而參數則是聚合器Collector。已有的聚合器定義在Collectors 的靜態方法里。 那么這個聚合器是怎么實現的呢?
Reduce
大部分聚合器都是基於 Reduce 操作實現的。 Reduce ,名曰推導,含有三個要素: 初始值 init, 二元操作符 BinaryOperator, 以及一個用於聚合結果的數據源S。
Reduce 的算法如下:
STEP1: 初始化結果 R = init ;
STEP2: 每次從 S 中取出一個值 v,通過二元操作符施加到 R 和 v ,產生一個新值賦給 R = BinaryOperator(R, v);重復 STEP2, 直到 S 中沒有值可取為止。
比如一個列表求和,Sum([1,2,3]) , 那么定義一個初始值 0 以及一個二元加法操作 BO = a + b ,通過三步完成 Reduce 操作:step1: R = 0; step2: v=1, R = 0+v = 1; step2: v=2, R = 1 + v = 3 ; step3: v = 3, R = 3 + v = 6。
四要素
一個聚合器的實現,通常需要提供四要素:
- 一個結果容器的初始值提供器 supplier ;
- 一個用於將每次二元操作的中間結果與結果容器的值進行操作並重新設置結果容器的累積器 accumulator ;
- 一個用於對Stream元素和中間結果進行操作的二元操作符 combiner ;
- 一個用於對結果容器進行最終聚合的轉換器 finisher(可選) 。
Collectors.CollectorImpl 的實現展示了這一點:
static class CollectorImpl<T, A, R> implements Collector<T, A, R> { private final Supplier<A> supplier; private final BiConsumer<A, T> accumulator; private final BinaryOperator<A> combiner; private final Function<A, R> finisher; private final Set<Characteristics> characteristics; CollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Function<A,R> finisher, Set<Characteristics> characteristics) { this.supplier = supplier; this.accumulator = accumulator; this.combiner = combiner; this.finisher = finisher; this.characteristics = characteristics; } }
列表類聚合器
列表類聚合器實現,基本是基於Reduce 操作完成的。 看如下代碼:
public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID);
首先使用 ArrayList::new 創造一個空列表; 然后 List:add 將Stream累積操作的中間結果加入到這個列表;第三個函數則將兩個列表元素進行合並成一個結果列表中。 就是這么簡單。集合聚合器 toSet(), 字符串連接器 joining(),以及列表求和(summingXXX)、最大(maxBy)、最小值(minBy)等都是這個套路。
映射類聚合器
映射類聚合器基於Map合並來完成。看這段代碼:
private static <K, V, M extends Map<K,V>> BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) { return (m1, m2) -> { for (Map.Entry<K,V> e : m2.entrySet()) m1.merge(e.getKey(), e.getValue(), mergeFunction); return m1; }; }
根據指定的值合並函數 mergeFunction, 返回一個map合並器,用來合並兩個map里相同key的值。mergeFunction用來對兩個map中相同key的值進行運算得到新的value值,如果value值為null,會移除相應的key,否則使用value值作為對應key的值。這個方法是私有的,主要為支撐 toMap,groupingBy 而生。
toMap的實現很簡短,實際上就是將指定stream的每個元素分別使用給定函數keyMapper, valueMapper進行映射得到 newKey, newValue,從而形成新的Map[newKey,newValue] (1), 再使用mapMerger(mergeFunction) 生成的 map 合並器將其合並到 mapSupplier (2)。如果只傳 keyMapper, valueMapper,那么就只得到結果(1)。
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction, Supplier<M> mapSupplier) { BiConsumer<M, T> accumulator = (map, element) -> map.merge(keyMapper.apply(element), valueMapper.apply(element), mergeFunction); return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID); }
toMap 的一個示例見如下代碼:
List<Integer> list = Arrays.asList(1,2,3,4,5); Supplier<Map<Integer,Integer>> mapSupplier = () -> list.stream().collect(Collectors.toMap(x->x, y-> y * y)); Map<Integer, Integer> mapValueAdd = list.stream().collect(Collectors.toMap(x->x, y->y, (v1,v2) -> v1+v2, mapSupplier)); System.out.println(mapValueAdd);
將一個 List 轉成 map[1=1,2=2,3=3,4=4,5=5],然后與另一個map[1=1,2=4,3=9,4=16,5=25]的相同key的value進行相加。注意到, toMap 的最后一個參數是 Supplier<Map> , 是 Map 提供器,而不是 Map 對象。如果用Map對象,會報 no instances of type variables M exists so that conforms to Supplier<M>。 在函數式編程的世界里,通常是用函數來表達和組合的。
自定義聚合器
讓我們仿照 Collectors.toList() 做一個自定義的聚合器。實現一個含N個數的斐波那契序列 List<Integer>。由於 Reduce 每次都從流中取一個數,因此需要生產一個含N個數的stream;可使用 Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream() , 亦可使用 IntStream.range(1,11) ,不過兩者的 collector 方法是不一樣的。這里我們取前者。
現在,需要構造四要素:
- 可變的結果容器提供器 Supplier<List<Integer>> = () -> [0, 1] ; 注意這里不能使用 Arrays.asList , 因為該方法生成的列表是不可變的。
- 累積器 BiConsumer<List<Integer>, Integer> accumulator(): 這里流的元素未用,僅僅用來使計算進行和終止。新的元素從結果容器中取最后兩個相加后產生新的結果放到結果容器中。
- 組合器 BinaryOperator<List<Integer>> combiner() : 照葫蘆畫瓢,目前沒看出這步是做什么用;直接 return null; 也是OK的。
- 最終轉換器 Function<List<Integer>, List<Integer>> finisher() :在最終轉換器中,移除初始設置的兩個值 0, 1 。
代碼如下:
/** * Created by shuqin on 17/12/5. */ public class FiboCollector implements Collector<Integer, List<Integer>, List<Integer>> { public Supplier<List<Integer>> supplier() { return () -> { List<Integer> result = new ArrayList<>(); result.add(0); result.add(1); return result; }; } @Override public BiConsumer<List<Integer>, Integer> accumulator() { return (res, num) -> { Integer next = res.get(res.size()-1) + res.get(res.size()-2); res.add(next); }; } @Override public BinaryOperator<List<Integer>> combiner() { return null; //return (left, right) -> { left.addAll(right); return left; }; } @Override public Function<List<Integer>, List<Integer>> finisher() { return res -> { res.remove(0); res.remove(1); return res; }; } @Override public Set<Characteristics> characteristics() { return Collections.emptySet(); } } List<Integer> fibo = Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream().collect(new FiboCollector()); System.out.println(fibo);
流
流(Stream)是Java8對函數式編程的重要支撐。大部分函數式工具都圍繞Stream展開。
Stream的接口
Stream 主要有四類接口:
- 流到流之間的轉換:比如 filter(過濾), map(映射轉換), mapTo[Int|Long|Double] (到原子類型流的轉換), flatMap(高維結構平鋪),flatMapTo[Int|Long|Double], sorted(排序),distinct(不重復值),peek(執行某種操作,流不變,可用於調試),limit(限制到指定元素數量), skip(跳過若干元素) ;
- 流到終值的轉換: 比如 toArray(轉為數組),reduce(推導結果),collect(聚合結果),min(最小值), max(最大值), count (元素個數), anyMatch (任一匹配), allMatch(所有都匹配), noneMatch(一個都不匹配), findFirst(選擇首元素),findAny(任選一元素) ;
- 直接遍歷: forEach (不保序遍歷,比如並行流), forEachOrdered(保序遍歷) ;
- 構造流: empty (構造空流),of (單個元素的流及多元素順序流),iterate (無限長度的有序順序流),generate (將數據提供器轉換成無限非有序的順序流), concat (流的連接), Builder (用於構造流的Builder對象)
除了 Stream 本身自帶的生成Stream 的方法,數組和容器及StreamSupport都有轉換為流的方法。比如 Arrays.stream , [List|Set|Collection].[stream|parallelStream] , StreamSupport.[int|long|double|]stream;
流的類型主要有:Reference(對象流), IntStream (int元素流), LongStream (long元素流), Double (double元素流) ,定義在類 StreamShape 中,主要將操作適配於類型系統。
flatMap 的一個例子見如下所示,將一個二維數組轉換為一維數組:
List<Integer> nums = Arrays.asList(Arrays.asList(1,2,3), Arrays.asList(1,4,9), Arrays.asList(1,8,27)) .stream().flatMap(x -> x.stream()).collect(Collectors.toList()); System.out.println(nums);
collector實現
這里我們僅分析串行是怎么實現的。入口在類 java.util.stream.ReferencePipeline 的 collect 方法:
container = evaluate(ReduceOps.makeRef(collector)); return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
這里的關鍵是 ReduceOps.makeRef(collector)。 點進去:
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) { Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner(); class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> { @Override public void begin(long size) { state = supplier.get(); } @Override public void accept(T t) { accumulator.accept(state, t); } @Override public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } }; } private static abstract class Box<U> { U state; Box() {} // Avoid creation of special accessor public U get() { return state; } }
Box 是一個結果值的持有者; ReducingSink 用begin, accept, combine 三個方法定義了要進行的計算;ReducingSink是有狀態的流數據消費的計算抽象,閱讀Sink接口文檔可知。ReduceOps.makeRef(collector) 返回了一個封裝了Reduce操作的ReduceOps對象。注意到,這里都是聲明要執行的計算,而不涉及計算的實際過程。展示了表達與執行分離的思想。真正的計算過程啟動在 ReferencePipeline.evaluate 方法里:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
使用 IDE 的 go to implementations 功能, 跟進去,可以發現,最終在 AbstractPipeLine 中定義了:
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
Spliterator 用來對流中的元素進行分區和遍歷以及施加Sink指定操作,可以用於並發計算。Spliterator的具體實現類定義在 Spliterators 的靜態類和靜態方法中。其中有:
數組Spliterator:
static final class ArraySpliterator<T> implements Spliterator<T> static final class IntArraySpliterator implements Spliterator.OfInt static final class LongArraySpliterator implements Spliterator.OfLong static final class DoubleArraySpliterator implements Spliterator.OfDouble 迭代Spliterator: static class IteratorSpliterator<T> implements Spliterator<T> static final class IntIteratorSpliterator implements Spliterator.OfInt static final class LongIteratorSpliterator implements Spliterator.OfLong static final class DoubleIteratorSpliterator implements Spliterator.OfDouble 抽象Spliterator: public static abstract class AbstractSpliterator<T> implements Spliterator<T> private static abstract class EmptySpliterator<T, S extends Spliterator<T>, C> public static abstract class AbstractIntSpliterator implements Spliterator.OfInt public 