RXJava之鏈式調用(一)


阿里P7移動互聯網架構師進階視頻(每日更新中)免費學習請點擊:https://space.bilibili.com/474380680

Rxjava中鏈式調用怎么實現的?

        Observable.just("a")     //Observable1
                .map(new Func1<String, String>() {  //Observable2   
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
                        return s + s;
                    }
                })
                .subscribe(new Subscriber<String>() { //代碼⑥ Subscriber
                    @Override
                    public void onCompleted() {
                        System.out.print(Thread.currentThread().getName()+"\n");
                        System.out.print("completed"+"\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("error");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
    }

先說說自己的理解,若把整個鏈條看成一個整體對象,那么just創建被觀察者對象,而subscribe()里的Subscriber作為觀察者;若每一步都分開看,just()和subscribe()中間的操作符即是觀察者,又是被觀察者。

Observable中每個操作符基本都會創建出一個新的Observable;因此可以解理成后一級的操作符去觀察前一個Observable對象;以上例來說,.subscribe的Subscriber所觀察的對象就是.map返回的Observable2,而.map的Subscriber所觀察的對象就是 Observable.just("a")得到的對象Observable1;

下面介紹實現代碼,整個鏈式調用真正開始的地方是.subscribe(),我們就從這里開始。省略掉一些代碼,只看關鍵部分如下:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
      ...
      try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
            return hook.onSubscribeReturn(subscriber);
        }
        ...
}

hook.onSubscribeStart(observable, observable.onSubscribe)得到的對象就是observable.onSubscribe,而此處的observable明顯就是this,也就是上例中的observable2對象,即把subscriber傳入到了observable2里面以供其調用。

再跟着代碼進入observable2(.map操作符)的實現。其主要實現是lift和OperatorMap。如下:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

lift和OperatorMap各自干了什么事情呢?先看OperatorMap,Func1也作為構造參數傳入。關鍵代碼:

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {  //代碼②
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

這里new出了一個觀察者對象Subscriber,它實現了什么功能通過 o.onNext(transformer.call(t));即將例子中的Func1代碼執行后將結果傳入到下一層。即這里運行了Func1的代碼。

再看lift()操作符,看其返回值也就是我們定義的observable2對象。因此subscribe里的"代碼①"的call即是此處observable2里OnSubscribe的call方法;再看call方法,“代碼④”部分則是調用到了observable1對象里OnSubscribe的call方法,而“代碼③”將Func1操作動作轉變為Subscriber,通過call(o)完成對下一級Subscriber的引用。

 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);  //代碼④
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

到這里“代碼④”執行,即到了observable1對象,也就是例子中 Observable.just("a")所得到對象的OnSubscribe的call()方法,如下:

  public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }
ScalarSynchronousObservable類代碼如下:

 public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
 }
 protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                /*
                 *  We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
                 *  See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
                 *  The assumption here is that when asking for a single item we should emit it and not concern ourselves with 
                 *  being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will 
                 *  filter it out (such as take(0)). This prevents us from paying the price on every subscription. 
                 */
                s.onNext(t);  //代碼⑤
                s.onCompleted();
            }

        });
        this.t = t;
    }

其中"代碼⑤"是關鍵點,t即是我們just傳入的"a",s則是代碼④傳入的st,它其實是observable2的Subscriber(觀察者),相當於observable1持有observable2的引用。通過 s.onNext(t),完成了observable1向下一層的observable2的回調,也就是Func1對象所在的Subscriber(OperatorMap),再通過 o.onNext(transformer.call(t));回到例子中“代碼⑥”,至此,整個調用鏈完成。

上面的分析比較混亂,重新梳理代碼執行流程 :
1、subscribe里,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
2、map里,通過lift()將Func1操作符生成Subserber,Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
onSubscribe.call(st); //代碼④
3、just里create(), s.onNext(t); //代碼⑤
4、map里, OperatorMap里對象, o.onNext(transformer.call(t));
5、subscribe 的Subscriber();

Observable的所有鏈式調用,知道兩個其兩個關鍵點即可梳理清楚整個數據流傳遞原理;

Observable.onSubscribe對象,完成以call方法來向上一層傳遞;
Subserber向下一層的Subserber調用;

原文鏈接:https://www.jianshu.com/p/b5ca80311746

阿里P7移動互聯網架構師進階視頻(每日更新中)免費學習請點擊:https://space.bilibili.com/474380680


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM