簡要:
需求了解:
Rxjava 中當數據處理派發中發生了異常 ,觀察者會接受到一個 Error 的通知,那如果不想發射這個異常的通知,自己處理掉呢?答案當然是可以的,在 Rxjava 中很多操作符可用於對 Observable 發射的 onError 通知做出響應或者從錯誤中恢復。
例如:
- 吞掉這個錯誤,切換到一個備用的Observable繼續發射數據
- 吞掉這個錯誤然后發射默認值
- 吞掉這個錯誤並立即嘗試重啟這個Observable
- 吞掉這個錯誤,在一些回退間隔后重啟這個Observable
Rxjava中常見的錯誤處理操作符有如下幾類:
- onErrorReturn():指示Observable在遇到錯誤時發射一個特定的數據
- onErrorResumeNext():指示Observable在遇到錯誤時發射一個數據序列
- onExceptionResumeNext():指示Observable遇到錯誤時繼續發射數據
- retry():指示Observable遇到錯誤時重試
- retryWhen():指示Observable遇到錯誤時,將錯誤傳遞給另一個Observable來決定是否要重新給訂閱這個Observable
1. Catch
從 onError 通知中恢復發射數據。

Catch 操作符攔截原始Observable的 onError 通知,將它替換為其它的數據項或數據序列,讓產生的Observable能夠正常終止或者根本不終止。
1.1 onErrorReturn
onErrorReturn 方法返回一個鏡像原有Observable行為的新Observable,后者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,作為替代,它會發發射一個特殊的項並調用觀察者的 onCompleted 方法。
- onErrorReturnItem(T item): 讓Observable遇到錯誤時發射一個指定的項(item)並且正常終止。

- onErrorReturn(Function<Throwable, T> valueSupplier):讓Observable遇到錯誤時通過一個函數Function來進行判斷返回指定的類型數據,並且正常終止。

示例代碼:
// 創建一個可以發射異常的Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(1 / 0); // 產生一個異常
emitter.onNext(3);
emitter.onNext(4);
}
});
/** 1. onErrorReturnItem(T item)
* 讓Observable遇到錯誤時發射一個指定的項(item)並且正常終止。
*/
observable.onErrorReturnItem(888) // 源Observable發生異常時發射指定的888數據
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(1): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(1)");
}
});
System.out.println("-----------------------------------------------");
/**
* 2. onErrorReturn(Function<Throwable, T> valueSupplier)
* 讓Observable遇到錯誤時通過一個函數Function來接受Error參數並進行判斷返回指定的類型數據,並且正常終止。
*/
observable.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
System.out.println("--> apply(1): e = " + throwable);
return 888; // 源Observable發生異常時發射指定的888數據
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(2): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(2)");
}
});
輸出:
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 888
--> onCompleted(1)
-----------------------------------------------
--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> apply(1): e = java.lang.ArithmeticException: / by zero
--> onNext(2): 888
--> onCompleted(2)
Javadoc: onErrorReturnItem(T item)
Javadoc: onErrorReturn(Function<Throwable, T> valueSupplier)
1.2 onErrorResumeNext
onErrorResumeNext 方法返回一個鏡像原有Observable行為的新Observable,后者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,作為替代,它會開始另一個指定的備用Observable。

- onErrorResumeNext(ObservableSource next): 讓Observable在遇到錯誤時開始發射第二個指定的Observable的數據序列。
- onErrorResumeNext(Function<Throwable, ObservableSource
> resumeFunction):讓Observable在遇到錯誤時通過一個函數Function來接受Error參數並進行判斷返回指定的第二個Observable的數據序列。
示例代碼:
// 創建一個可以發射異常的Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(1 / 0); // 產生一個異常
emitter.onNext(3);
emitter.onNext(4);
}
});
/**
* 3. onErrorResumeNext(ObservableSource next)
* 讓Observable在遇到錯誤時開始發射第二個指定的Observable的數據序列
*/
observable.onErrorResumeNext(Observable.just(888)) // 當發生異常的時候繼續發射此項Observable
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(3): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(3)");
}
});
System.out.println("-----------------------------------------------");
/**
* 4. onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
* 讓Observable在遇到錯誤時通過一個函數Function來接受Error參數並進行判斷返回指定的第二個Observable的數據序列
*/
observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
System.out.println("--> apply(4): " + throwable);
return Observable.just(888); // 當發生異常的時候繼續發射此項Observable
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(4)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(4): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(4): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(4)");
}
});
輸出:
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 888
--> onCompleted(3)
-----------------------------------------------
--> onSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> apply(4): java.lang.ArithmeticException: / by zero
--> onNext(4): 888
--> onCompleted(4)
Javadoc: onErrorResumeNext(ObservableSource next)
Javadoc: onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
1.3 onExceptionResumeNext
與 onErrorResumeNext 類似, onExceptionResumeNext 方法返回一個鏡像原有Observable行為的新Observable,也使用一個備用的Observable,不同的是,如果 onError 收到的 Throwable 不是一個 Exception ,它會將錯誤傳遞給觀察者的 onError 方法,不會使用備用的Observable。

解析: onExceptionResumeNext 只會對Exception類型的異常進行處理,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable 。
示例代碼:
// 創建一個可以發射異常的Observable
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("This is Throwable!")); // Throwable類型異常,直接通知觀察者
// emitter.onError(new Error("This is Error!")); // Error類型異常,直接通知觀察者
emitter.onError(new Exception("This is Exception!")); // Exception類型異常,進行處理,發送備用的Observable數據
// emitter.onNext(1 / 0); // 會產生一個ArithmeticException異常,異常會被處理,發送備用的Observable數據
emitter.onNext(3);
emitter.onNext(4);
}
});
/**
* 5. onExceptionResumeNext(ObservableSource next)
* 如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable
* 只對Exception類型的異常通知進行備用Observable處理
*/
observable1.onExceptionResumeNext(Observable.just(888))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(5)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(5): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(5): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(5)");
}
});
輸出:
--> onSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 888
--> onCompleted(5)
2. Retry
如果原始Observable遇到錯誤,重新訂閱它期望它能正常終止。
Retry 操作符不會將原始 Observable 的 onError 通知傳遞給觀察者,它會訂閱這個Observable,再給它機會無錯誤地完成它的數據序列。 Retry 總是傳遞 onNext 通知給觀察者,由於重新訂閱,可能會造成數據項重復情況。
2.1 retry()
retry():無論收到多少次 onError 通知,無參數版本的 retry 都會繼續訂閱並發射原始Observable。

注意: 因為如果遇到異常,將會無條件的重新訂閱原始的Observable,知道沒有異常的發射全部的數據序列為止。所以如果你的異常發生后重新訂閱也不會恢復正常的話,會一直訂閱下去,有內存泄露的風險。
2.2 retry(long times)
retry(long times):接受單個 count 參數的 retry 會最多重新訂閱指定的次數,如果次數超了,它不會嘗試再次訂閱,它會把最新的一個 onError 通知傳遞給它的觀察者。

2.3 retry(long times, Predicate predicate)
retry(long times, Predicate<Throwable> predicate):遇到異常后最多重新訂閱 times 次,每次重新訂閱經過函數predicate 最終判斷是否繼續重新訂閱,如果 times 到達上限或者 predicate 返回 false 中任意一個最先滿足條件,都會終止重新訂閱,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

2.4 retry(Predicate predicate)
retry(Predicate<Throwable> predicate):接受一個謂詞函數作為參數,這個函數的兩個參數是:重試次數和導致發射 onError 通知的 Throwable 。這個函數返回一個布爾值,如果返回 true , retry 應該再次訂閱和鏡像原始的Observable,如果返回 false , retry 會將最新的一個 onError 通知傳遞給它的觀察者

2.5 retry(BiPredicate predicate)
retry(BiPredicate<Integer, Throwable> predicate):遇到異常時,通過函數 predicate 判斷是否重新訂閱源Observable,並且通過參數 Integer 傳遞給 predicate 重新訂閱的次數,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

2.6 retryUntil(BooleanSupplier stop)
retryUntil(BooleanSupplier stop):重試重新訂閱,直到給定的停止函數 stop 返回 true,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

2.7 retryWhen(Function handler)
retryWhen(Function<Observable<Throwable>, ObservableSource> handler):retryWhen 和 retry 類似,區別是, retryWhen 將 onError 中的 Throwable 傳遞給一個函數,這個函數產生另一個 Observable, retryWhen 觀察它的結果再決定是不是要重新訂閱原始的Observable。如果這個Observable發射了一項數據,它就重新訂閱,如果這個Observable發射的是 onError 通知,它就將這個通知傳遞給觀察者然后終止。

實例代碼:
// flag for emitted onError times
public static int temp = 0;
// 創建可以發送Error通知的Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
if (temp <= 2) {
emitter.onError(new Exception("Test Error!"));
temp++;
}
emitter.onNext(3);
emitter.onNext(4);
}
});
/**
* 1. retry()
* 無論收到多少次onError通知, 都會去繼續訂閱並發射原始Observable。
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(1)");
}
}).retry().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 2. retry(long times)
* 遇到異常后,最多重新訂閱源Observable times次
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(2)");
}
}).retry(1) // 遇到異常后,重復訂閱的1次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(2): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(2)");
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 3. retry(long times, Predicate<Throwable> predicate)
* 遇到異常后最多重新訂閱times次,每次重新訂閱經過函數predicate最終判斷是否繼續重新訂閱
* 如果times到達上限或者predicate返回false中任意一個最先滿足條件,都會終止重新訂閱
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(3)");
}
}).retry(2, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
System.out.println("--> test(3)");
if(throwable instanceof Exception) {
return true; // 遇到異常通知后是否繼續繼續訂閱
}
return false;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(3): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(3)");
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 4. retry(Predicate<Throwable> predicate)
* 遇到異常時,通過函數predicate判斷是否重新訂閱源Observable
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(4)");
}
}).retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
if (throwable instanceof Exception) {
return true; // 遇到異常通知后是否繼續繼續訂閱
}
return false;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(4)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(4): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(4): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(4)");
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 5. retry(BiPredicate<Integer, Throwable> predicate)
* 遇到異常時,通過函數predicate判斷是否重新訂閱源Observable,並且通過參數integer傳遞給predicate重新訂閱的次數
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(5)");
}
}).retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
System.out.println("--> test(5): " + integer);
if (throwable instanceof Exception) {
return true; // 遇到異常通知后是否繼續繼續訂閱
}
return false;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(5)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(5): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(5): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(5)");
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 6. retryUntil(BooleanSupplier stop)
* 重試重新訂閱,直到給定的停止函數stop返回true
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(6)");
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
System.out.println("--> getAsBoolean(6)");
if(temp == 1){ // 滿足條件,停止重新訂閱
return true;
}
return false;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(6)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(6): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(6): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(6)");
}
});
System.out.println("---------------------------------------------");
temp = 0;
/**
* 7. retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
* 將onError中的Throwable傳遞給一個函數handler,這個函數產生另一個Observable,
* retryWhen觀察它的結果再決定是不是要重新訂閱原始的Observable。
* 如果這個Observable發射了一項數據,它就重新訂閱,
* 如果這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者然后終止。
*/
observable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> doOnSubscribe(7)");
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
System.out.println("--> apply(7)");
// 根據產生的Error的Observable是否正常發射數據來進行重新訂閱,如果發射Error通知,則直接傳遞給觀察者后終止
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (temp == 1) {
return Observable.error(throwable); // 滿足條件后,傳遞這個Error,終止重新訂閱
}
return Observable.timer(1, TimeUnit.MILLISECONDS); // 正常發射數據,可以重新訂閱
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(7)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(7): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(7): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onCompleted(7)");
}
});
System.in.read();
輸出:
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
---------------------------------------------
--> onSubscribe(2)
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onError(2): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onError(3): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(4)
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> onNext(4): 3
--> onNext(4): 4
---------------------------------------------
--> onSubscribe(5)
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 1
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 2
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 3
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 3
--> onNext(5): 4
---------------------------------------------
--> onSubscribe(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
--> onError(6): java.lang.Exception: Test Error!
---------------------------------------------
--> apply(7)
--> onSubscribe(7)
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
--> onError(7): java.lang.Exception: Test Error!
Javadoc: retry()
Javadoc: retry(long times)
Javadoc: retry(long times, Predicate<Throwable> predicate)
Javadoc: retry(Predicate<Throwable> predicate)
Javadoc: retry(BiPredicate<Integer, Throwable> predicate)
Javadoc: retryUntil(BooleanSupplier stop)
Javadoc: retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
小結
本節主要介紹了 Rxjava 中關於 Error 通知的處理,主要是在遇到異常通知時,無條件或者指定條件的去重新訂閱原始 Observable 直到沒有異常(正常發射所有數據序列)或者滿足指定的條件后終止重新訂閱,發射異常通知給觀察者。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼:
