實戰SpringCloud響應式微服務系列教程(第七章)


本章節繼續介紹:Flux和Mono操作符(二)

1.條件操作符

Reactor中常用的條件操作符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。

1、defaultIfRmpty

defaultIfRmpty操作符返回來自原始數據流的元素,如果原始數據流中沒有元素,則返回一個默認元素。

defaultIfRmpty操作符在實際開發過程中應用廣泛,通常用在對方法返回值的處理上。如下controller層對service層返回值的處理。

@GetMapper("/article/{id}")
public Mono<ResponseEntity<Article>> findById(@PathVariable String id){
     return articleService.findOne(id)
               .map(ResponseEntity::ok)
               .defaultIfRmpty(ResponseEntity.status(404).body(null));
}

 

2、takeUntil

takeUntil操作符的基本用法是takeUntil(Predicate<? super T>> predicate),其中Predicate代表一種斷言條件,takeUntil將提取元素直到斷言條件返回true。

示例代碼如下:

Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);

 

3、takeWhile

takeWhile操作符的基本用法是takeWhile(Predicate<? super T>> continuePredicate),其中continuePredicate也代表一種斷言條件。與takeUntil不同的是,takeWhile會在continuePredicate條件返回true時才進行元素的提取。

示例代碼如下:

Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);

 

4、skipUntil

與takeUntil相對應,skipUntil的基本用法是skipUntil(Predicate<? super T>> predicate)。skipUntil將丟棄原始數據中的元素,直到Predicate返回true。

5、skipWhile

與takeWhile相對應,skipWhile操作符的基本用法是skipWhile(Predicate<? super T>> continuePredicate)。當continuePredicate返回true時才進行元素的丟棄。

2.數學操作符

Reactor中常用的數學操作符有concat、count、reduce等。

1、concat

concat用來合並來自不同Flux的數據,這種合並采用的是順序的方式。

2、count

count操作符比較簡單,用來統計Flux中元素的個數。

3、reduce

reduce操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的Mono序列。具體的累計操作也是通過一個BiFunction來實現的。

示例代碼如下:

Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);

 

這里BiFunction就是一個求和函數,用來對1到10的數字進行求和,運行結果為55。

與其類似的還有一個reduceWith。

示例代碼如下:

Flux.range(1,10).reduceWith(() - >5,(x,y) -> x+y).subscribe(System.out::println);

 

這里使用5來初始化求和過程,得到的結果是60。

3.Observable工具操作符

Reactor中常用的Observable操作符有delay、subscribe、timeout等。

1、delay

delay將時間的傳遞向后延遲一段時間。

2、subscribe

在前面的代碼演示了subscribe操作符的用法,我們可以通過subscribe()方法來添加相應的訂閱邏輯。

在前面章節中我們提到了Reactor中的消息類型有三種,即正常消息,異常消息和完成消息。subscribe操作符可以只處理其中包含的正常消息,也可以同時處理異常消息和完成消息。當我們用subscribe處理異常消息時可以采用以下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .subscribe(System.out::println,System.err::println);

 

以上代碼執行結果如下,我們得到了一個100,同時也獲取了IllegalStateExxeption這個異常。

100
java.lang.IllegalStateExxeption

有時候我們不想直接拋出異常,而是想采用一個容錯策略來返回一個默認值,就可以采用以下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .onErrorReturn(0)
         .subscribe(System.out::println);

 

以上代碼執行結果如下。當產生異常時,使用onErrorReturn()方法返回一個默認值0.

100
0

另外容錯策略也是通過switchOnError()方法使用另外的流產生元素。以下代碼示例演示了這種策略。

與上面的執行結果相同。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .switchOnError(Mono.just(0))
         .subscribe(System.out::println);

 

3、timeout

timeout操作符維持原始被觀察者的狀態,在特定時間內沒有產生任何事件時,將生成一個異常。

4、block

block操作符在沒有接收到下一個元素之前一直被阻塞。block操作符通常用來把響應式的數據流轉換成傳統的數據流。

例如,使用如下方法時,我們分別將Flux數據流和Mono數據流轉變成了普通的List<Order>對象和單個Order對象,同樣也可以設置block的等待時間。

public List<Order> getAllOrder(){
      return orderService.getAllOrders().block(Duration.ofSecond(5));
}

public Order getOrderById(Long orderId){
      return orderService.getOrderById(orderId).block(Duration.ofSecond(2));
}

往期

實戰SpringCloud響應式微服務系列教程(第一章)

實戰SpringCloud響應式微服務系列教程(第二章)

實戰SpringCloud響應式微服務系列教程(第三章)

實戰SpringCloud響應式微服務系列教程(第四章)

實戰SpringCloud響應式微服務系列教程(第五章)

實戰SpringCloud響應式微服務系列教程(第六章)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM