| Markdown版本筆記 | 我的GitHub首頁 | 我的博客 | 我的微信 | 我的郵箱 |
|---|---|---|---|---|
| MyAndroidBlogs | baiqiantao | baiqiantao | bqt20094 | baiqiantao@sina.com |
RxJava 操作符 on和doOn 線程切換 調度 Schedulers 線程池 MD
目錄
RxJava 線程池
正常的流程
切換線程對 on** 方法的影響
指定被觀察者發布事件的線程
指定訂閱者(觀察者)接收事件的線程
線程切換
切換線程對 doOn** 的影響
切換線程的操作放在最上面
向下移動1下
向下移動2下
向下移動3下
別人總結的規律
RxJava 線程池
RxJava中的多線程操作主要是由強大的Scheduler集合提供的。在RxJava中,我們無法直接訪問或操作線程。如果想要使用線程的話,必須要通過內置的Scheduler來實現。如果你需要在特定的線程中執行任務的話,我們就需要此選擇恰當的Scheduler,Scheduler接下來會從它的池中獲取一個可用的線程,並基於該線程執行任務。
在RxJava框架中有多種類型的Scheduler,但是這里比較有技巧的一點就是為合適的工作選擇恰當的Scheduler。如果你沒有選擇恰當的Scheduler的話,那么任務就無法最優地運行,所以接下來,我們嘗試理解每一個Scheduler。
- Schedulers.io()
這是由
無邊界線程池作為支撐的一個Scheduler,它適用於非CPU密集的I/O工作,比如訪問文件系統、執行網絡調用、訪問數據庫等等。
這個Scheduler是沒有限制的,它的線程池可以按需一直增長。
注意:在使用無邊界線程池支撐的Scheduler時,我們要特別小心,因為它有可能會導致線程池無限增長,使系統中出現大量的線程。 - Schedulers.computation()
這個Scheduler用於執行
CPU密集的工作,比如處理大規模的數據集、圖像處理等等。它由一個有界的線程池作為支撐,線程的最大數量就是可用的處理器數量。
因為這個Scheduler只適用於CPU密集的任務,我們希望限制線程的數量,這樣的話,它們不會彼此搶占CPU時間或出現線程餓死的現象。 - Schedulers.newThread()
這個Scheduler 每次都會創建一個
全新的線程來完成一組工作。它不會從任何線程池中受益,線程的創建和銷毀都是很昂貴的,所以你需要非常小心,不要衍生出太多的線程,導致服務器系統變慢或出現內存溢出的錯誤。
理想情況下,你應該很少使用這個Scheduler,它大多用於在一個完全分離的線程中開始一項長時間運行、隔離的一組任務。 - Schedulers.single()
這個Scheduler是RxJava 2新引入的,它的背后只有一個線程作為支撐,只能按照有序的方式執行任務。如果你有一組后台任務要在App的不同地方執行,但是同時只能承受一個任務執行的話,那么這個Scheduler就可以派上用場了。
- Schedulers.from(Executor executor)
我們可以使用它創建自定義的Scheduler,它是由我們自己的
Executor作為支撐的。在有些場景下,我們希望創建自定義的Scheduler為App執行特定的任務,這些任務可能需要自定義的線程邏輯。
假設,我們想要限制App中並行網絡請求的數量,那么我們就可以創建一個自定義的Scheduler,使其具有一個固定線程池大小的Executor:Scheduler.from(Executors.newFixedThreadPool(n)),然后將其應用到代碼中所有網絡相關的Observable上。 - AndroidSchedulers.mainThread()
這是一個特殊的Scheduler,它無法在核心RxJava庫中使用,要使用它,必須要借助
RxAndroid擴展庫。這個Scheduler對Android App特別有用,它能夠在應用的主線程中執行基於UI的任務。
默認情況下,它會在應用主線程關聯的looper中進行任務排隊,但是它有一個其他的變種,允許我們以API的形式使用任意的Looper:AndroidSchedulers.from(Looper looper)。
正常的流程
正常的流程為:
doOnSubscribe、onSubscribe、【create】、doOnNext、onNext、doOnComplete、onComplete
例如最基礎的形式:
Observable.create(emitter -> {
log("【開始create】");
emitter.onNext("救命啊");
emitter.onComplete();
log("【結束create】");
})
.doOnNext(s -> log("【doOnNext】"))
.doOnError(e -> log("【doOnError】"))
.doOnComplete(() -> log("【doOnComplete】"))
.doOnSubscribe(disposable -> log("【doOnSubscribe】"))
.subscribe(o -> log("【onNext】"), e -> log("【onError】"), () -> log("【onComplete】"), d -> log("【onSubscribe】"));
打印方法為:
private void log(String s) {
String date = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss SSS", Locale.getDefault()).format(new Date());
Log.i("bqt", s + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
}
打印結果為:
【doOnSubscribe】2018.09.15 13:38:01 163,true
【onSubscribe】2018.09.15 13:38:01 165,true
【開始create】2018.09.15 13:38:01 166,true
【doOnNext】2018.09.15 13:38:01 167,true
【onNext】2018.09.15 13:38:01 168,true
【doOnComplete】2018.09.15 13:38:01 171,true
【onComplete】2018.09.15 13:38:01 172,true
【結束create】2018.09.15 13:38:01 173,true
將 Observable 換成下面的幾種形式后,效果都和上面基本是一樣的:
Observable.just("救命啊") //相應的只是沒有打印【開始create】和【結束create】
Observable.create(emitter -> {
log("【開始create】");
emitter.onNext("救命啊"); //相應的只是沒有【doOnComplete】和【onComplete】
log("【結束create】");
})
Observable.create(emitter -> {
log("【開始create】");
emitter.onNext("救命啊");
emitter.onNext("HELP"); //相應的有兩組 **Next,順序為【doOnNext】【onNext】【doOnNext】【onNext】
emitter.onComplete();
log("【結束create】");
})
Observable.create(emitter -> {
log("【開始create】");
emitter.onNext("救命啊");
emitter.onError(new RuntimeException("game over")); //相應的是【doOnError】和【onError】
log("【結束create】");
})
切換線程對 on** 方法的影響
通過上面的打印結果,我們可以知道,如果沒有添加切換線程的操作,則以上所有方法默認都執行在主線程中。
先說一個極端情況,如果我們整個在一個子線程中執行以上邏輯,例如:
new Thread(() -> ... ).start(); //子線程
則以上所有方法都是在子線程中執行的。
下面我們考慮切換線程對【on**】方法的影響。
指定被觀察者發布事件的線程
Observable.create(...)
.subscribeOn(Schedulers.io()) //指定被觀察者發布事件的線程:
//.observeOn(Schedulers.io()) //如果訂閱者接收事件的線程和被觀察者發布事件的線程相同,則可以不必指定接收事件的線程
.subscribe(...);
打印結果為:
【onSubscribe】2018.09.15 14:36:44 010,true
【開始create】2018.09.15 14:36:44 011,false
【onNext】2018.09.15 14:36:44 012,false
【onComplete】2018.09.15 14:36:44 015,false
【結束create】2018.09.15 14:36:44 015,false
這種情況下,其線程效果等價於:
Observable.create(emitter -> new Thread(() -> {...}).start()) //將 create 中的所有操作放到了子線程
.subscribe(...);
指定訂閱者(觀察者)接收事件的線程
Observable.create(...)
//.subscribeOn(AndroidSchedulers.mainThread()) //因為默認就在主線程,所以可以不指定
.observeOn(Schedulers.io()) //指定訂閱者(觀察者)接收事件的線程
.subscribe(...);
打印結果為:
【onSubscribe】2018.09.15 14:25:58 682,true
【開始create】2018.09.15 14:25:58 697,true
【結束create】2018.09.15 14:25:58 699,true //此案例中,由於【create】和后面幾個回調不在一個線程中,因為存在線程競爭的情況,所以【結束create】的回調時機是不確定的,有可能在【onNext】前也可能在【onNext】后,甚至有可能在【onComplete】之后
【onNext】2018.09.15 14:25:58 700,false
【onComplete】2018.09.15 14:25:58 701,false
這種情況下,其線程效果等價於:
Observable.create(emitter -> {
log("【開始create】"); //主線程
new Thread(() -> {
emitter.onNext("救命啊"); //子線程
emitter.onComplete(); //子線程
}).start();
log("【結束create】"); //主線程
})
.subscribe(...);
線程切換
在被觀察者發布事件時將線程切換到一個線程,在訂閱者接收事件時將線程切換到另一個線程
Observable.create(...)
.subscribeOn(Schedulers.io()) //指定被觀察者發布事件的線程:
.observeOn(AndroidSchedulers.mainThread()) //指定訂閱者(觀察者)接收事件的線程
.subscribe(...);
打印結果為:
【onSubscribe】2018.09.15 14:12:11 711,true
【開始create】2018.09.15 14:12:11 713,false
【結束create】2018.09.15 14:12:11 714,false //此案例中,由於【create】和后面幾個回調不在一個線程中,因為存在線程競爭的情況,所以【結束create】的回調時機是不確定的,有可能在【onNext】前也可能在【onNext】后,甚至有可能在【onComplete】之后
【onNext】2018.09.15 14:12:11 723,true
【onComplete】2018.09.15 14:12:11 724,true
這種情況下,其線程效果等價於:
Observable.create(emitter -> new Thread(() -> {
log("【開始create】"); //子線程
runOnUiThread(() -> {
emitter.onNext("救命啊"); //主線程
emitter.onComplete(); //主線程
});
log("【結束create】"); //子線程
}).start())
.subscribe(...);
切換線程對 doOn** 的影響
注意:下面這些沒必要去記,也根本不可能記住,只需要知道,這幾個 doOn 方法的執行時機和對應的 on 方法相比並不穩定,且這幾個 doOn 方法執行時所在線程也並不穩定即可。
切換線程的操作放在最上面
Observable.create(...)
.subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
.observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
.doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
.doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
.doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
.subscribe(...);
打印結果為:
【doOnSubscribe】2018.08.26 18:05:23 799,true
【onSubscribe】2018.08.26 18:05:23 800,true //【沒有改變過】
【create】2018.08.26 18:05:23 803,false //【沒有改變過】
【doOnNext】2018.08.26 18:05:23 806,true
【onNext】2018.08.26 18:05:23 807,true //【沒有改變過】
【doOnComplete】 2018.08.26 18:05:23 807,true
【onComplete】2018.08.26 18:05:23 807,true //【沒有改變過】
向下移動1下
Observable.create(...)
.doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
.subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
.observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
.doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
.doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
.subscribe(...);
打印結果為:
【doOnSubscribe】2018.08.26 18:50:32 149,true
【onSubscribe】2018.08.26 18:50:32 150,true //【沒有改變過】
【create】2018.08.26 18:50:32 158,false //【沒有改變過】
【doOnNext】2018.08.26 18:50:32 158,false
【onNext】2018.08.26 18:50:32 172,true //【沒有改變過】
【doOnComplete】 2018.08.26 18:50:32 172,true
【onComplete】2018.08.26 18:50:32 172,true //【沒有改變過】
向下移動2下
Observable.create(...)
.doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
.doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
.subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
.observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
.doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
.subscribe(...);
打印結果為:
【doOnSubscribe】2018.08.26 18:58:01 329,true
【onSubscribe】2018.08.26 18:58:01 349,true //【沒有改變過】
【create】2018.08.26 18:58:01 359,false //【沒有改變過】
【doOnNext】2018.08.26 18:58:01 359,false
【doOnComplete】 2018.08.26 18:58:01 360,false //注意順序已經和之前的不一樣了
【onNext】2018.08.26 18:58:01 371,true //【沒有改變過】
【onComplete】2018.08.26 18:58:01 372,true //【沒有改變過】
向下移動3下
Observable.create(...)
.doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
.doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
.doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
.subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
.observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
.subscribe(...);
打印結果為:
【onSubscribe】2018.08.26 19:01:51 196,true //【沒有改變過】
【doOnSubscribe】2018.08.26 19:01:51 200,false//注意順序已經和之前的不一樣了
【create】2018.08.26 19:01:51 201,false //【沒有改變過】
【doOnNext】2018.08.26 19:01:51 201,false
【doOnComplete】 2018.08.26 19:01:51 201,false//注意順序已經和之前的不一樣了
【onNext】2018.08.26 19:01:51 227,true //【沒有改變過】
【onComplete】2018.08.26 19:01:51 228,true //【沒有改變過】
別人總結的規律
- doOnSubscribe()與onStart()相似,均在代碼調用時就會回調。但doOnSubscribe()能夠通過subscribeOn()操作符改變運行的線程且越在后面運行越早;
- doOnSubscribe()后面緊跟subscribeOn(),那么doOnSubscribe()將於subscribeOn()指定的線程保持一致。假設doOnSubscribe()在subscribeOn()之后,他的運行線程得再看情況分析;
- doOnSubscribe()假設在observeOn()后(注意:observeon()后沒有緊接着再調用subcribeon()方法)。那么doOnSubscribe的運行線程就是main線程,與observeon()指定的線程沒有關系。
- 假設在observeOn()之前沒有調用過subcribeOn()方法,observeOn()之后subscribe面()方法之前調用subcribeOn()方法,那么他會改變整個代碼流程中全部調用doOnSubscribe()方法所在的線程。同一時候也會改變observeOn()方法之前全部操作符所在的線程(有個重要前提:不滿足第2點的條件,也就是doOnSubscribe()后面沒有調用subscribeOn()方法)。
- 假設在observeOn()前后都沒有調用過subcribeOn()方法,那么整個代碼流程中的doOnSubscribe()運行在main線程,與observeOn()指定的線程無關。同一時候observeOn()之前的操作符也將運行在main線程,observeOn()之后的操作符與observeOn()指定的線程保持一致。
再次強調:上面這些沒必要去記,也根本不可能記住,只需要知道,這幾個 doOn** 方法的執行時機和對應的 on** 方法相比並不穩定,且這幾個 doOn** 方法執行時所在線程也並不穩定即可。
2018-9-15
