RxJava 操作符 on和doOn 線程切換 調度 Schedulers 線程池 MD


Markdown版本筆記 我的GitHub首頁 我的博客 我的微信 我的郵箱
MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina.com

RxJava 操作符 on和doOn 線程切換 調度 Schedulers 線程池 MD


目錄

RxJava 線程池

RxJava 線程池

RxJava中的多線程操作主要是由強大的Scheduler集合提供的。在RxJava中,我們無法直接訪問或操作線程。如果想要使用線程的話,必須要通過內置的Scheduler來實現。如果你需要在特定的線程中執行任務的話,我們就需要此選擇恰當的Scheduler,Scheduler接下來會從它的池中獲取一個可用的線程,並基於該線程執行任務。

在RxJava框架中有多種類型的Scheduler,但是這里比較有技巧的一點就是為合適的工作選擇恰當的Scheduler。如果你沒有選擇恰當的Scheduler的話,那么任務就無法最優地運行,所以接下來,我們嘗試理解每一個Scheduler。

  • Schedulers.io()

    這是由無邊界線程池作為支撐的一個Scheduler,它適用於非CPU密集的I/O工作,比如訪問文件系統、執行網絡調用、訪問數據庫等等。
    這個Scheduler是沒有限制的,它的線程池可以按需一直增長。
    注意:在使用無邊界線程池支撐的Scheduler時,我們要特別小心,因為它有可能會導致線程池無限增長,使系統中出現大量的線程。

  • Schedulers.computation()

    這個Scheduler用於執行CPU密集的工作,比如處理大規模的數據集、圖像處理等等。它由一個有界的線程池作為支撐,線程的最大數量就是可用的處理器數量
    因為這個Scheduler只適用於CPU密集的任務,我們希望限制線程的數量,這樣的話,它們不會彼此搶占CPU時間或出現線程餓死的現象。

  • Schedulers.newThread()

    這個Scheduler 每次都會創建一個全新的線程來完成一組工作。它不會從任何線程池中受益,線程的創建和銷毀都是很昂貴的,所以你需要非常小心,不要衍生出太多的線程,導致服務器系統變慢或出現內存溢出的錯誤。
    理想情況下,你應該很少使用這個Scheduler,它大多用於在一個完全分離的線程中開始一項長時間運行、隔離的一組任務。

  • Schedulers.single()

    這個Scheduler是RxJava 2新引入的,它的背后只有一個線程作為支撐,只能按照有序的方式執行任務。如果你有一組后台任務要在App的不同地方執行,但是同時只能承受一個任務執行的話,那么這個Scheduler就可以派上用場了。

  • Schedulers.from(Executor executor)

    我們可以使用它創建自定義的Scheduler,它是由我們自己的Executor作為支撐的。在有些場景下,我們希望創建自定義的Scheduler為App執行特定的任務,這些任務可能需要自定義的線程邏輯。
    假設,我們想要限制App中並行網絡請求的數量,那么我們就可以創建一個自定義的Scheduler,使其具有一個固定線程池大小的Executor:Scheduler.from(Executors.newFixedThreadPool(n)),然后將其應用到代碼中所有網絡相關的Observable上。

  • AndroidSchedulers.mainThread()

    這是一個特殊的Scheduler,它無法在核心RxJava庫中使用,要使用它,必須要借助RxAndroid擴展庫。這個Scheduler對Android App特別有用,它能夠在應用的主線程中執行基於UI的任務。
    默認情況下,它會在應用主線程關聯的looper中進行任務排隊,但是它有一個其他的變種,允許我們以API的形式使用任意的Looper:AndroidSchedulers.from(Looper looper)

正常的流程

正常的流程為:

doOnSubscribe、onSubscribe、【create】、doOnNext、onNext、doOnComplete、onComplete

例如最基礎的形式:

Observable.create(emitter -> {
    log("【開始create】");
    emitter.onNext("救命啊");
    emitter.onComplete();
    log("【結束create】");
})
        .doOnNext(s -> log("【doOnNext】"))
        .doOnError(e -> log("【doOnError】"))
        .doOnComplete(() -> log("【doOnComplete】"))
        .doOnSubscribe(disposable -> log("【doOnSubscribe】"))
        .subscribe(o -> log("【onNext】"), e -> log("【onError】"), () -> log("【onComplete】"), d -> log("【onSubscribe】"));

打印方法為:

private void log(String s) {
    String date = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss SSS", Locale.getDefault()).format(new Date());
    Log.i("bqt", s + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
}

打印結果為:

【doOnSubscribe】2018.09.15 13:38:01 163,true
【onSubscribe】2018.09.15 13:38:01 165,true

【開始create】2018.09.15 13:38:01 166,true

【doOnNext】2018.09.15 13:38:01 167,true
【onNext】2018.09.15 13:38:01 168,true

【doOnComplete】2018.09.15 13:38:01 171,true
【onComplete】2018.09.15 13:38:01 172,true

【結束create】2018.09.15 13:38:01 173,true

將 Observable 換成下面的幾種形式后,效果都和上面基本是一樣的:

Observable.just("救命啊") //相應的只是沒有打印【開始create】和【結束create】
Observable.create(emitter -> {
    log("【開始create】");
    emitter.onNext("救命啊"); //相應的只是沒有【doOnComplete】和【onComplete】
    log("【結束create】");
})
Observable.create(emitter -> {
    log("【開始create】");
    emitter.onNext("救命啊");
    emitter.onNext("HELP");  //相應的有兩組 **Next,順序為【doOnNext】【onNext】【doOnNext】【onNext】
    emitter.onComplete();
    log("【結束create】");
})
Observable.create(emitter -> {
    log("【開始create】");
    emitter.onNext("救命啊");
    emitter.onError(new RuntimeException("game over")); //相應的是【doOnError】和【onError】
    log("【結束create】");
})

切換線程對 on** 方法的影響

通過上面的打印結果,我們可以知道,如果沒有添加切換線程的操作,則以上所有方法默認都執行在主線程中。

先說一個極端情況,如果我們整個在一個子線程中執行以上邏輯,例如:

new Thread(() -> ... ).start(); //子線程

則以上所有方法都是在子線程中執行的。

下面我們考慮切換線程對【on**】方法的影響。

指定被觀察者發布事件的線程

Observable.create(...)
    .subscribeOn(Schedulers.io()) //指定被觀察者發布事件的線程:
    //.observeOn(Schedulers.io()) //如果訂閱者接收事件的線程和被觀察者發布事件的線程相同,則可以不必指定接收事件的線程
    .subscribe(...);

打印結果為:

【onSubscribe】2018.09.15 14:36:44 010,true
【開始create】2018.09.15 14:36:44 011,false
【onNext】2018.09.15 14:36:44 012,false
【onComplete】2018.09.15 14:36:44 015,false
【結束create】2018.09.15 14:36:44 015,false

這種情況下,其線程效果等價於:

Observable.create(emitter -> new Thread(() -> {...}).start()) //將 create 中的所有操作放到了子線程
    .subscribe(...);

指定訂閱者(觀察者)接收事件的線程

Observable.create(...)
    //.subscribeOn(AndroidSchedulers.mainThread()) //因為默認就在主線程,所以可以不指定
    .observeOn(Schedulers.io()) //指定訂閱者(觀察者)接收事件的線程
    .subscribe(...);

打印結果為:

【onSubscribe】2018.09.15 14:25:58 682,true
【開始create】2018.09.15 14:25:58 697,true
【結束create】2018.09.15 14:25:58 699,true //此案例中,由於【create】和后面幾個回調不在一個線程中,因為存在線程競爭的情況,所以【結束create】的回調時機是不確定的,有可能在【onNext】前也可能在【onNext】后,甚至有可能在【onComplete】之后
【onNext】2018.09.15 14:25:58 700,false
【onComplete】2018.09.15 14:25:58 701,false

這種情況下,其線程效果等價於:

Observable.create(emitter -> {
    log("【開始create】"); //主線程
    new Thread(() -> {
        emitter.onNext("救命啊"); //子線程
        emitter.onComplete(); //子線程
    }).start();
    log("【結束create】"); //主線程
})
    .subscribe(...);

線程切換

在被觀察者發布事件時將線程切換到一個線程,在訂閱者接收事件時將線程切換到另一個線程

Observable.create(...)
    .subscribeOn(Schedulers.io()) //指定被觀察者發布事件的線程:
    .observeOn(AndroidSchedulers.mainThread()) //指定訂閱者(觀察者)接收事件的線程
    .subscribe(...);

打印結果為:

【onSubscribe】2018.09.15 14:12:11 711,true
【開始create】2018.09.15 14:12:11 713,false
【結束create】2018.09.15 14:12:11 714,false //此案例中,由於【create】和后面幾個回調不在一個線程中,因為存在線程競爭的情況,所以【結束create】的回調時機是不確定的,有可能在【onNext】前也可能在【onNext】后,甚至有可能在【onComplete】之后
【onNext】2018.09.15 14:12:11 723,true
【onComplete】2018.09.15 14:12:11 724,true

這種情況下,其線程效果等價於:

Observable.create(emitter -> new Thread(() -> {
    log("【開始create】"); //子線程
    runOnUiThread(() -> {
        emitter.onNext("救命啊"); //主線程
        emitter.onComplete(); //主線程
    });
    log("【結束create】"); //子線程
}).start())
    .subscribe(...);

切換線程對 doOn** 的影響

注意:下面這些沒必要去記,也根本不可能記住,只需要知道,這幾個 doOn 方法的執行時機和對應的 on 方法相比並不穩定,且這幾個 doOn 方法執行時所在線程也並不穩定即可。

切換線程的操作放在最上面

Observable.create(...)
    .subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
    .observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
    .doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
    .doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
    .doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
    .subscribe(...);

打印結果為:

【doOnSubscribe】2018.08.26 18:05:23 799,true
【onSubscribe】2018.08.26 18:05:23 800,true //【沒有改變過】
​
【create】2018.08.26 18:05:23 803,false //【沒有改變過】
​
【doOnNext】2018.08.26 18:05:23 806,true
【onNext】2018.08.26 18:05:23 807,true //【沒有改變過】
​
【doOnComplete】 2018.08.26 18:05:23 807,true
【onComplete】2018.08.26 18:05:23 807,true //【沒有改變過】

向下移動1下

Observable.create(...)
    .doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
    .subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
    .observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
    .doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
    .doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
    .subscribe(...);

打印結果為:

【doOnSubscribe】2018.08.26 18:50:32 149,true
【onSubscribe】2018.08.26 18:50:32 150,true //【沒有改變過】
​
【create】2018.08.26 18:50:32 158,false //【沒有改變過】
​
【doOnNext】2018.08.26 18:50:32 158,false
【onNext】2018.08.26 18:50:32 172,true //【沒有改變過】
​
【doOnComplete】 2018.08.26 18:50:32 172,true
【onComplete】2018.08.26 18:50:32 172,true //【沒有改變過】

向下移動2下

Observable.create(...)
    .doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
    .doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
    .subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
    .observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
    .doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
    .subscribe(...);

打印結果為:

【doOnSubscribe】2018.08.26 18:58:01 329,true
【onSubscribe】2018.08.26 18:58:01 349,true //【沒有改變過】
​
【create】2018.08.26 18:58:01 359,false //【沒有改變過】
​
【doOnNext】2018.08.26 18:58:01 359,false
【doOnComplete】 2018.08.26 18:58:01 360,false //注意順序已經和之前的不一樣了
【onNext】2018.08.26 18:58:01 371,true //【沒有改變過】
【onComplete】2018.08.26 18:58:01 372,true //【沒有改變過】

向下移動3下

Observable.create(...)
    .doOnNext(s -> Log.i("bqt", "【doOnNext】" + currentData() + "," + isMainThread()))
    .doOnComplete(() -> Log.i("bqt", "【doOnComplete】 " + currentData() + "," + isMainThread()))
    .doOnSubscribe(disposable -> Log.i("bqt", "【doOnSubscribe】" + currentData() + "," + isMainThread()))
    .subscribeOn(Schedulers.io())//指定 subscribe 時所發生的線程,發射事件的線程
    .observeOn(AndroidSchedulers.mainThread()) //指定下游 Observer 回調發生的線程,訂閱者接收事件的線程
    .subscribe(...);

打印結果為:

【onSubscribe】2018.08.26 19:01:51 196,true //【沒有改變過】
【doOnSubscribe】2018.08.26 19:01:51 200,false//注意順序已經和之前的不一樣了
​
【create】2018.08.26 19:01:51 201,false //【沒有改變過】
​
【doOnNext】2018.08.26 19:01:51 201,false
【doOnComplete】 2018.08.26 19:01:51 201,false//注意順序已經和之前的不一樣了
【onNext】2018.08.26 19:01:51 227,true //【沒有改變過】
【onComplete】2018.08.26 19:01:51 228,true //【沒有改變過】

別人總結的規律

  • doOnSubscribe()與onStart()相似,均在代碼調用時就會回調。但doOnSubscribe()能夠通過subscribeOn()操作符改變運行的線程且越在后面運行越早;
  • doOnSubscribe()后面緊跟subscribeOn(),那么doOnSubscribe()將於subscribeOn()指定的線程保持一致。假設doOnSubscribe()在subscribeOn()之后,他的運行線程得再看情況分析;
  • doOnSubscribe()假設在observeOn()后(注意:observeon()后沒有緊接着再調用subcribeon()方法)。那么doOnSubscribe的運行線程就是main線程,與observeon()指定的線程沒有關系。
  • 假設在observeOn()之前沒有調用過subcribeOn()方法,observeOn()之后subscribe面()方法之前調用subcribeOn()方法,那么他會改變整個代碼流程中全部調用doOnSubscribe()方法所在的線程。同一時候也會改變observeOn()方法之前全部操作符所在的線程(有個重要前提:不滿足第2點的條件,也就是doOnSubscribe()后面沒有調用subscribeOn()方法)。
  • 假設在observeOn()前后都沒有調用過subcribeOn()方法,那么整個代碼流程中的doOnSubscribe()運行在main線程,與observeOn()指定的線程無關。同一時候observeOn()之前的操作符也將運行在main線程,observeOn()之后的操作符與observeOn()指定的線程保持一致。

再次強調:上面這些沒必要去記,也根本不可能記住,只需要知道,這幾個 doOn** 方法的執行時機和對應的 on** 方法相比並不穩定,且這幾個 doOn** 方法執行時所在線程也並不穩定即可。

2018-9-15


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM