最近在使用 RxJava 時遇到了一些比較詭異的問題,排查的過程中做了點研究,發現平時大家的用法多多少少都有些問題。有些地方存在隱患,有些用法不夠簡練,我把這些問題簡單做一下分類和總結,供大家參考。
數據源類型選擇
RxJava2 中的數據源類型有5種,分別是 Observable,Flowable,Single,Maybe 和 Completable,它們的區別如下,看到有些同學只用 Observable,其實這並不是個很好的習慣。
類別 | 特點 |
---|---|
Observable | 多個數據,不支持背壓 |
Flowable | 多個數據,支持背壓 |
Single | 一個數據 |
Maybe | 一個或沒有數據 |
Completable | 沒有數據,只有結束信號 |
我們舉數據庫操作的例子,例如在 Repository 中有一個方法,根據一本書的序列號去查詢這本書的名稱:
private String getBookName(int serialNumber) throws InterruptedException {
Thread.sleep(1000); // 模擬耗時操作
return "Pride And Prejudice";
}
嘗試把它轉化成一個 Rx 風格的方法,如果使用 Observable 是這樣的:
public Observable<String> getBookNameObservable(int serialNumber) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String name = getBookName(serialNumber);
if (name != null) {
emitter.onNext(name);
emitter.onComplete();
} else {
emitter.onError(new NullPointerException("Not Found!"));
}
}
});
}
訪問數據庫確定有且只有一次返回結果,所以可以改成Single:
public Single<String> getBookNameSingle(int serialNumber) {
return Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
String name = getBookName(serialNumber);
if (name != null) {
emitter.onSuccess(name);
} else {
emitter.onError(new NullPointerException("Not Found!"));
}
}
});
}
這樣寫的好處是:
- 語義明確,看到是 Single 類型就知道只有一個返回值
- 防止遺漏 onComplete 調用,保證流正常結束
- 寫起來更簡潔一些,對應的訂閱者也是
- Single 和 Completable 是使用最廣泛的數據源,它們的語義簡單,操作符更豐富
我們項目中會有這樣一些常用的場景:
- 持續監聽某個組件狀態(PUSH類型),監聽航線執行狀態,監聽用戶的點擊事件等,這種情況會連續返回數據,一般使用 Observable 就可以了,如果有特殊情況需要支持背壓可以考慮 Flowable,不過我目前還沒有遇到,如果推送頻率過高,使用 throttle 操作符過濾一下即可;
- 主動獲取某個傳感器的某個狀態值(GET類型),訪問數據庫查找數據,發起網絡請求等,獲得一個返回值或者失敗信息,應該使用 Single;
- 設置某個組件的狀態、開關(SET類型),控制某個組件執行任務(ACTION類型),數據庫增刪改等,不需要返回值,只需知道是否執行完成或者失敗,應該使用Completable;
- 工作流比較復雜,需要一步一步完成任務,中間還需要線程轉換等,這種情況大多只需要傳遞上一個操作完成的結果或者信號,視情況使用 Single 或者 Completable 就足夠了。
裝配過程
Single.just 引發的血案
先列舉我遇到過的一個問題,和調用時序相關,類似這樣:
public class SingleJustTest {
private static int index = 0;
public static void main(String[] args) {
Single<Integer> single = Single.just(index);
index++;
single.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("index from Single is : " + integer);
System.out.println("real index is : " + index);
}
});
}
}
原以為 integer 這個值應該和 index 相等,結果卻出乎意料:
index from Single is : 0
real index is : 1
這是為什么呢?看一下 Single.just 的實現:
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "value is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
SingleJust 這個類也很簡單:
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
這個對象保存了 Single.just 方法傳入的參數,並且在訂閱時傳給下游,所以上面這個 integer 的值並不會隨着 index 值變化。
要想達到目的,有兩種解決方法,使用 Single.fromCallable 或者用 Single.defer:
public class SingleJustTest {
private static int index = 0;
public static void main(String[] args) {
Single<Integer> single = Single.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return index;
}
});
index++;
single.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("index from Single is : " + integer);
System.out.println("real index is : " + index);
}
});
}
}
public class SingleJustTest {
private static int index = 0;
public static void main(String[] args) {
Single<Integer> single = Single.defer(new Callable<SingleSource<? extends Integer>>() {
@Override
public SingleSource<? extends Integer> call() throws Exception {
return Single.just(index);
}
});
index++;
single.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("index from Single is : " + integer);
System.out.println("real index is : " + index);
}
});
}
}
這兩種方式都能得到正確的結果:
index from Single is : 1
real index is : 1
原理也很簡單,Single.fromCallable 是通過方法調用的方式,訂閱后執行傳入的 Callable 取得 index 的值; Single.defer 是在訂閱時才開始生成真正的數據源,所以也沒有問題。
強調一下,just 操作符(包括 Single.just / Observalbe.just / Flowable.just / Maybe.just)雖然使用方便,但是存在上述問題,除非傳入的參數是常量,否則應當避免使用。
正確創建數據源
下面是我們項目中經常看到的用法:
public class RxJavaTest {
public static void main(String[] args) {
getBookNameObservable(123)
.subscribeOn(Schedulers.io())
// 非 Android 環境演示線程調度需要使用 blockingSubscribe 方式訂閱,否則當主線程執行完后就退出了
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("onSubscribe");
}
@Override
public void onNext(String s) {
log("onNext : s = " + s);
}
@Override
public void onError(Throwable e) {
log("onError " + e.getMessage());
}
@Override
public void onComplete() {
log("onComplete");
}
});
}
private static Observable<String> getBookNameObservable(int serialNumber) {
//先執行一系列運算,最后返回數據源
String bookName = getBookNameFromDB(serialNumber);
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
log("call");
return bookName;
}
});
}
private static String getBookNameFromDB(int serialNumber) {
try {
log("getBookNameFromDB");
Thread.sleep(1000); // 模擬耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Pride And Prejudice";
}
private static void log(String msg) {
System.out.println(msg + ", in thread: " + Thread.currentThread().getName());
}
}
注意 getBookNameObservable 這個方法,先去查詢數據庫獲得 bookName,然后返回了一個 Observable,我們執行一下:
getBookNameFromDB, in thread: main
onSubscribe, in thread: main
call, in thread: RxCachedThreadScheduler-1
onNext : s = Pride And Prejudice, in thread: main
onComplete, in thread: main
看起來好像沒有什么問題,也得到了正確的結果,但是注意調用的順序和每個方法運行的線程:getBookNameFromDB 這個方法是耗時操作,我們本意是把它放到 io 線程去執行,但是它卻在 main 線程執行了,而且調用時機要早於 onSubscribe 方法。
為什么會這樣呢?其實很好理解,需要分清楚上面這段代碼中哪些是在裝配過程中調用,哪些是在訂閱之后調用的。getBookNameObservable 這個方法是在裝配過程中被調用,應該返回一個未訂閱的數據源,當用戶訂閱時才開始執行 getBookNameFromDB 獲取並發射數據,但現在這種寫法,getBookNameObservable 方法中 return 之前的語句都會在裝配過程中被調用,這就能解釋上面的 log 信息了。
總結一下這種寫法會帶來的問題:
- 本該在流中執行的代碼會在訂閱之前執行,產生時序問題
- 如果裝配完成后沒有馬上訂閱,那么訂閱時接收到的數據可能已經與真實數據不同
- 線程調度不起作用
- 一旦這部分代碼拋出了異常,訂閱者無法在 onError 中接收到錯誤信息
所以雖然很多地方是這么用的,結果也沒有影響,但這並不表示這種用法就是正確的,只是沒有暴露問題而已。強調一下,實現返回數據源方法時,必須第一行就是return語句,所有的操作應當包含在操作流中,這樣做能避免很多問題。
簡便操作符
此處是要講一些常用的操作符,能讓我們把非 RxJava 代碼快速轉換過來。
just
前面已經提過,雖然好用,但是只有參數為常量時才能使用。
fromCallable
前面也有演示,一般 Single 操作符使用這個會很方便,直接在 call 方法中返回 onSuccess 的值即可。Observable 和 Flowable 在只有一次 onNext 的情況下也可以用。
fromAction
Completable 專用,相當於執行完成 Action 中的代碼並且調用 onComplete,很方便。
flatMap
這個大家用的比較多了,多用於數據源類型轉換。
andThen
Completable 因為沒有返回值,所以也就沒有 flatMap 操作符,可以用 andThen 來連接下一個數據源。
doOnSuccess / doAfterSuccess / doOnNext / doOnComplete
在特定位置插入操作,有些可以對發射的值做進一步處理。
暫時先講這幾個,RxJava 的操作符很豐富,不要只會用 create 和 just,可以多了解一下,有需要去查 官方 API 文檔。