RxJava線程切換——ObserveOn和SubscribeOn的區別


RxJava很優勢的一個方面就是他的線程切換,基本是依靠ObserveOn和SubscribeOn這兩個操作符來完成的。

先來看看什么是ObserveOn和SubscribeOn,官方對他們的定義是這樣的:

  • ObserveOn

specify the Scheduler on which an observer will observe this Observable
指定一個觀察者在哪個調度器上觀察這個Observable

  • SubscribeOn

specify the Scheduler on which an Observable will operate
指定Observable自身在哪個調度器上執行

 
官方的圖例

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

從上面的圖例中可以看出,SubscribeOn這個操作符指定的是Observable自身在哪個調度器上執行,而且跟調用的位置沒有關系。而ObservableOn則是指定一個觀察者在哪個調度器上觀察這個Observable,當每次調用了ObservableOn這個操作符時,之后都會在選擇的調度器上進行觀察,直到再次調用ObservableOn切換了調度器。
那么,如果多次調用SubscribeOn,會有什么效果呢?寫個例子測試一下就知道了。

      Observable.just(1) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-1:"+Thread.currentThread().getName()); return integer; } }) .subscribeOn(Schedulers.newThread()) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-2:"+Thread.currentThread().getName()); return integer; } }) .subscribeOn(Schedulers.io()) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-3:"+Thread.currentThread().getName()); return integer; } }) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.i(TAG, "subscribe:"+Thread.currentThread().getName()); } }); 

在例子用,我多次調用了subscribeOn操作符,並且在每個map操作符中打印了當前線程的名稱。


 
多次調用SubscribeOn

從打印的日志中可以看出,只有第一次調用SubscribeOn時選擇的調度器.subscribeOn(Schedulers.newThread())有作用,而后來選擇的都沒有作用。這說明了SubscribeOn這個操作符,與調用的位置無關,而且只有第一次調用時會指定Observable自己在哪個調度器執行。

其實有一種情況特殊,就是在DoOnSubscribe操作符之后調用,可以使DoOnSubscribe在指定的調度器中執行。

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception { Log.i(TAG, "create:" + Thread.currentThread().getName()); observableEmitter.onNext(1); observableEmitter.onComplete(); } }); observable.subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.io()) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map:" + Thread.currentThread().getName()); return integer; } }) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { Log.i(TAG, "doOnSubscribe:" + Thread.currentThread().getName()); } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.i(TAG, "subscribe:" + Thread.currentThread().getName()); } }); 
 
切換DoOnSubscribe調度器

由此可見,SubscribeOn不僅可以指定Observable自身的調度器,也可以指定DoOnSubscribe執行的調度器。

我們知道了多次調用SubscribeOn並不會起作用,那么多次調用ObservableOn呢?還是同樣的例子,將所有SubscribeOn換為ObservableOn試試。

      Observable.just(1) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-1:"+Thread.currentThread().getName()); return integer; } }) .observeOn(Schedulers.newThread()) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-2:"+Thread.currentThread().getName()); return integer; } }) .observeOn(Schedulers.io()) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { Log.i(TAG, "map-3:"+Thread.currentThread().getName()); return integer; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.i(TAG, "subscribe:"+Thread.currentThread().getName()); } }); 

從打印的日志中可以看出,每次調用了ObservableOn操作符之后,之后的Map和Subscribe操作符都會發生在指定的調度器中,實現了線程的切換。


 
多次調用ObservableOn

平時我們見到最多的使用場景,可能就是官方圖例上描述的那樣,配合Retrofit從網絡拿回數據、在io線程或子線程執行某些耗時操作,比如一些變換操作,然后再切換到主線程去更新UI,可以用RxJava的線程切換很方便的實現。但是這只是很簡單的應用場景之一,RxJava能做的遠不止這些,我仍在不斷的摸索中。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM