接上一篇:
1.1.4 引入Reactor框架
如果想在代碼中集成 Reactor框架,則需要添加如下的 Maven依賴,分別引入 Reactor的核心功能以及用於支持測試的相關工具類。
<dependency> <groupid>io.projectreactor</groupid> <cartifactid>reactor-cores/artifactid> </dependency> <dependency> <groupid>io.projectreactor</groupid> <cartifactid>reactor-tests/artifactid> <scope>test</scope> </dependency>
Reactor框架在實現響應式流規范的基礎上有其特定的設計思想。本節先介紹 Reactor框架的異步數據序列,然后介紹Flux和Mono這兩個核心組件。
1. Reactor異步數據序列
當使用 Reactor開發響應式應用程序時,無論采用何種操作符,都將得到一個如圖所示的異步數據序列。
onNext x 0..N [onError |onComplete]
以上公式包含三種不同類型方法的調用,分別處理不同場景下的消息通知。
-
onNext():正常的包含元素的消息通知。
-
onCompleted():序列結束的消息通知,可以沒有。
-
onError():序列出錯的消息通知,可以沒有。
按照響應式流規范,當這些消息通知產生時,異步序列的訂閱者中對應的這三個方法將被調用。如果序列沒有出錯,則 onerror()方法不會被調用;如果不調用 onComplete()方法,就會得到一個無限異步序列。通常,無限異步序列應該只用於測試等特殊場景。
2.Flux組件
Flux代表0至N個元素的異步序列,如下圖,序列的三種消息通知都是用於Flux.
以下代碼示例展示了在具體項目中使用Fux組件的方法。如果我們了解微服務架構中基於 Hystrix I的服務回退( Fallback)機制,就應該知道代碼中的 getordersfallbacko是一個典型的回退函數,我們通過FIux.fromlterableo方法構建了 Flux<Order>
對象,作為回退函數的返回值。
關於服務回退機制,將在之后章節中具體介紹
private Flux<order> getordersfallback(){ List<order> fallbacklist = new Arraylist<>(); Order order= new Order(); order.setid("orderinvalidid"); order.setaccountid("Invalidid"); order.setitem("Order list is not available"); order.setcreatetime (new Date(); fallbacklist.add (order); return Flux.fromiterable (fallbacklist); }
下面的示例更加容易理解一點,從位於方法名上的@ Getmapping注解可以看出,這是個 Controller中的端點,用於返回一個 Order對象列表。這里返回的 Order列表同樣通過Flux< Order>
對象進行呈現。
@getmapping("/vl/orders") public Flux<Order> getorderlist(){ Flux<order> orders= orderservice. getorders(); return orders; }
3.Mono組件
在 Reactor中,Mono表示包含0個或1個元素的異步序列,如圖所示,該序列中同樣可以包含與Fux相同的三種類型的消息通知。
請注意,Mono也可以用來表示一個空的異步序列,該序列沒有任何元素,僅僅包含序列結束的概念(類似於Java中的 Runnable)。我們可以用Mono<void>
代表一個空的異步序列。
與FIux組件一樣,通過服務回退來演示Mono組件的用法,示例代碼如下。
private Mono<order> getorderfallback(){ Order order = new Order(); order.setid("orderinvalidid"); order.setaccountid("Invalidid"); order.setitem("Order list is not available"); order.setcreatetime(new Date()); return Mono.just(order); }
這里首構建一個 Order對象,然后通過 Mono.just()方法返回一個Mono對象。
Controller層組件也是一樣的,通過d獲取Mono<Order>
對象的端點示例如下。
@tapping("/vl/orders/{id}") public Mono<order> getorder(@Pathvariable String id){ Mono<order> order orderservice.getorderbyid(id); return order; }
相較Mono,Flux是更通用的一種響應式組件,所以針對FIux的操作要比Mono更豐富。另一方面,FIux和Mono之間可以相互轉換。例如,把兩個Mono序列合並起來就得到一個Flux序列,而對一個FIux序列進行計數操作,得到的就是Mono對象。