實戰SpringCloud響應式微服務系列教程(第六章)


本章節介紹:Flux和Mono操作符

和其他主流的響應式編程一樣,Reactor框架的設計目標也是為了簡化相應式流的使用方法。為此Reactor框架提供了大量操作符用於操作Flux和Mono對象。

本節不打算全面詳細介紹,我們的思路是將這些操作符分類,然后對每一類中具有代表性的操作符展開討論。

對於其他沒有介紹到的操作符可參考Reactor框架的官方文檔。

在本節中我們把Flux和Mono操作符分為以下7大類。

  • 轉換 (Transforming)操作符負責對序列中的元素進行轉變。
  • 過濾 (Filtering)操作符負責將不需要的數據從序列中進行過濾。
  • 組合 (Combining) 操作符負責將序列中的元素進行合並和連接。
  • 條件 (Conditional) 操作符負責根據特定條件對序列中的元素進行處理。
  • 數學 (Mathematical) 操作符負責對序列中的元素執行各種數學操作。
  • Obserable工具(Utility) 操作符提供的是一些針對流失處理的輔助性工具。
  • 日志和調試(Log&Debug) 操作符提供了針對運行時日志以及如何對序列進行代碼調試的工具類。

1. 轉換操作符

Reactor框架中常用的轉換操作符包括buffer、map、flatMap和window。

(1)buffer

buffer操作符把當前流中的元素收集到集合中,並把集合對象作為流中的新元素。使用buffer操作符在進行元素收集時可以指定集合對象所包含的元素的最大數量。

以下代碼先使用range()方法創建了1~50這50個元素,然后演示了使用buffer從包含這50個元素的流中構建集合,每個集合包含10個元素,一共構建5個集合。

Flux.range(1,50).buffer(10).subscribe(System.out::println);

 

上面代碼執行結果如下:

[1,2,3,4,5,6,7,8,9,10]
[11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30]
[31,32,33,34,35,36,37,38,39,40]
[41,42,43,44,45,46,47,48,49,50]

 

buffer操作符的另一種用法是指定收集的時間間隔,由此演變出了bufferTimeout()方法。bufferTimeout()方法可以指定時間間隔為一個Duration對象或者毫秒數,即使用bufferTimeoutMillis()或者bufferMillis()這兩個方法。

除了指定元素數量和時間間隔,還可以通過bufferUnitl和bufferWhile操作符進行數據收集。bufferUnitl會一直收集,知道斷言(Predicate)條件返回true。使得斷言條件返回true的那個元素可以選擇添加到當前集合或者下一個集合當中。

而bufferWhile只有當斷言條件返回true時才會收集,一旦值為false,會立即開始下一次收集。

代碼如下:

Flux.range(1,10).bufferUnitl(i -> i%2 ==0).subscribe(System.out::println);
System.out.println("-----------------------------------");
Flux.range(1,10).bufferWhile(i -> i%2 ==0).subscribe(System.out::println);

 

以上代碼執行結果如下:

[1,2]
[3,4]
[5.6]
[7,8]
[9,10]
--------------------------
[2]
[4]
[6]
[8]
[10]

 

(2)map

map操作符相當於一種映射操作,他對流中每一個元素映射一個函數,從而達到一個變幻效果。

Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2;
                })
                .subscribe(e -> log.info("get:{}",e));

 

以上代碼運行結果:

10:53:57.058 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
10:53:57.062 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
10:53:57.063 [main] INFO reactor.Flux.Array.1 - | onNext(1)
10:53:58.067 [main] INFO com.example.demo.FluxTest - get:2
10:53:58.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
10:53:59.071 [main] INFO com.example.demo.FluxTest - get:4
10:53:59.071 [main] INFO reactor.Flux.Array.1 - | onNext(3)
10:54:00.076 [main] INFO com.example.demo.FluxTest - get:6
10:54:00.076 [main] INFO reactor.Flux.Array.1 - | onNext(4)
10:54:01.080 [main] INFO com.example.demo.FluxTest - get:8
10:54:01.081 [main] INFO reactor.Flux.Array.1 - | onComplete()

 

(3) flatMap

與map不同,操作符把流中的每一個元素轉換成一個流,再把轉換之后得到的所有流中的元素進行合並。

flatMap操作符非常實用,代碼示例如下:

Flux.just(1,5).flatMap(x -> Mono.just(x*x)).subscribe(System.out::println);

 

以上代碼中我們對1和5這兩個元素進行了flatMap操作,操作的結果是返回他們的平方值進行合並,執行結果如下:

1
25

 

在系統開發過程中我們經常會碰到對數據庫中的數據進行逐一處理的場景,這時候可以充分利用flatMap操作符的特性開展相關操作。以下代碼展示了如何使用flatMap對數據庫中獲取的數據進行逐一刪除的方法。

Mono<void> deleteFiles = 
fileRepository.findByname(flieName).flatMap(fileRepository::delete);

 

(4)window

window操作符類似於buffer,所不同的是,window操作符是把當前流中的元素收集到另一個Flux序列中。因此它的返回值類型是Flux<flux>,而不是簡單的Flux。</flux

示例代碼:

Flux.range(1,5).window(2).toIterable().forEach(w -> {
   w.subscribe(System.out::println);
   System.out.println("---------------------------")
})

 

以上代碼執行結果如下。這里生成了5個元素,然后通過window操作符把這個5個元素轉變成3個Flux對象,並通過forEach()工具把這些對象打印出來。

1
2
-----------------
3
4
-----------------
5

 

2.過濾操作符

Reactor中 常用過操作符包括filter、first、last、skip/skipLast、take/takeLast等。

(1)filter

filter操作符的含義與普通的過濾器類似,就是對流中包含的元素進行過濾,只是留下滿足指定條件的元素。

例如,我們想對1~10這10個元素進行過濾,只獲取能被2取余的元素,可以使用如下代碼。

Flux.range(1,10).filter(i -> i % 2 ==0).subscribe(System.out::println);

 

(2)first

first操作符的執行效果即為返回流中的第一個元素。

(3)last

last操作符與first類似,返回流中的最后一個元素。

(4)skip/skipLast

如果使用skip操作符,將會忽略數據流中的前n個元素。類似的如果使用skipLast將會忽略數據流中的后n個元素。

(5)take/takeLast

take系列操作符用來從當前流中提取元素。我們可以按照指定的數量來提取元素,對應的方法是take(long n);同時,也可以按照指定的時間間隔來提取元素,分別使用take(Duration time) 和takeMillis(long time)。類似的takeLast操作符用來從當前流中尾部提取元素。

take和takeLast操作符示例代碼如下:

Flux.range(1,100).take(10).subscribe(System.out::println);
Flux.range(1,100).takeLast(10).subscribe(System.out::println);

 

3. 組合操作符

Reactor中常用的組合操作符有then/when、merge、starWith和zip

(1)then/when

then操作符的含義是等到上一個操作符完成時在做下一個。

when操作符的含義是等到多個操作儀器完成。

如下代碼很好的展示了when操作符的實際應用場景。

public Mono<Void> updateFiles(Flux<FilePart> files){
        return files.flatMap(file ->{
            Mono<Void> copyFileToFileServer =...;
            Mono<Void> saFilePathToDataBase = ...;
            return Mono.when(copyFileToFileServer);
        });
    }

 

(2)starWith

starWith操作符的含義是在數據元素序列的開頭插入指定的元素項。

(3)merge

merge操作符用來把多個流合並成一個Flux序列,該操作符按照所有流中的元素實際生產順序合並。

merge操作符示例代碼如下:

Flux.merge(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

 

請注意這里兩個Flux.intervalMillis()方法都是在限制10ms內生產一個新元素。

運行結果如下:

0
0
1
1
2
2

 

不同於merge,mergeSequeetial操作符則是按照所有流被訂閱的順序以流為單位進行合並。

請看如下代碼:

Flux.mergeSequeetial(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

 

我們僅僅只是將merge換成了mergeSequeetial。

運行結果如下:

0
1
2
0
1
2

 

(4)zipWith

zipWith把當前流中的元素與另外一個流中的元素按照一對一的方式進行合並。使用zipWith
操作符在合並時可以不做任何處理,如此得到一個元素類型為Tuple2的流,示例代碼如下:

Flux.just("a","b").zipWith(Flux.just("c","b")).subscribe(System.out::println);

 

運行結果如下:

[a,c]
[b,d]

 

另外,我們還可以通過一個BiFunction函數對合並的元素進行處理,所得到的流的元素類型為該函數的返回值。

代碼如下:

Flux.just("a","b").zipWith(Flux.just("c","b"),(s1,s2) -> String.format("%+%",s1,s2))
   .subscribe(System.out::println);

 

運行結果如下:

a+c
b+d

 

本章節完!歷史章節

實戰SpringCloud響應式微服務系列教程(第一章)

實戰SpringCloud響應式微服務系列教程(第二章)

實戰SpringCloud響應式微服務系列教程(第三章)

實戰SpringCloud響應式微服務系列教程(第四章)

實戰SpringCloud響應式微服務系列教程(第五章)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM