假設你對RxJava1.x還不是了解,能夠參考以下文章。
1. RxJava使用介紹 【視頻教程】
2. RxJava操作符
• Creating Observables(Observable的創建操作符) 【視頻教程】
• Transforming Observables(Observable的轉換操作符) 【視頻教程】
• Filtering Observables(Observable的過濾操作符) 【視頻教程】
• Combining Observables(Observable的組合操作符) 【視頻教程】
• Error Handling Operators(Observable的錯誤處理操作符) 【視頻教程】
• Observable Utility Operators(Observable的輔助性操作符) 【視頻教程】
• Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視頻教程】
• Mathematical and Aggregate Operators(Observable數學運算及聚合操作符) 【視頻教程】
• 其它如observable.toList()、observable.connect()、observable.publish()等等。 【視頻教程】
3. RxJava Observer與Subcriber的關系 【視頻教程】
4. RxJava線程控制(Scheduler) 【視頻教程】
5. RxJava 並發之數據流發射太快怎樣辦(背壓(Backpressure)) 【視頻教程】
前言
在有心課堂《RxJava之旅》中有學員留言:map和doOnSubscribe默認調度器是IO調度器,這里說錯了吧?
以下我們分析下。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用作流程開始前的初始化。然而 onStart() 由於在 subscribe() 發生時就被調用了,因此不能指定線程。而是僅僅能運行在 subscribe() 被調用時的線程。這就導致假設 onStart() 中含有對線程有要求的代碼(比如在界面上顯示一個 ProgressBar,這必須在主線程運行),將會有線程非法的風險,由於有時你無法預測 subscribe() 將會在什么線程運行。
而與 Subscriber.onStart() 相相應的。有一個方法 Observable.doOnSubscribe() 。
它和 Subscriber.onStart() 相同是在 subscribe() 調用后並且在事件發送前運行。但差別在於它能夠指定線程。
默認情況下, doOnSubscribe() 運行在 subscribe() 發生的線程;而假設在 doOnSubscribe() 之后有 subscribeOn() 的話,它將運行在離它近期的 subscribeOn() 所指定的線程。
演示樣例代碼:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 須要在主線程運行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
subscribeOn 作用於該操作符之前的 Observable 的創建操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的創建操作符總是被其之后近期的 subscribeOn 控制 。沒看懂不要緊,看以下代碼你就懂了。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?
super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("00doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { System.out.println("map1在線程" + Thread.currentThread().getName() + "中"); return integer + ""; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("11doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(new Func1<String, String>() { @Override public String call(String s) { System.out.println("map2在線程" + Thread.currentThread().getName() + "中"); return s + "1"; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("22doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext在線程" + Thread.currentThread().getName() + "中"); } });
運行結果例如以下:
22doOnSubscribe在線程RxNewThreadScheduler-1中
11doOnSubscribe在線程RxIoScheduler-3中
00doOnSubscribe在線程RxNewThreadScheduler-2中
map1在線程RxNewThreadScheduler-2中
map2在線程RxIoScheduler-2中
onNext在線程RxIoScheduler-2中
依據代碼和運行結果我總結例如以下:
- doOnSubscribe()與onStart()相似,均在代碼調用時就會回調。但doOnSubscribe()能夠通過subscribeOn()操作符改變運行的線程且越在后面運行越早;
- doOnSubscribe()后面緊跟subscribeOn(),那么doOnSubscribe()將於subscribeOn()指定的線程保持一致。假設doOnSubscribe()在subscribeOn()之后,他的運行線程得再看情況分析;
- doOnSubscribe()假設在observeOn()后(注意:observeon()后沒有緊接着再調用subcribeon()方法)。那么doOnSubscribe的運行線程就是main線程,與observeon()指定的線程沒有關系。
- 假設在observeOn()之前沒有調用過subcribeOn()方法,observeOn()之后subscribe面()方法之前調用subcribeOn()方法,那么他會改變整個代碼流程中全部調用doOnSubscribe()方法所在的線程。同一時候也會改變observeOn()方法之前全部操作符所在的線程(有個重要前提:不滿足第2點的條件,也就是doOnSubscribe()后面沒有調用subscribeOn()方法)。
- 假設在observeOn()前后都沒有調用過subcribeOn()方法,那么整個代碼流程中的doOnSubscribe()運行在main線程,與observeOn()指定的線程無關。同一時候observeOn()之前的操作符也將運行在main線程,observeOn()之后的操作符與observeOn()指定的線程保持一致。
今天就分析到這里,假設有問題請大家反饋交流。