Reactor 簡介


官方的介紹如下:

Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements) — and extensively implements the Reactive Streams specification.

大致翻譯如下:

Reactor 是一個完全非阻塞的 JVM 反應式編程基礎,具有高效的需求管理(以管理“背壓”的形式)。它直接整合了 Java 8 中的函數式編程的 API,尤其是 CompletableFutureStreamDuration。它提供了可組合的異步序列 API— Flux(用於 N 個元素)和 Mono(用於0個或一個元素),並實現了 Reactive Stream 規范

使用 Reactor 的原因

Reactor Programming (響應式編程)是一種新的編程范式,可以通過使用類似於函數式編程的方式來構建異步處理管道。它是一個基於事件的模型,其中數據在可用時推送給消費者。

Reactor Programming 具有能夠有效地利用資源並且增加應用程序為大量客戶端提供服務的能力,相比較傳統的創建一個單獨的線程來處理請求,響應式編程能夠降低編寫並發代碼的難度。

通過圍繞完全異步和非阻塞的核心支柱構建,響應式編程是 JDK 中那些功能有限的異步處理 API (CallBackFuture 等)的更好的替代方案

關鍵概念

  • Publisher

    數據的發布者,即數據產生的地方

  • Subscriber

    數據的訂閱者,即數據接受方

    Publisher 的關系如下:

    image.png
  • Processor

    代表一個處理階段,既是 Publisher,也是 Subscriber

  • Subscription

    代表 PublisherSubscriber 的一對一生命周期,只能由一個 Subscriber 使用一次

對應的關系圖如下:

image.png

首先定義實體類:

public class User {

    public static final User SKYLER = new User("swhite", "Skyler", "White");
    public static final User JESSE = new User("jpinkman", "Jesse", "Pinkman");
    public static final User WALTER = new User("wwhite", "Walter", "White");
    public static final User SAUL = new User("sgoodman", "Saul", "Goodman");

    private final String username;
    private final String firstname;
    private final String lastname;

    public User(String username, String firstname, String lastname) {
        this.username = username;
        this.firstname = firstname;
        this.lastname = lastname;
    }
    
    // 省略一部分 getter 和 Object 相關的代碼
}

定義一個轉換函數

// 這個函數的任務是將 User 的每個字段都轉換成為對應的大寫形式
Function<User, User> capital = user ->
    new User(
    	user.getUsername().toUpperCase(),
    	user.getFirstname().toUpperCase(),
    	user.getLastname().toUpperCase()
	);

Flux

Flux 是 Reactive Stream 中的 Publisher,增加了許多可用於生成、轉換、編排 Flux 序列的運算符。Flux 用於發送 0 個或者多個元素(onNext() 事件觸發),然后成功結束或者直到出現 error(onCompelete()onError() 都是終止事件),如果沒有終止事件的產生,那么 Flux 將會產生無窮無盡的元素

基本使用

  • 靜態方法

    • 靜態工廠方法

      靜態方法主要是用於創建元素流,或者通過幾種回調類型來生成這些流的元素

      創建 Flux 主要有以下幾種方法:

      // 創建一個空的 Flux
      static <T> Flux<T> empty();
      
      // 解析傳入的參數值,將它們組合成一個 Flux
      static <T> Flux<T> just(T... data);
      
      // 從一個 Iterable 對象中創建對應的 Flux
      static <T> Flux<T> fromIterable(Iterable<? extends T> it);
      
      // 創建一個 Exception 元素,這是在 Reactor Stream 中處理異常的方式
      static <T> Flux<T> error(Throwable error);
      
      // 每個一定時間間隔產生一個 Long 類型的元素到 Flux 的流中
      static Flux<Long> interval(Duration period);
      
      // 通過一個 Consumer,每次在請求元素時調用這個函數
      static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);
      
      // 支持從
      static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);
      
      static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter);
      
    • generate

      通過響應請求,產生不同的元素。

      image.png

      具體示例如下:

      Scanner sc = new Scanner(System.in);
      
      // 從控制台獲取輸入,然后生成對應的流元素,將將這些元素轉換為大寫之后再輸出到控制台
      Flux.<String>generate(sink -> {
          sink.next(sc.next()); // 這里的 sink 是同步的
      })
          .doFinally(any -> sc.close())
          .map(String::toUpperCase)
          .subscribe(System.out::println);
      // 這種方式在響應客戶的相關請求時比較有用
      
    • create

      image.png

      該方法將 Publisher 的生產步驟暴露給外部,使得將產生元素的邏輯拆分到外部,而將元素的處理邏輯封裝起來,減少系統的耦合

    • merge

      方法簽名如下:

      // 將多個 Publisher 合並為一個 Flux,這樣合並的 Flux 的元素組成將是交錯(無順序的)
      static <I> Flux<I> merge(int prefetch, boolean delayError, Publisher<? extends I>... sources)
      
    • concat

      // 將多個 Publisher 合並為一個 Flux,這樣得到的 Flux 是有序的
      static <T> Flux<T> concat(Publisher<? extends T>... sources)
      
  • 實例方法

    • map

      // 函數原型,該方法的主要任務是 Flux 中的每個元素進行通過轉換函數進行轉換。。。
      public final <V> Flux<V> map(Function<? super T, ? extends V> mapper);
      

      具體使用:

      // 將 Flux 中的每個 User 對象執行對應的轉換,得到轉換之后的一個新的 Flux
      Flux<User> capitalizeMany(Flux<User> flux) {
          return flux.map(capital);
      }
      
    • flatMap

      有的轉換函數可能是阻塞的,由於 Reactor 要求每個操作應當都是非阻塞的,為了解決這個問題,可以將相關的 map 操作放入一個 Mono 中進行異步處理以達到非阻塞的效果

      FluxflatMap 的函數原型如下:

      /* 
      	將當前 Flux 中的元素以異步的方式發送到 Publisher 中,然后通過合並的方式將這些 Publisher
      	扁平化為單個的 Flux,這個操作允許 Flux 交錯(不按順序)地進行
      */
      public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
      

      具體的過程如下所示:

      具體示例如下:

      Flux<User> asyncCapitalizeMany(Flux<User> flux) {
          /* 
          	將 Flux 的每個元素以異步的方式放入到 Mono 中,最后再將這些 Mono 的元素整合到
          	一個 Flux 中
          */
          return flux.flatMap(this::asyncCapitalizeUser);
      }
      
      Mono<User> asyncCapitalizeUser(User u) {
          return Mono.just(
              	new User(
              		u.getUsername().toUpperCase(), 
              		u.getFirstname().toUpperCase(), 
              		u.getLastname().toUpperCase()
          		)
                );
      }
      

Mono

MonoFLux 類似,主要的區別在於 Mono 的元素個數只能是 0 個或一個,或者元素是一個異常

基本使用

  • 靜態方法

    • 靜態工廠方法

      靜態工廠方法主要用於創建 Mono

      // 創建一個空的 Mono
      static <T> Mono<T> empty();
      
      /* 
      	創建一個不會產生任何數據的 Mono,與空的 Mono 不同,
      	這樣得到的 Mono 不會產生任何 onComplete 事件
      */
      static <T> Mono<T> never();
      
      // 從單個的元素產生一個 Mono
      static <T> Mono<T> just(T data);
      
      // 產生一個帶有異常的 Mono
      static <T> Mono<T> error(Throwable error);
      
    • fromSupplier

      創建一個 Mono,產生的元素由對應的 Supplier 來提供,如果 Supplier 產生的是 null,那么就會得到一個空的 Mono

      方法原型如下:

      public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier)
      

      這個方法在結合 Flux 實現異步處理的時候非常有用,類似的方法還有:fromCallablefromRunnablefromFuture

    • defer

      創建一個 Mono ,通過這種方式創建的 Mono 會為每個 subscriber 提供一個新的一致的 Mono

      方法原型如下:

      public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)
      
  • 實例方法

    • map

      方法原型如下:

      // 將元素執行對應的轉換。。。
      public final <R> Mono<R> map(Function<? super T, ? extends R> mapper);
      
    • flatMap

      方法原型如下:

      // 將當前的元素放入到一個 Publisher 中,以實現非阻塞的目標
      final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>>
      

StepVerifier

Reactor Programming 和一般的編程不一樣,由於它是完全非阻塞的,因此代碼的調試可能會變得很困難,使用 StepVerifier 類能夠有效地檢測 Reactor Stream 是否時按照預期的定義進行的

首先,需要添加 reactor-test 相關的依賴

<!-- 在 Spring Boot 中由 SpringBoot Parent 管理,因此無須添加版本號 -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
</dependency>

基本使用

  • 檢測預期的元素

    // 創建一個 Flux 元素流
    Flux<Long> counter = Flux.interval(Duration.ofMillis(10)).take(5);
    // 檢測每一步得到的元素,如果不是預期的元素,則會得到一個斷言失敗的異常。。。
    StepVerifier.create(counter)
        .expectNext(0L)
        .expectNext(1L)
        .expectNext(2L)
        .expectNext(3L)
        .expectNext(4L)
        .verifyComplete();
    
  • 斷言出現的元素

    static class {
        private final String username;
        private final String firstname;
        private final String lastname;
        
        public User(String username, String firstname, String lastname) {
            this.username = username;
            this.firstname = firstname;
            this.lastname = lastname;
        }
        
        // 省略一部分代碼
    }
    
    void expectSkylerJesseComplete(Flux<User> flux) {
        StepVerifier.create(flux)
            // 下一個到來的元素應當是 username 為 "swhite" 的 User 對象
            .assertNext(user -> user.getUsername().equals("swhite"))
            // ...............
            .assertNext(user -> user.getUsername().equals("jpinkman"))
            .verifyComplete(); // 此時元素流應當已經結束
    }
    
  • 檢測出現的異常

    StepVerifier.create(flux) // 從對應的 Flux 創建元素流
        .expectError(RuntimeException.class) // 下一個到來的元素應當是一個 RuntimeException
        .verify();
    
  • 虛擬時間檢測

    使用 Fluxinternal() 方法將會按照一定的時間間隔產生一個元素,如果產生的元素很多,同時每個元素產生的間隔又很大,這個時候就會導致測試時間會變得很長,為了解決這個問題,StepVerifier 提供了 withVirtualTime 的方法來解決這個問題

    Supplier<Flux<Long>> supplier = () -> Flux.interval(Duration.ofSeconds(1)).take(3600L);
    
    StepVerifier.withVirtualTime(supplier)
        .thenAwait(Duration.ofHours(1)) // 使得當前的 Flux 元素流覺得 3600s 已經過去了
        .expectNextCount(3600) // 此時應當已經接受了 3600 個元素
        .verifyComplete(); // 測試結束
    

背壓(backpressure)

前文介紹了有關 PublisherSubscriber 之間的關系,在關系圖中存在一個稱為 “backpressure” 的輪子,這個組件的作用用於限制 PublisherSubscriber 發送數據的速度,使得 Subscriber 能夠正常地處理數據,不至於由於收到的數據過多無法處理而導致 Subscriber 宕機。

這是一種反饋機制,由 SubscriberPublisher 發送一個 “反饋信息”,表示自己准備處理多少數據,而 Publisher 通過這一 “反饋信息” 限制自己的發送數據的速度(具體可以將多余的數據丟棄或放入緩沖區等),從而達到一個動態的平衡。

Flux.just(1, 2, 3, 4, 5)
    .log() // 打印相關的記錄信息。。。。
    .subscribe(new Subscriber<Integer>() {
        private Subscription subscription;
        private int amt = 0;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(2); // 首先請求兩個元素
        }
        
        /* 
        	之后,每獲取到兩個元素就發送一個 “獲取下兩個元素” 的反饋信息給 Publisher,
        	以此達到背壓的效果
        */
        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext: " + integer);
            amt++;
            if (amt % 2 == 0) this.subscription.request(2);
        }

        @Override
        public void onError(Throwable throwable) {
            // nothing should to do....
        }

        @Override
        public void onComplete() {
            // nothing should to do....
        }
    });

處理策略

Publisher 發布元素的速度大於 Subscriber 能夠處理的速度時,將會導致 Publisher 的·數據積壓,需要采取不同的處理策略來解決這個問題。

具體的處理策略定義於 reactor.core.publisher.FluxSink.OverflowStrategy 中,主要有以下幾種策略:

  • BUFFER(默認)

    將多余的元素放入 buffer 中,這種方式在速率相差很大的情況下將會產生異常

  • LATEST

    訂閱者僅僅獲得最新的元素,和 DROP 策略有點類似

  • DROP

    Publisher 丟棄產生的多余的元素

  • ERROR

    直接傳一個 IllegalStateExceptionSubscriber

  • IGNORE

    完全忽略來自 Subscriber 的背壓請求(當訂閱者隊列已滿時,這可能會導致 IllegalStateException

冷流和熱流

一般由 Flux.jus(...) 方法創建的元素流是靜態的、有長度限制的並且只能被 Subscriber 一次,處理起來也比較容易,這種元素流也被稱為 “冷流”。實際使用過程中,這種情況不太可能會遇到,一般情況下,沒有長度限制的元素流、能夠被多個訂閱者訂閱的元素流才是常見的情況

冷流 —> 熱流

通過調用 Flux 對象的 publish()replay 方法可以將 “冷流” 轉換為 “熱流”

  • publish()

    image.png

    如圖,調用 publish() 方法之后,每個 subscriber 都有自己的一個 Flux 元素流,從而使得多個 subscriber 能夠訂閱一個 Flux

    具體示例如下:

    ConnectableFlux<Object> publish = Flux.create(sink -> {
                        while (true) {
                            sink.next(System.currentTimeMillis()); // 不斷地產生元素
                        }
                    })
                    .sample(Duration.ofMillis(500)) // 每隔 500ms publish 一個元素
                    .publish();
    publish.subscribe(s -> System.out.println("Subscribe-1: " + s)); // subscrbe-1
    publish.subscribe(s -> System.out.println("Subscribe-2: " + s)); // subscrbe-1
    publish.connect(); // 在 connect 之前不會產生進一步的動作
    
    // 也可以在調用 publish() 方法之后直接調用 autoConnect() 方法使得訂閱的 subscriber 能夠自動連接,或者也可以通過 refCount(int n) 方法使得在有 n 個訂閱者訂閱時自動連接來達到同樣的效果
    
  • replay()

    image.png

    replay() 同樣可以將 “冷流” 轉換為 “熱流”,不同的地方在與 replay() 方法會使得新來的 subscribe 首先獲取前面的元素,再正式完成元素流的訂閱。

並發

一般情況下,創建的元素流是在當前的線程下啟動的,由於 Reactor 是完全非阻塞的,因此如果某個流的操作阻塞的,那么在這種情況下可能就無法看到執行的信息(但實際上確實做了這個任務)。

如果想要將當前流的執行環境放入到另一個線程中,可以考慮使用 Reactor 提供的 Schedulers 來實現

具體示例如下:

Flux.just(1, 2, 3, 4)
    .log()
    .map(i -> i * 2)
    .subscribeOn(Schedulers.parallel()) // 修改 Subscribe 所在的線程,這里由 Schedulers 控制
    .doOnNext(s -> System.out.println("Current Thread: " + Thread.currentThread().getName())) // 每次獲取到元素時打印當前的線程
    .subscribe();

/* 
	由於 JVM 會在不存在非守護線程時退出,而 Reactor 又是完全非阻塞的,因此 Reactor 運行時的線程會被視為一個已完成任務的線程(實際上還沒有),在 Main 方法中,直接這么編寫將會導致 JVM 提前結束;為了解決這個問題,主要有兩種思路:使得當前線程睡眠一會兒使得所有任務有機會執行;開啟一個新的非守護線程去執行這個任務(在當前的 Schedule 調度的情況下,會創建一個新的單線程的線程池去執行,因此這么做也是非阻塞的)。
*/
Thread.sleep(1000);

得到的輸出如下:

圖中的 "parallel-1" 就是 Flux 所在的流執行的線程

如果使用創建新的線程的方式來執行,具體示例如下:

// 創建一個
Flux<Integer> flux = Flux.just(1, 2, 3, 4)
    .log()
    .map(i -> i * 2)
    /* 
    	注意,不要使用 Schedulers 來執行調度,這樣會使得當前的“父線程”(當前 Flux 的執行線程)
    	為非阻塞的而直接結束
    */
    // .subscribeOn(Schedulers.parallel()) 
    .doOnNext(s -> System.out.println("Current Thread: " + Thread.currentThread().getName()));

/* 
	創建一個名為 "subscribe-Thread" 的線程去執行這個任務,由於 Flux 的元素是在這個新的線程中執行的,
	因此在這個過程中這個線程始終都是存活的,這樣就可以避免 JVM 提前退出的問題了
*/
new Thread(flux::subscribe, "subscribe-Thread").start();

執行結果如下:

實際使用

異步處理

現在,有了 Reactor 之后,實現任務的異步處理就變得十分簡單了。在 Reactor 之前,如果使用傳統的 Future 或者 CompletableFuture 來實現類似功能,盡管看上去是異步的,但是實際上 Futureget() 操作依舊是阻塞的

現在,來看一個具體的問題:現在需要從一些文件中讀取一些數據,再將它們進行排序。

由於 IO 操作是一個阻塞操作,按照一般的同步的方式進行文件的讀取,當遇到一個文件比較大時,將會導致整個系統整體的響應時間會比較大,使用異步的方式可以有效地降低系統的響應時間。

File[] readFiles = new File[]{
    new File(basPath + "/data_1.txt"), // data_1 的數據量為 10000 條
    new File(basPath + "/data_2.txt"), // data_2 的數據量為 100 條
    new File(basPath + "/data_3.txt"), // data_3 的數據量為 30000 條
    new File(basPath + "/data_4.txt"), // data_4 的數據量為 600 條
    new File(basPath + "/data_5.txt"), // data_5 的數據量為 200 條
    new File(basPath + "/data_6.txt"), // data_6 的數據量為 20000 條
};

AtomicLong start = new AtomicLong();
AtomicLong end = new AtomicLong();
Flux.just(readFiles)
        .doOnSubscribe(any -> start.set(System.currentTimeMillis()))
        .flatMap(
                file -> Mono.just(new ReadFunction(file.getName()).apply(file))
                        .subscribeOn(Schedulers.newParallel("Thread-" + file.getName()))
                        .flatMap(element -> Mono.just(new SortFunction().apply(element)))
        )
        .doOnNext(element -> System.out.println(element.getName() + " has been finished......"))
        .subscribe(new Subscriber<Element>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1L);
            }

            @Override
            public void onNext(Element element) {
                System.out.println("Get Element=" + element.getName());
                this.subscription.request(1L);
            }

            @Override
            public void onError(Throwable throwable) {}

            @Override
            public void onComplete() {
                end.set(System.currentTimeMillis());
                System.out.println("Take Time: " + (end.get() - start.get()) + " ms");
                System.exit(0); // 由於使用別的線程來處理,有時這些線程會一直存在,導致 JVM 無法正常退出。。。。
            }
        });

運行結果如下:

可以看到,最終的結果是通過異步的方式來執行的(不是按照元素流的順序),ReactorJava 實現異步編程很有用的工具

任務調度

有四個程序:service-1、service-2、service-3、service-4 存在以下依賴關系:service-1 必須等待 service-2 程序執行完成之后才能執行,service-4 必須在 service-1 和 service-3 全部執行完成之后才能執行。

如果通過其它的 Future 類來完成這幾個服務程序的處理,代碼將會變得相當復雜,但是使用 Reactor 則可以簡化這些代碼。

首先,模擬四個服務程序:

public class Util {
    public static void print(Flux<?> flux) {
        flux.subscribe(System.out::println);
    }
    
    // service-1 執行 1000 ms
    public static String service1() throws InterruptedException {
        Thread.sleep(1000);
        return "service-1";
    }
    
    // service-2 執行 1500 ms
    public static String service2(String inputFrom1) throws InterruptedException {
        Thread.sleep(1500);
        return "service-2 " + inputFrom1;
    }
    
    // service-3 執行 1200 ms
    public static String service3() throws InterruptedException {
        Thread.sleep(1200);
        return "service-3";
    }
    
    // service-4 執行 500 ms
    public static String service4(String inputFrom2, String inputFrom3) throws InterruptedException {
        Thread.sleep(500);
        return "service-4: " + inputFrom2 + " : " + inputFrom3;
    }
}

現在通過 Reactor 的方式來將這四個程序進行調度處理:

Mono<String> mono2 = Mono.fromCallable(Util::service1)
    .flatMap(ret1 -> Mono.fromCallable(() -> Util.service2(ret1))
             .subscribeOn(Schedulers.newSingle("service2"))
            ); // service-1 和 service-2 之間的依賴關系

Mono<String> mono3 = Mono.fromCallable(Util::service3)
    .subscribeOn(Schedulers.newSingle("service3")); // service-3 是一個單獨的執行任務

Mono<String> ret4 = Flux.zip(mono2, mono3).single() // zip 方法將兩個 publisher 合並到一起
    .flatMap(tuple -> Mono.fromCallable( () -> Util.service4(tuple.getT1(), tuple.getT2()))); // 合並 service-1 和 service-3,並執行 service-4 的任務

System.out.println("=======================================");
AtomicLong start = new AtomicLong(), end = new AtomicLong();
ret4.doOnSubscribe(any -> start.set(System.currentTimeMillis()))
    .doFinally(any -> {
        end.set(System.currentTimeMillis());
        System.out.println("take time: " + (end.get() - start.get()) + " ms");
        System.exit(0);
    })
    .subscribe(System.out::println);

輸出結果如下:

由於線程睡眠、喚醒以及上下文切換之間存在一定的開銷,因此最終的執行時間會大於必要的串行執行的總和 \(1000ms + 1500ms + 500ms = 3000ms\)

參考:

[1] https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro

[2] https://www.baeldung.com/reactor-core


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM