Reactor:深入理解reactor core


簡介

上篇文章我們簡單的介紹了Reactor的發展史和基本的Flux和Mono的使用,本文將會進一步挖掘Reactor的高級用法,一起來看看吧。

自定義Subscriber

之前的文章我們提到了4個Flux的subscribe的方法:

Disposable subscribe(); 

Disposable subscribe(Consumer<? super T> consumer); 

Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

這四個方法,需要我們使用lambda表達式來自定義consumer,errorConsumer,completeSonsumer和subscriptionConsumer這四個Consumer。

寫起來比較復雜,看起來也不太方便,我們考慮一下,這四個Consumer是不是和Subscriber接口中定義的4個方法是一一對應的呢?

    public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }

對的,所以我們有一個更加簡單點的subscribe方法:

public final void subscribe(Subscriber<? super T> actual) 

這個subscribe方法直接接收一個Subscriber類。從而實現了所有的功能。

自己寫Subscriber太麻煩了,Reactor為我們提供了一個BaseSubscriber的類,它實現了Subscriber中的所有功能,還附帶了一些其他的方法。

我們看下BaseSubscriber的定義:

public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
                                                   Disposable

注意,BaseSubscriber是單次使用的,這就意味着,如果它首先subscription到Publisher1,然后subscription到Publisher2,那么將會取消對第一個Publisher的訂閱。

因為BaseSubscriber是一個抽象類,所以我們需要繼承它,並且重寫我們需要自己實現的方法。

下面看一個自定義的Subscriber:

public class CustSubscriber<T> extends BaseSubscriber<T> {

    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1);
    }

    public void hookOnNext(T value) {
        System.out.println(value);
        request(1);
    }
}

BaseSubscriber中有很多以hook開頭的方法,這些方法都是我們可以重寫的,而Subscriber原生定義的on開頭的方法,在BaseSubscriber中都是final的,都是不能重寫的。

我們看一個定義:

	@Override
	public final void onSubscribe(Subscription s) {
		if (Operators.setOnce(S, this, s)) {
			try {
				hookOnSubscribe(s);
			}
			catch (Throwable throwable) {
				onError(Operators.onOperatorError(s, throwable, currentContext()));
			}
		}
	}

可以看到,它內部實際上調用了hook的方法。

上面的CustSubscriber中,我們重寫了兩個方法,一個是hookOnSubscribe,在建立訂閱的時候調用,一個是hookOnNext,在收到onNext信號的時候調用。

在這些方法中,給了我們足夠的自定義空間,上面的例子中我們調用了request(1),表示再請求一個元素。

其他的hook方法還有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。

Backpressure處理

我們之前講過了,reactive stream的最大特征就是可以處理Backpressure。

什么是Backpressure呢?就是當consumer處理過不來的時候,可以通知producer來減少生產速度。

我們看下BaseSubscriber中默認的hookOnSubscribe實現:

	protected void hookOnSubscribe(Subscription subscription){
		subscription.request(Long.MAX_VALUE);
	}

可以看到默認是request無限數目的值。 也就是說默認情況下沒有Backpressure。

通過重寫hookOnSubscribe方法,我們可以自定義處理速度。

除了request之外,我們還可以在publisher中限制subscriber的速度。

	public final Flux<T> limitRate(int prefetchRate) {
		return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
	}

在Flux中,我們有一個limitRate方法,可以設定publisher的速度。

比如subscriber request(100),然后我們設置limitRate(10),那么最多producer一次只會產生10個元素。

創建Flux

接下來,我們要講解一下怎么創建Flux,通常來講有4種方法來創建Flux。

使用generate

第一種方法就是最簡單的同步創建的generate.

先看一個例子:

    public void useGenerate(){
        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);
    }

輸出結果:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

上面的例子中,我們使用generate方法來同步的生成元素。

generate接收兩個參數:

	public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 

第一個參數是stateSupplier,用來指定初始化的狀態。

第二個參數是一個generator,用來消費SynchronousSink,並生成新的狀態。

上面的例子中,我們每次將state+1,一直加到10。

然后使用subscribe來將所有的生成元素輸出。

使用create

Flux也提供了一個create方法來創建Flux,create可以是同步也可以是異步的,並且支持多線程操作。

因為create沒有初始的state狀態,所以可以用在多線程中。

create的一個非常有用的地方就是可以將第三方的異步API和Flux關聯起來,舉個例子,我們有一個自定義的EventProcessor,當處理相應的事件的時候,會去調用注冊到Processor中的listener的一些方法。

    interface MyEventListener<T> {
        void onDataChunk(List<T> chunk);
        void processComplete();
    }

我們怎么把這個Listener的響應行為和Flux關聯起來呢?

   public void useCreate(){
        EventProcessor myEventProcessor = new EventProcessor();
        Flux<String> bridge = Flux.create(sink -> {
            myEventProcessor.register(
                    new MyEventListener<String>() {
                        public void onDataChunk(List<String> chunk) {
                            for(String s : chunk) {
                                sink.next(s);
                            }
                        }
                        public void processComplete() {
                            sink.complete();
                        }
                    });
        });
    }

使用create就夠了,create接收一個consumer參數:

    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

這個consumer的本質是去消費FluxSink對象。

上面的例子在MyEventListener的事件中對FluxSink對象進行消費。

使用push

push和create一樣,也支持異步操作,但是同時只能有一個線程來調用next, complete 或者 error方法,所以它是單線程的。

使用Handle

Handle和上面的三個方法不同,它是一個實例方法。

它和generate很類似,也是消費SynchronousSink對象。

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

不同的是它的參數是一個BiConsumer,是沒有返回值的。

看一個使用的例子:

    public void useHandle(){
        Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
                .handle((i, sink) -> {
                    String letter = alphabet(i);
                    if (letter != null)
                        sink.next(letter);
                });

        alphabet.subscribe(System.out::println);
    }

    public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
            return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
    }

本文的例子learn-reactive

本文作者:flydean程序那些事

本文鏈接:http://www.flydean.com/reactor-core-in-depth/

本文來源:flydean的博客

歡迎關注我的公眾號:「程序那些事」最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM