接上篇繼續學習各種方法:
4.9、reduce/reduceWith
@Test
public void reduceTest() {
Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);
}
輸出:
55 65
上面的代碼,reduce相當於把1到10累加求和,reduceWith則是先指定一個起始值,然后在這個起始值基礎上再累加。(tips: 除了累加,還可以做階乘)
reduce示意圖:

reduceWith示意圖:

4.10、merge/mergeSequential/contact
@Test
public void mergeTest() {
Flux.merge(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
.toStream().forEach(System.out::println);
System.out.println("-----------------------------");
Flux.mergeSequential(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
.toStream().forEach(System.out::println);
}
merge就是將把多個Flux"按元素實際產生的順序"合並,而mergeSequential則是按多個Flux"被訂閱的順序"來合並,以上面的代碼來說,二個Flux,從時間上看,元素是交替產生的,所以merge的輸出結果,是混在一起的,而mergeSequential則是能分出Flux整體的先后順序。
0 0 1 1 2 2 3 3 4 4 ----------------------------- 0 1 2 3 4 0 1 2 3 4
merge示意圖:

mergeSequential示意圖:

與mergeSequential類似的,還有一個contact方法,示意圖如下:

4.11、combineLatest
@Test
public void combineLatestTest() {
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(3),
Flux.just("A", "B"))
.toStream().forEach(System.out::println);
System.out.println("------------------");
Flux.combineLatest(
Arrays::toString,
Flux.just(0, 1),
Flux.just("A", "B"))
.toStream().forEach(System.out::println);
System.out.println("------------------");
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.of(1000, ChronoUnit.MILLIS)).take(2),
Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(2))
.toStream().forEach(System.out::println);
}
該操作會將所有流中的最新產生的元素合並成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合並操作就會被執行一次。
分析一下第一段輸出:
第1個Flux用了延時生成,第1個數字0,10秒后才產生,這時第2個Flux中的A,B早就生成完畢,所以此時二個Flux中最新生在的元素,就是[0,B],類似的,10秒后,第2個數字1依次產生,再執行1次合並,生成[1,B]...
輸出:
[0, B] [1, B] [2, B] ------------------ [1, A] [1, B] ------------------ [1, 0] [1, 1]
示意圖如下:

4.12、first
@Test
public void firstTest() {
Flux.first(Flux.fromArray(new String[]{"A", "B"}),
Flux.just(1, 2, 3))
.subscribe(System.out::println);
}
這個很簡單理解,多個Flux,只取第1個Flux的元素。輸出如下:
A B
示意圖:
4.13、 map
@Test
public void mapTest() {
Flux.just('A', 'B', 'C').map(a -> (int) (a)).subscribe(System.out::println);
}
map相當於把一種類型的元素序列,轉換成另一種類型,輸出如下:
65 66 67
示意圖:
五、消息處理
寫代碼時,難免會遇到各種異常或錯誤,所謂消息處理,就是指如何處理這些異常。
5.1 訂閱錯誤消息
@Test
public void subscribeTest1() {
Flux.just("A", "B", "C")
.concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
.subscribe(System.out::println, System.err::println);
}
注意:這里subscribe第2個參數,指定了System.err::println ,即錯誤消息,輸出到異常控制台上。
輸出效果:

示意圖:

5.2 onErrorReturn
即:遇到錯誤時,用其它指定值返回
@Test
public void subscribeTest2() {
Flux.just("A", "B", "C")
.concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
.onErrorReturn("X")
.subscribe(System.out::println, System.err::println);
}
輸出:
A B C X
示意圖:

5.3 onErrorResume
跟onErrorReturn有點接近,但更靈活,可以根據異常的類型,有選擇性的處理返回值。
@Test
public void subscribeTest3() {
Flux.just("A", "B", "C")
.concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
.onErrorResume(e -> {
if (e instanceof IndexOutOfBoundsException) {
return Flux.just("X", "Y", "Z");
} else {
return Mono.empty();
}
})
.subscribe(System.out::println, System.err::println);
}
輸出:
A B C X Y Z
示意圖:
5.4 retry
即:遇到異常,就重試。
@Test
public void subscribeTest4() {
Flux.just("A", "B", "C")
.concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
.retry(1)
.subscribe(System.out::println, System.err::println);
}
輸出:

示意圖:

六、(線程)調度器
reactor中到處充滿了異步調用,內部必然有一堆線程調度,Schedulers提供了如下幾種調用策略:
6.1 Schedulers.immediate() - 使用當前線程
6.2 Schedulers.elastic() - 使用線程池
6.3 Schedulers.newElastic("test1") - 使用(新)線程池(可以指定名稱,更方便調試)
6.4 Schedulers.single() - 單個線程
6.5 Schedulers.newSingle("test2") - (新)單個線程(可以指定名稱,更方便調試)
6.6 Schedulers.parallel() - 使用並行處理的線程池(取決於CPU核數)
6.7 Schedulers.newParallel("test3") - 使用並行處理的線程池(取決於CPU核數,可以指定名稱,方便調試)
6.8 Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - 使用Executor(這個最靈活)
示例代碼:
@Test
public void schedulesTest() {
Flux.fromArray(new String[]{"A", "B", "C", "D"})
.publishOn(Schedulers.newSingle("TEST-SINGLE", true))
.map(x -> String.format("[%s]: %s", Thread.currentThread().getName(), x))
.toStream()
.forEach(System.out::println);
}
輸出:
[TEST-SINGLE-1]: A [TEST-SINGLE-1]: B [TEST-SINGLE-1]: C [TEST-SINGLE-1]: D
七、測試&調試
異步處理,通常是比較難測試的,reactor提供了StepVerifier工具來進行測試。
7.1 常規單元測試
@Test
public void stepTest() {
StepVerifier.create(Flux.just(1, 2)
.concatWith(Mono.error(new IndexOutOfBoundsException("test")))
.onErrorReturn(3))
.expectNext(1)
.expectNext(2)
.expectNext(3)
.verifyComplete();
}
上面的示例,Flux先生成1,2這兩個元素,然后拋了個錯誤,但馬上用onErrorReturn處理了異常,所以最終應該是期待1,2,3,complete這樣的序列。
7.2 模擬時間流逝
Flux.interval這類延時操作,如果延時較大,比如幾個小時之類的,要真實模擬的話,效率很低,StepVerifier提供了withVirtualTime方法,來模擬加快時間的流逝(是不是很體貼^_^)
@Test
public void stepTest2() {
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.of(10, ChronoUnit.MINUTES),
Duration.of(5, ChronoUnit.SECONDS))
.take(2))
.expectSubscription()
.expectNoEvent(Duration.of(10, ChronoUnit.MINUTES))
.thenAwait(Duration.of(5, ChronoUnit.SECONDS))
.expectNext(0L)
.thenAwait(Duration.of(5, ChronoUnit.SECONDS))
.expectNext(1L)
.verifyComplete();
}
上面這個Flux先停10分鍾,然后每隔5秒生成一個數字,然后取前2個數字。代碼先調用
expectSubscription 期待流被訂閱,然后 expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) 期望10分鍾內,無任何事件(即:驗證Flux先暫停10分鍾),然后 thenAwait(Duration.of(5, ChronoUnit.SECONDS)) 等5秒鍾,這時已經生成了數字0 expectNext(0L) 期待0L ... 后面的大家自行理解吧。
7.3 記錄日志
@Test
public void publisherTest() {
Flux.just(1, 0)
.map(c -> 1 / c)
.log("MY-TEST")
.subscribe(System.out::println);
}
輸出:

示意圖:

7.4 checkpoint檢查點
可以在一些懷疑的地方,加上checkpoint檢查,參考下面的代碼:
@Test
public void publisherTest() {
Flux.just(1, 0)
.map(c -> 1 / c)
.checkpoint("AAA")
.subscribe(System.out::println);
}
輸出:

