學習響應式編程 Reactor (2) - 初識 reactor


Reactor

Reactor 是用於 Java 的異步非阻塞響應式編程框架,同時具備背壓控制的能力。它與 Java 8 函數式 Api 直接集成,比如 分為CompletableFuture、Stream、以及 Duration
。它提供了異步 Api 響應流 Flux (用於 [0 - N] 個元素)和 Mono (用於 [0或1] 個元素),並完全遵守和實現了響應式規范。

引入 reactor

reactor 自 3.0.4 版本之后,采用了 BOM (Bill Of Materials)的方式,使用 BOM 可以管理一組良好集成的 maven artifacts,而無需擔心不同版本組件之間的相互依賴問題,在 maven 項目中在 dependencyManagement 中 加入 reactor 的 bom 定義即可。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

在需要使用 reactor 的項目中,依賴對應 reactor 模塊即可。

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
</dependencies>

在我學習的過程中,reactor 的最新版本是 Dysprosium-SR8 ,它的命名來自元素周期表,按順序遞增。通過 https://github.com/reactor/reactor/releases 獲取最新版本。

reactor bom 中 包含了如下組件:

序號 模塊 artifactId 說明
1 Reactor Core reactor-core 基於 Java 8 的響應式流規范實現
2 Reactor Test reactor-test reactor 的測試工具集
3 Reactor Extra reactor-extra 為 Flux 額外提供的操作符
4 Reactor Netty reactor-netty 基於 Netty 實現的 TCP、UDP、HTTP 的客戶端和服務端
5 Reactor Adapter reactor-adapter 和其他響應式庫(如RxJava2、SWT Scheduler、 Akka Scheduler)的適配器
6 Reactor Kafka reactor-kafka Apache Kafka 的響應式橋接實現
7 Reactor Kotlin Extensions reactor-kotlin-extensions 在 Dysprosium 版本后額外提供的 Kotlin 擴展
8 Reactor RabbitMQ reactor-rabbitmq RabbitMQ 的響應式橋接實現
9 Reactor Pool reactor-pool 響應式應用程序的通用對象池
10 Reactor Tools reactor-tools 一組用於改善 Project Reactor 調試和開發經驗的工具。

序號 [1 - 3] 為我們學習 Reactor 過程中主要涉及的模塊,序號 [4 - 9] 在我們學習 Spring WebFlux 的過程中會有所涉及,序號 [10] 是用於 Reactor 調試的,下面會講到。

使用 gradle 的同學請自行百度。
如果需要嘗鮮 Reactor 里程碑版或開發者預覽版的同學,添加 Spring Milestones repository 的倉庫即可。

Reactor 之 初體驗

上面說了那么多,我們先來體驗下 Reactor。

在學習 Java Stream 的環節中,不知是否有同學有提出這樣的疑問:在進行中間操作或終端消費操縱時,如何獲取流中元素的序號值呢?
假如有如下單詞 [ the, quick, brown, fox, jumped, over, the, lazy, dog ] ,使用 Stream 可否實現輸出時並打印每個單詞的序號呢?
仔細想想,似乎沒有直接的辦法可以獲取,我們只能通過外部創建變量獲取並遞增來實現。

來看下 Stream 的實現:

AtomicInteger index = new AtomicInteger(1);
Arrays.stream(WORDS)
        .map(word -> StrUtil.format("{}. {}", index.getAndIncrement(), word))
        .forEach(System.out::println);

來看下 Reactor 的實現:

Flux.fromArray(WORDS)                                                   // ①
        .zipWith(Flux.range(1, Integer.MAX_VALUE),                      // ②
                (word, index) -> StrUtil.format("{}. {}", index, word)) // ③
        .subscribe(System.out::println);                                // ④

先不看 Reactor 代碼的含義,感覺怎么樣,Reactor 的代碼看起來是不是更清新一點,沒有定義任何三方變量解決了這個問題。

有了 Stream 的基礎,Reactor 的代碼很容易理解了,我們稍微來解釋下 Reactor 上段的代碼:

  1. 序號① 的代碼 Flux 是我們之前提到的 一個能夠發出 0 到 N 個元素的響應流發布者,fromArray 是它的靜態方法,用來創建 Flux 響應流
  2. 序號② 的代碼 Flux 的 range 操作符和 Stream 的 range 相同,用來生成 整數 Flux 響應流;zipWith 操作符用來合並兩個 Flux,並將響應流中的元素一一對應,當其中一個響應流完成時,合並結束,之前未結束的響應流剩下的元素將被忽略
  3. 序號③ 的代碼 zipWith 操作符 支持傳遞一個 BiFunction 的函數式接口實現,定義如何來合並兩個數據流中的元素,本例中我們將索引和單詞連接起來
  4. 序號④ 的代碼 subscribe 即為訂閱方法,此處我們做了數據流中元素輸出至控制台

Reactor 之 測試 & 調試

測試

Reactor 的測試需要依賴測試模塊:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

編寫測試代碼如下:

// 創建 Flux 響應流
Flux<String> source = Flux.just("foo", "bar");
// 使用 concatWith 操作符連接 2 個響應流
Flux<String> boom = source.concatWith(Mono.error(new IllegalArgumentException("boom")));
// 創建一個 StepVerifier 構造器來包裝和校驗一個 Flux。
StepVerifier.create(boom)
        .expectNext("foo")          // 第一個我們期望的信號是 onNext,它的值為 foo
        .expectNext("bar")          // 第二個我們期望的信號是 onNext,它的值為 bar
        .expectErrorMessage("boom") // 最后我們期望的是一個終止信號 onError,異常內容應該為 boom
        .verify();                  // 使用 verify() 觸發測試。

除了正常測試外,Reactor 還提供了諸如:

  1. 測試基於時間操作符相關的方法,使用 StepVerifier.withVirtualTime 來進行
  2. 使用 StepVerifier 的 expectAccessibleContext 和 expectNoAccessibleContext 來測試 Context
  3. 用 TestPublisher 手動發出元素
  4. 用 PublisherProbe 檢查執行路徑

測試方面暫時不是我們學習的重點,這塊內容,我們快速跳過,等到學習到相關場景,需要的時候,我們回過頭來再彌補。

調試

響應式編程的調試令人生畏,因為它不像命令式編程,我們可以從異常的堆棧信息中看到發生錯誤代碼的位置及具體錯誤信息,這也是響應式編程學習曲線比較陡峭的原因。

有如下代碼:

Flux.range(1, 3)
    .flatMap(n -> Mono.just(n + 100))
    .single()
    .map(n -> n * 2)
    .subscribe(System.out::println);

執行測試時,打印錯誤日志如下:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
	at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:79)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	...
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)

我們從上述的錯誤中獲取到發生了 IndexOutOfBoundsException 數據越界異常,從上往下看,應該是 MonoSingle 響應式流發出了不止一個元素,查看 Mono#singe 操作符描述,我們看到 single 有一個規定: 源必須只能發出一個元素。看來是有一個源發出了多於一個元素,從而違反了這一規定。

粗略過一下這些行,我們可以大概勾畫出一個大致的出問題的鏈:涉及 MonoSingle、FluxFlatMap、FluxRange(每一個都對應 trace 中的幾行,但總體涉及這三個類)。 所以難道是 range().flatMap().single() 這樣的鏈?

但是如果在我們的應用中多處都用到這一模式,那怎么辦?通過這些還是不能確定為什么會拋除這個異常, 搜索 single 也找不到問題所在。直到最后幾行指向了我們的代碼,查看代碼和我們之前的預測的調用鏈一樣。

但是最終我們怎么快速確定代碼的問題在哪里呢?

方案1: 開啟調試模式

使用 Hooks.onOperatorDebug(); 在程序初始的地方開啟調試模式

錯誤日志如下:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
	reactor.core.publisher.Flux.single(Flux.java:7851)
	top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
Error has been observed at the following site(s):
	|_ Flux.single ⇢ at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
	|_    Mono.map ⇢ at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:84)
Stack trace:
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
		at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
		at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
		at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
		at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
		at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:85)

我們從 Error has been observed at the following site(s) 這行錯誤起,可以看到錯誤沿着操作鏈傳播的軌跡(從錯誤點到訂閱點),我們從 Assembly trace from
producer 這行開始的錯誤中也看到了源代碼 83 行開始報錯,也確定了上一行的 flatMap 操作符發出了不止一個元素導致。

方案2: 使用 checkpoint 操作符埋點調試

使用方案1開啟全局調試有較高的成本即影響性能,我們可以在可能發生錯誤的代碼中加入操作符 checkpoint 來檢測本段響應式流的問題,而不影響其他數據流的執行。
checkpoint 通常用在明確的操作符的錯誤檢查,類似於埋點檢查的概念。同時該操作符支持 3個重載方法:checkpoint(); checkpoint(String description); checkpoint
(String description, boolean forceStackTrace);
description 為埋點描述,forceStackTrace 為是否打印堆棧

方案3: 啟用調試代理

1. 在項目中引入 reactor-tools 依賴

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
</dependency>

2. 使用 ReactorDebugAgent.init(); 初始化代理

由於該代理是在加載類時對其進行檢測,因此放置它的最佳位置是在main(String [])方法中的所有其他項之前

3. 如果是測試類,使用如下代碼處理現有的類

注意,在測試類中需要提前運行,比如在 @Before 中

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();

總結

本篇我們了解了如何引入 Reactor ;初步體驗了 Reactor 的 Hello World 代碼;最后我們了解了如何測試及調試 Reactor,這些內容為我們后面學習 Reactor 的基礎,希望大家都能掌握。

今天的內容就學到這里,我們下篇開始 Reactor 的基礎和特性學習。

源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note{:target="_blank"} 下 02-reactor-core-learning 模塊。

參考

  1. Reactor 3 Reference Guide
  2. Reactor 3 中文指南


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM