Reactor 3 學習筆記(2)


上篇繼續學習各種方法:

4.9、reduce/reduceWith

    @Test
    public void reduceTest() {
        Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
        Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);
    }

輸出:

55
65

上面的代碼,reduce相當於把1到10累加求和,reduceWith則是先指定一個起始值,然后在這個起始值基礎上再累加。(tips: 除了累加,還可以做階乘) 

reduce示意圖:

reduceWith示意圖:

 

4.10、merge/mergeSequential/contact

    @Test
    public void mergeTest() {
        Flux.merge(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);

        System.out.println("-----------------------------");

        Flux.mergeSequential(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);
    }  

merge就是將把多個Flux"按元素實際產生的順序"合並,而mergeSequential則是按多個Flux"被訂閱的順序"來合並,以上面的代碼來說,二個Flux,從時間上看,元素是交替產生的,所以merge的輸出結果,是混在一起的,而mergeSequential則是能分出Flux整體的先后順序。

0
0
1
1
2
2
3
3
4
4
-----------------------------
0
1
2
3
4
0
1
2
3
4

merge示意圖:

mergeSequential示意圖:

 

與mergeSequential類似的,還有一個contact方法,示意圖如下:

 

4.11、combineLatest

    @Test
    public void combineLatestTest() {
        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(3),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);

        System.out.println("------------------");

        Flux.combineLatest(
                Arrays::toString,
                Flux.just(0, 1),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);

        System.out.println("------------------");

        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(1000, ChronoUnit.MILLIS)).take(2),
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(2))
                .toStream().forEach(System.out::println);
    }

該操作會將所有流中的最新產生的元素合並成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合並操作就會被執行一次。

分析一下第一段輸出:

第1個Flux用了延時生成,第1個數字0,10秒后才產生,這時第2個Flux中的A,B早就生成完畢,所以此時二個Flux中最新生在的元素,就是[0,B],類似的,10秒后,第2個數字1依次產生,再執行1次合並,生成[1,B]...

輸出:

[0, B]
[1, B]
[2, B]
------------------
[1, A]
[1, B]
------------------
[1, 0]
[1, 1]

示意圖如下:

 

4.12、first

    @Test
    public void firstTest() {
        Flux.first(Flux.fromArray(new String[]{"A", "B"}),
                Flux.just(1, 2, 3))
                .subscribe(System.out::println);
    }

這個很簡單理解,多個Flux,只取第1個Flux的元素。輸出如下:

A
B

示意圖:

 

 

4.13、 map

    @Test
    public void mapTest() {
        Flux.just('A', 'B', 'C').map(a -> (int) (a)).subscribe(System.out::println);
    }

 map相當於把一種類型的元素序列,轉換成另一種類型,輸出如下:

65
66
67

示意圖:

 

 

五、消息處理

寫代碼時,難免會遇到各種異常或錯誤,所謂消息處理,就是指如何處理這些異常。

5.1 訂閱錯誤消息

    @Test
    public void subscribeTest1() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .subscribe(System.out::println, System.err::println);
    }

注意:這里subscribe第2個參數,指定了System.err::println ,即錯誤消息,輸出到異常控制台上。

輸出效果:

示意圖:

 

5.2 onErrorReturn

即:遇到錯誤時,用其它指定值返回

    @Test
    public void subscribeTest2() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .onErrorReturn("X")
                .subscribe(System.out::println, System.err::println);
    }

輸出:

A
B
C
X

示意圖:

 

5.3 onErrorResume

跟onErrorReturn有點接近,但更靈活,可以根據異常的類型,有選擇性的處理返回值。

    @Test
    public void subscribeTest3() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .onErrorResume(e -> {
                    if (e instanceof IndexOutOfBoundsException) {
                        return Flux.just("X", "Y", "Z");
                    } else {
                        return Mono.empty();
                    }
                })
                .subscribe(System.out::println, System.err::println);
    }

輸出:

A
B
C
X
Y
Z

示意圖:

 

 

5.4 retry

即:遇到異常,就重試。

    @Test
    public void subscribeTest4() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!")))
                .retry(1)
                .subscribe(System.out::println, System.err::println);
    }

輸出:

示意圖:

 

六、(線程)調度器

reactor中到處充滿了異步調用,內部必然有一堆線程調度,Schedulers提供了如下幾種調用策略:

6.1 Schedulers.immediate() - 使用當前線程
6.2 Schedulers.elastic() - 使用線程池
6.3 Schedulers.newElastic("test1") - 使用(新)線程池(可以指定名稱,更方便調試)
6.4 Schedulers.single() - 單個線程
6.5 Schedulers.newSingle("test2") - (新)單個線程(可以指定名稱,更方便調試)
6.6 Schedulers.parallel() - 使用並行處理的線程池(取決於CPU核數)
6.7 Schedulers.newParallel("test3")  - 使用並行處理的線程池(取決於CPU核數,可以指定名稱,方便調試)
6.8 Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - 使用Executor(這個最靈活)

示例代碼:

    @Test
    public void schedulesTest() {
        Flux.fromArray(new String[]{"A", "B", "C", "D"})
                .publishOn(Schedulers.newSingle("TEST-SINGLE", true))
                .map(x -> String.format("[%s]: %s", Thread.currentThread().getName(), x))
                .toStream()
                .forEach(System.out::println);
    }

輸出: 

[TEST-SINGLE-1]: A
[TEST-SINGLE-1]: B
[TEST-SINGLE-1]: C
[TEST-SINGLE-1]: D

 

七、測試&調試

異步處理,通常是比較難測試的,reactor提供了StepVerifier工具來進行測試。

7.1 常規單元測試

    @Test
    public void stepTest() {
        StepVerifier.create(Flux.just(1, 2)
                .concatWith(Mono.error(new IndexOutOfBoundsException("test")))
                .onErrorReturn(3))
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .verifyComplete();
    }

上面的示例,Flux先生成1,2這兩個元素,然后拋了個錯誤,但馬上用onErrorReturn處理了異常,所以最終應該是期待1,2,3,complete這樣的序列。 

 

7.2 模擬時間流逝

Flux.interval這類延時操作,如果延時較大,比如幾個小時之類的,要真實模擬的話,效率很低,StepVerifier提供了withVirtualTime方法,來模擬加快時間的流逝(是不是很體貼^_^)

    @Test
    public void stepTest2() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.of(10, ChronoUnit.MINUTES),
                Duration.of(5, ChronoUnit.SECONDS))
                .take(2))
                .expectSubscription()
                .expectNoEvent(Duration.of(10, ChronoUnit.MINUTES))
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(0L)
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(1L)
                .verifyComplete();
    }

上面這個Flux先停10分鍾,然后每隔5秒生成一個數字,然后取前2個數字。代碼先調用

expectSubscription 期待流被訂閱,然后
expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) 期望10分鍾內,無任何事件(即:驗證Flux先暫停10分鍾),然后
thenAwait(Duration.of(5, ChronoUnit.SECONDS)) 等5秒鍾,這時已經生成了數字0
expectNext(0L) 期待0L
... 后面的大家自行理解吧。

 

7.3 記錄日志 

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .log("MY-TEST")
                .subscribe(System.out::println);
    }

輸出:

示意圖:

 

7.4 checkpoint檢查點

可以在一些懷疑的地方,加上checkpoint檢查,參考下面的代碼:

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .checkpoint("AAA")
                .subscribe(System.out::println);
    }

輸出:

點擊查看原圖

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM