本章節繼續介紹:Flux和Mono操作符(二)
1.條件操作符
Reactor中常用的條件操作符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。
1、defaultIfRmpty
defaultIfRmpty操作符返回來自原始數據流的元素,如果原始數據流中沒有元素,則返回一個默認元素。
defaultIfRmpty操作符在實際開發過程中應用廣泛,通常用在對方法返回值的處理上。如下controller層對service層返回值的處理。
@GetMapper("/article/{id}") public Mono<ResponseEntity<Article>> findById(@PathVariable String id){ return articleService.findOne(id) .map(ResponseEntity::ok) .defaultIfRmpty(ResponseEntity.status(404).body(null)); }
2、takeUntil
takeUntil操作符的基本用法是takeUntil(Predicate<? super T>> predicate)
,其中Predicate代表一種斷言條件,takeUntil將提取元素直到斷言條件返回true。
示例代碼如下:
Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);
3、takeWhile
takeWhile操作符的基本用法是takeWhile(Predicate<? super T>> continuePredicate)
,其中continuePredicate也代表一種斷言條件。與takeUntil不同的是,takeWhile會在continuePredicate條件返回true時才進行元素的提取。
示例代碼如下:
Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);
4、skipUntil
與takeUntil相對應,skipUntil的基本用法是skipUntil(Predicate<? super T>> predicate)
。skipUntil將丟棄原始數據中的元素,直到Predicate返回true。
5、skipWhile
與takeWhile相對應,skipWhile操作符的基本用法是skipWhile(Predicate<? super T>> continuePredicate)
。當continuePredicate返回true時才進行元素的丟棄。
2.數學操作符
Reactor中常用的數學操作符有concat、count、reduce等。
1、concat
concat用來合並來自不同Flux的數據,這種合並采用的是順序的方式。
2、count
count操作符比較簡單,用來統計Flux中元素的個數。
3、reduce
reduce操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的Mono序列。具體的累計操作也是通過一個BiFunction來實現的。
示例代碼如下:
Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);
這里BiFunction就是一個求和函數,用來對1到10的數字進行求和,運行結果為55。
與其類似的還有一個reduceWith。
示例代碼如下:
Flux.range(1,10).reduceWith(() - >5,(x,y) -> x+y).subscribe(System.out::println);
這里使用5來初始化求和過程,得到的結果是60。
3.Observable工具操作符
Reactor中常用的Observable操作符有delay、subscribe、timeout等。
1、delay
delay將時間的傳遞向后延遲一段時間。
2、subscribe
在前面的代碼演示了subscribe操作符的用法,我們可以通過subscribe()方法來添加相應的訂閱邏輯。
在前面章節中我們提到了Reactor中的消息類型有三種,即正常消息,異常消息和完成消息。subscribe操作符可以只處理其中包含的正常消息,也可以同時處理異常消息和完成消息。當我們用subscribe處理異常消息時可以采用以下方式。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .subscribe(System.out::println,System.err::println);
以上代碼執行結果如下,我們得到了一個100,同時也獲取了IllegalStateExxeption這個異常。
100
java.lang.IllegalStateExxeption
有時候我們不想直接拋出異常,而是想采用一個容錯策略來返回一個默認值,就可以采用以下方式。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .onErrorReturn(0) .subscribe(System.out::println);
以上代碼執行結果如下。當產生異常時,使用onErrorReturn()方法返回一個默認值0.
100
0
另外容錯策略也是通過switchOnError()方法使用另外的流產生元素。以下代碼示例演示了這種策略。
與上面的執行結果相同。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .switchOnError(Mono.just(0)) .subscribe(System.out::println);
3、timeout
timeout操作符維持原始被觀察者的狀態,在特定時間內沒有產生任何事件時,將生成一個異常。
4、block
block操作符在沒有接收到下一個元素之前一直被阻塞。block操作符通常用來把響應式的數據流轉換成傳統的數據流。
例如,使用如下方法時,我們分別將Flux數據流和Mono數據流轉變成了普通的List<Order>
對象和單個Order對象,同樣也可以設置block的等待時間。
public List<Order> getAllOrder(){ return orderService.getAllOrders().block(Duration.ofSecond(5)); } public Order getOrderById(Long orderId){ return orderService.getOrderById(orderId).block(Duration.ofSecond(2)); }
往期
實戰SpringCloud響應式微服務系列教程(第五章)