使用 RxJava 的正確姿勢


最近在使用 RxJava 時遇到了一些比較詭異的問題,排查的過程中做了點研究,發現平時大家的用法多多少少都有些問題。有些地方存在隱患,有些用法不夠簡練,我把這些問題簡單做一下分類和總結,供大家參考。

數據源類型選擇

RxJava2 中的數據源類型有5種,分別是 Observable,Flowable,Single,Maybe 和 Completable,它們的區別如下,看到有些同學只用 Observable,其實這並不是個很好的習慣。

類別 特點
Observable 多個數據,不支持背壓
Flowable 多個數據,支持背壓
Single 一個數據
Maybe 一個或沒有數據
Completable 沒有數據,只有結束信號

我們舉數據庫操作的例子,例如在 Repository 中有一個方法,根據一本書的序列號去查詢這本書的名稱:

    private String getBookName(int serialNumber) throws InterruptedException {
        Thread.sleep(1000); // 模擬耗時操作
        return "Pride And Prejudice";
    }

嘗試把它轉化成一個 Rx 風格的方法,如果使用 Observable 是這樣的:

    public Observable<String> getBookNameObservable(int serialNumber) {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String name = getBookName(serialNumber);
                if (name != null) {
                    emitter.onNext(name);
                    emitter.onComplete();
                } else {
                    emitter.onError(new NullPointerException("Not Found!"));
                }
            }
        });
    }

訪問數據庫確定有且只有一次返回結果,所以可以改成Single:

    public Single<String> getBookNameSingle(int serialNumber) {
        return Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                String name = getBookName(serialNumber);
                if (name != null) {
                    emitter.onSuccess(name);
                } else {
                    emitter.onError(new NullPointerException("Not Found!"));
                }
            }
        });
    }

這樣寫的好處是:

  • 語義明確,看到是 Single 類型就知道只有一個返回值
  • 防止遺漏 onComplete 調用,保證流正常結束
  • 寫起來更簡潔一些,對應的訂閱者也是
  • Single 和 Completable 是使用最廣泛的數據源,它們的語義簡單,操作符更豐富

我們項目中會有這樣一些常用的場景:

  • 持續監聽某個組件狀態(PUSH類型),監聽航線執行狀態,監聽用戶的點擊事件等,這種情況會連續返回數據,一般使用 Observable 就可以了,如果有特殊情況需要支持背壓可以考慮 Flowable,不過我目前還沒有遇到,如果推送頻率過高,使用 throttle 操作符過濾一下即可;
  • 主動獲取某個傳感器的某個狀態值(GET類型),訪問數據庫查找數據,發起網絡請求等,獲得一個返回值或者失敗信息,應該使用 Single;
  • 設置某個組件的狀態、開關(SET類型),控制某個組件執行任務(ACTION類型),數據庫增刪改等,不需要返回值,只需知道是否執行完成或者失敗,應該使用Completable;
  • 工作流比較復雜,需要一步一步完成任務,中間還需要線程轉換等,這種情況大多只需要傳遞上一個操作完成的結果或者信號,視情況使用 Single 或者 Completable 就足夠了。

裝配過程

Single.just 引發的血案

先列舉我遇到過的一個問題,和調用時序相關,類似這樣:

public class SingleJustTest {
    private static int index = 0;

    public static void main(String[] args) {
        Single<Integer> single = Single.just(index);
        index++;
        single.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("index from Single is : " + integer);
                System.out.println("real index is : " + index);
            }
        });
    }
}

原以為 integer 這個值應該和 index 相等,結果卻出乎意料:

index from Single is : 0
real index is : 1

這是為什么呢?看一下 Single.just 的實現:

    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }

SingleJust 這個類也很簡單:

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

這個對象保存了 Single.just 方法傳入的參數,並且在訂閱時傳給下游,所以上面這個 integer 的值並不會隨着 index 值變化。
要想達到目的,有兩種解決方法,使用 Single.fromCallable 或者用 Single.defer:

public class SingleJustTest {
    private static int index = 0;

    public static void main(String[] args) {
        Single<Integer> single = Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return index;
            }
        });
        index++;
        single.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("index from Single is : " + integer);
                System.out.println("real index is : " + index);
            }
        });
    }
}

public class SingleJustTest {
    private static int index = 0;

    public static void main(String[] args) {
        Single<Integer> single = Single.defer(new Callable<SingleSource<? extends Integer>>() {
            @Override
            public SingleSource<? extends Integer> call() throws Exception {
                return Single.just(index);
            }
        });
        index++;
        single.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("index from Single is : " + integer);
                System.out.println("real index is : " + index);
            }
        });
    }
}

