rxjava源碼解析:操作符subscribeOn


1.subscribe流程

先看一個簡單的例子:

//標記為Observable1
Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello world!");
            subscriber.onCompleted();
        }
    })
    .subscribeOn(Schedulers.newThread())

    //Subscriber標記為Subscriber1
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }
    });

 

subscribeOn的流程如下:

 

  1. 首先會根據原來的Observable1生成一個新的Observable<Observable<String>>我們命名為Observable2
  2. 然后調用Observable2.lift(OperatorSubscribeOn);
  3. 返回調用lift之后生成的新的Observable3. subscribeOn過程執行完畢

subscribe()過程跟之前分析的的一樣。 注意:

  1. Observable<Observable<String>>2中onSubscribe中的call()方法,返回的是Observable1
  2. OperatorSubscribeOn中生成的Subscriber2對象負責把Observable<Observable<String>>2發射的Observable1跟Subsriber1關聯調用

現在分析OperatorSubscribeOn生成的Subscriber2

 

  1. Subscriber2中調用scheduler創建不同的調度器的worker
  2. worker調用schedule()去執行Observable1的subscribe()
  3. Observable1的subscribe()方法中的Subscriber<String>調用了Subscriber1<String>中的onNext() onCompleted()等。

完畢

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM