如何將兩個數據流接到一起,然后進行操作
Flux是Project Reactor中的概念。
一個需求
我有兩個數據流的源頭,想要把他們合並到一起 然后組合成一個新流去返回。
思路一
我將兩個flux流轉化為mono,在其中一個流中進行一個flatMap操作,然后將兩個流連接到一起。
看代碼
@Test
void name() {
final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
final List<Integer> totalList = oddSource.collectList()
.flatMap(oddList -> evenSource.collectList().map(evenList -> {
evenList.addAll(oddList);
return evenList;
})).block();
assertThat(totalList)
.containsExactly(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
.size()
.isEqualTo(10);
}
思路二
感覺這個是騷操作。並且我流中的數據順序並不是我期望的那樣:第一個流中的數據,應該在前面
。
於是找到了另外一個操作符concatWith,並且保序是我需要的
看代碼
@Test
void name1() {
final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
final List<Integer> totalList = oddSource.concatWith(evenSource)
.collectList().block();
assertThat(totalList)
.containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
.size()
.isEqualTo(10);
}
后邊發現需求又變了:
我有四個流,然后要將這個四個流合並到一起,然后去返回。
三種方法
- 將flux換成mono,然后多進行幾次
flatMap
操作 - 多次調用flux的實例方法
concatWith
- 調用flux的靜態方法
concat
方法一和方法二大致和前邊兩種寫法一致,
方法三
@Test
void name2() {
final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
final List<Integer> totalList = Flux.concat(oddSource,evenSource)
.collectList().block();
assertThat(totalList)
.containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
.size()
.isEqualTo(10);
}
Flux.concat
方法接收的是比變長參數就不展示了,可以看下該方法的源碼
該方法接受的是publisher類型的參數,所以Mono的source也可以接上來
代碼
@Test
void test3() {
final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
final Mono<Integer> monoSource = Mono.just(11);
final List<Integer> totalList = Flux.concat(oddSource,evenSource, monoSource)
.collectList().block();
assertThat(totalList)
.containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10,11)
.size()
.isEqualTo(11);
}
其實這種reactive stream 和java的stream有很多相似之處,java如何將兩個stream連接到一起java 8提供了Stream.concat()
一樣的思想,java 8 的stream用的多了,一些流里操作符也就很容易對應到Reactive Stream 上了。