這兩種方式都能得到正確的結果:

index from Single is : 1
real index is : 1

原理也很簡單,Single.fromCallable 是通過方法調用的方式,訂閱后執行傳入的 Callable 取得 index 的值; Single.defer 是在訂閱時才開始生成真正的數據源,所以也沒有問題。
強調一下,just 操作符(包括 Single.just / Observalbe.just / Flowable.just / Maybe.just)雖然使用方便,但是存在上述問題,除非傳入的參數是常量,否則應當避免使用。

正確創建數據源

下面是我們項目中經常看到的用法:

public class RxJavaTest {

    public static void main(String[] args) {
        getBookNameObservable(123)
                .subscribeOn(Schedulers.io())
                // 非 Android 環境演示線程調度需要使用 blockingSubscribe 方式訂閱,否則當主線程執行完后就退出了
                .blockingSubscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        log("onNext : s = " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("onError " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        log("onComplete");
                    }
                });
    }

    private static Observable<String> getBookNameObservable(int serialNumber) {
        //先執行一系列運算,最后返回數據源
        String bookName = getBookNameFromDB(serialNumber);
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log("call");
                return bookName;
            }
        });
    }

    private static String getBookNameFromDB(int serialNumber) {
        try {
            log("getBookNameFromDB");
            Thread.sleep(1000); // 模擬耗時操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Pride And Prejudice";
    }

    private static void log(String msg) {
        System.out.println(msg + ", in thread: " + Thread.currentThread().getName());
    }
}

注意 getBookNameObservable 這個方法,先去查詢數據庫獲得 bookName,然后返回了一個 Observable,我們執行一下:

getBookNameFromDB, in thread: main
onSubscribe, in thread: main
call, in thread: RxCachedThreadScheduler-1
onNext : s = Pride And Prejudice, in thread: main
onComplete, in thread: main

看起來好像沒有什么問題,也得到了正確的結果,但是注意調用的順序和每個方法運行的線程:getBookNameFromDB 這個方法是耗時操作,我們本意是把它放到 io 線程去執行,但是它卻在 main 線程執行了,而且調用時機要早於 onSubscribe 方法。
為什么會這樣呢?其實很好理解,需要分清楚上面這段代碼中哪些是在裝配過程中調用,哪些是在訂閱之后調用的。getBookNameObservable 這個方法是在裝配過程中被調用,應該返回一個未訂閱的數據源,當用戶訂閱時才開始執行 getBookNameFromDB 獲取並發射數據,但現在這種寫法,getBookNameObservable 方法中 return 之前的語句都會在裝配過程中被調用,這就能解釋上面的 log 信息了。
總結一下這種寫法會帶來的問題:

  • 本該在流中執行的代碼會在訂閱之前執行,產生時序問題
  • 如果裝配完成后沒有馬上訂閱,那么訂閱時接收到的數據可能已經與真實數據不同
  • 線程調度不起作用
  • 一旦這部分代碼拋出了異常,訂閱者無法在 onError 中接收到錯誤信息

所以雖然很多地方是這么用的,結果也沒有影響,但這並不表示這種用法就是正確的,只是沒有暴露問題而已。強調一下,實現返回數據源方法時,必須第一行就是return語句,所有的操作應當包含在操作流中,這樣做能避免很多問題。

簡便操作符

此處是要講一些常用的操作符,能讓我們把非 RxJava 代碼快速轉換過來。

just

前面已經提過,雖然好用,但是只有參數為常量時才能使用。

fromCallable

前面也有演示,一般 Single 操作符使用這個會很方便,直接在 call 方法中返回 onSuccess 的值即可。Observable 和 Flowable 在只有一次 onNext 的情況下也可以用。

fromAction

Completable 專用,相當於執行完成 Action 中的代碼並且調用 onComplete,很方便。

flatMap

這個大家用的比較多了,多用於數據源類型轉換。

andThen

Completable 因為沒有返回值,所以也就沒有 flatMap 操作符,可以用 andThen 來連接下一個數據源。

doOnSuccess / doAfterSuccess / doOnNext / doOnComplete

在特定位置插入操作,有些可以對發射的值做進一步處理。

暫時先講這幾個,RxJava 的操作符很豐富,不要只會用 create 和 just,可以多了解一下,有需要去查 官方 API 文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM