Reactor 操作符
數據在響應式流中的處理,就像流過一條裝配流水線。Reactor 既是傳送帶,又是一個個的裝配工或機器人。原材料從源頭(最初的 Publisher )流出,經過一個個的裝配線中裝配工或機器人的工位加工(operator 操作),最終被加工成成品,等待被推送到消費者( subscribe 操作)。
在 Reactor 中,每個操作符對 Publisher 進行處理,然后將 Publisher 包裝為另一個新的 Publisher 。就像一個鏈條,數據源自第一個 Publisher ,然后順鏈條而下,在每個環節進行相應的處理。最終,訂閱者(Subscriber )終結這個過程。所以, 響應式編程按照鏈式方式進行開發。
注意,如同 Java Stream 的終端操作,訂閱者( Subscriber )在沒有訂閱( subscribe )到一個發布者( Publisher )之前,什么也不會發生。
如同 Java Stream 的中間操作一樣,Reactor 的 Flux 和 Mono 也為我們提供了多種操作符(遠多於 Stream ),我們將它們分類如下:
| 序號 | 類型 | 操作符 |
|---|---|---|
| 1 | 轉換 | as, cast, collect, collectList, collectMap, collectMultimap, collectSortedList, concatMap, concatMapDelayError, concatMapIterable, elapsed, expand, expandDeep, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, groupJoin, handle, index, join, map, switchMap, switchOnFirst, then, thenEmpty, thenMany, timestamp, transform, transformDeferred |
| 2 | 篩選 | blockFirst, blockLast, distinct, distinctUntilChanged, elementAt, filter, filterWhen, ignoreElements, last, next, ofType, or, repeat, retry, single, singleOrEmpty, sort, take, takeLast, takeUntil, takeUntilOther, takeWhile |
| 3 | 組合 | concatWith, concatWithValues, mergeOrderWith, mergeWith, startWith, withLatestFrom, zipWith, zipWithIterable |
| 4 | 條件 | defaultIfEmpty, delayUntil, retryWhen, switchIfEmpty |
| 5 | 時間 | delayElements, delaySequence, delaySubscription, sample, sampleFirst, sampleTimeout, skip, skipLast, skipUntil, skipUntilOther, skipWhile, timeout |
| 6 | 統計 | count, reduce, reduceWith, scan, scanWith |
| 7 | 匹配 | all, any, hasElement, hasElements |
| 8 | 分組 | buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, groupBy, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile |
| 9 | 事件 | doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop |
| 10 | 調試 | checkpoint, hide, log |
| 11 | 其它 | cache, dematerialize, limitRate, limitRequest, materialize, metrics, name, onTerminateDetach, parallel, publish, publishNext, publishOn, replay, share, subscribeOn, subscriberContext, subscribeWith, tag |
接下來我們來挨個學習各類的操作符,如同前面學習響應式流創建一樣,講解操作符時,如果是 Flux 或 Mono 獨有的,會在方法名前增加類名前綴。
轉換類操作符
轉換類的操作符數量最多,平常過程中也是使用最頻繁的。
as
將響應式流轉換為目標類型,既可以是非響應式對象,也可以是 Flux 或 Mono。
Flux.range(3, 8)
.as(Mono::from)
.subscribe(System.out::println);
cast
將響應式流內的元素強轉為目標類型,如果類型不匹配(非父類類型或當前類型),將拋出 ClassCastException ,見圖知意:

Flux.range(1, 3)
.cast(Number.class)
.subscribe(System.out::println);
Flux#collect
通過應用收集器,將 Flux 發出的所有元素收集到一個容器中。當此流完成時,發出收集的結果。 Flux 提供了 2 個重載方法,主要區別在於應用的收集器不同,一個是 Java Stream 的 Collector, 另一個是自定義收集方法(同 Java Stream 中 collect 方法):
<R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector);
<E> Mono<E> collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector);
見圖知意:

Flux.range(1, 5)
.collect(Collectors.toList())
.subscribe(System.out::println);
Flux#collectList
當此 Flux 完成時,將此流發出的所有元素收集到一個列表中,該列表由生成的 Mono 發出。見圖知意:

Flux.range(1, 5)
.collectList()
.subscribe(System.out::println);
Flux#collectMap
將 Flux 發出的所有元素按照鍵生成器和值生成器收集到 Map 中,之后由 Mono 發出。Flux 提供了 3 個重載方法:
<K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier);
它們的主要區別在於是否提供值生成器和初始的Map,意同 Java Stream 中的 Collectors#toMap 。見圖知意:

Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectMultimap
collectMultimap 與 collectMap 的區別在於,map 中的 value 類型不同,一個是集合,一個是元素。 collectMultimap 對於流中出現重復的 key 的 value,加入到了集合中,而 collectMap 做了替換。在這點上,reactor 不如 Java Stream 中的 Collectors#toMap 方法,沒有提供 key 重復時的合並函數。也提供了 3 個重載方法。
<K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
見圖知意:

Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMultimap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectSortedList
將 Flux 發出的元素在完成時進行排序,之后由 Mono 發出。Flux 提供了 2 個重載方法:
Mono<List<T>> collectSortedList();
Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);
見圖知意:

Flux.just(1, 3, 5, 3, 2, 5, 1, 4)
.collectSortedList()
.subscribe(System.out::println);
總結
本篇我們介紹了 Reactor 操作符的分類,之后介紹了部分轉換類操作符,講解示例時都是單個操作符,相信大家都能理解。
今天的內容就學到這里,我們下篇繼續學習 Reactor 的操作符。
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperatorTest 測試類。
