Reactive Stream: 如何將兩個數據流接到一起,然后進行操作


如何將兩個數據流接到一起,然后進行操作

Flux是Project Reactor中的概念。

一個需求

我有兩個數據流的源頭,想要把他們合並到一起 然后組合成一個新流去返回。

思路一

我將兩個flux流轉化為mono,在其中一個流中進行一個flatMap操作,然后將兩個流連接到一起。

看代碼

@Test
void name() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = oddSource.collectList()
        .flatMap(oddList -> evenSource.collectList().map(evenList -> {
            evenList.addAll(oddList);
            return evenList;
        })).block();
    assertThat(totalList)
        .containsExactly(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
        .size()
        .isEqualTo(10);

}

思路二

感覺這個是騷操作。並且我流中的數據順序並不是我期望的那樣:第一個流中的數據,應該在前面
於是找到了另外一個操作符concatWith,並且保序是我需要的

看代碼

@Test
void name1() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = oddSource.concatWith(evenSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
        .size()
        .isEqualTo(10);

}

后邊發現需求又變了:

我有四個流,然后要將這個四個流合並到一起,然后去返回。

三種方法

  • 將flux換成mono,然后多進行幾次flatMap操作
  • 多次調用flux的實例方法concatWith
  • 調用flux的靜態方法concat

方法一和方法二大致和前邊兩種寫法一致,

方法三
@Test
void name2() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = Flux.concat(oddSource,evenSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
        .size()
        .isEqualTo(10); 

}

Flux.concat方法接收的是比變長參數就不展示了,可以看下該方法的源碼

該方法接受的是publisher類型的參數,所以Mono的source也可以接上來

代碼

@Test
void test3() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final Mono<Integer> monoSource = Mono.just(11);
    final List<Integer> totalList = Flux.concat(oddSource,evenSource, monoSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10,11)
        .size()
        .isEqualTo(11);

}

其實這種reactive stream 和java的stream有很多相似之處,java如何將兩個stream連接到一起java 8提供了Stream.concat()一樣的思想,java 8 的stream用的多了,一些流里操作符也就很容易對應到Reactive Stream 上了。

Source reop: https://github.com/1483523635/blogs/blob/master/java/reactive-streaming/flux.md


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM