RxJava2 Single, Maybe 和 Completable
原文: https://www.jianshu.com/p/66a55abbadef
參考而做的二次實現. 並重排版一下他的總結, 在此做個記錄好回顧. 本文沒有原文那么詳盡, 建議閱讀原文.
前述
java-
1.8
maven-
3
rxjava-
2.2.3
io.reactivex.Single
: a flow of exactly 1 item or an error,io.reactivex.Completable
: a flow without items but only a completion or error signal,io.reactivex.Maybe
: a flow with no items, exactly one item or an error.
像
Observale
和Flowable
都是用來發射數據流的(0..N), 然而出現了三個只有1個數據的基類: (https://www.jianshu.com/p/66a55abbadef)
Single
- 只發射一條單一的數據,或者一條異常通知, 不能發射完成通知,其中數據與通知只能發射一個。Compoletable
- 只發射一條完成通知,或者一條異常通知,不能發射數據,其中完成通知與異常通知只能發射一個Maybe
- 可發射一條單一的數據,以及發射一條完成通知,或者一條異常通知,其中完成通知和異常通知只能發射一個,發射數據只能在發射完成通知或者異常通知之前,否則發射數據無效。
示例(Single
簡單使用)
Single
操作實現類 - HelloSingle.java
package yag;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloSingle {
public void helloSingle(){
Single
.create((SingleOnSubscribe<Integer>) singleEmitter -> {
// 發射
singleEmitter.onSuccess(1);
singleEmitter.onSuccess(2);
})
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
執行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloSingle helloSingle = new HelloSingle();
helloSingle.helloSingle();
}
}
執行結果
1
Process finished with exit code 0
只接收第一條信息.
小結
發送異常信息 - 使用onError()
方法.
發射器接口SingleEmitter
:
1、方法void onSuccess(T t)
用來發射一條單一的數據,且一次訂閱只能調用一次,不同於Observale的發射器ObservableEmitter中的void onNext(@NonNull T value)方法,在一次訂閱中,可以多次調用多次發射。
2、方法void onError(Throwable t)
等同於ObservableEmitter
中的void onError(@NonNull Throwable error)
用來發射一條錯誤通知
3、SingleEmitter
中沒有用來發射完成通知的void onComplete()
方法。
方法onSuccess
與onError
只可調用一個,若先調用onError
則會導致onSuccess
無效,若先調用onSuccess
,則會拋出io.reactivex.exceptions.UndeliverableException
異常。
觀察者SingleObserver
:
方法void onSubscribe(Disposable d)
等同於Observer中的void onSubscribe(Disposable d)
。
方法void onSuccess(T t)
類似於Observer
中的onNext(T t)
用來接收Single發的數據。
方法void onError(Throwable e)
等同於Observer中的void onError(Throwable e)
用來處理異常通知。
沒有用來處理完成通知的方法void onComplete()
示例(Completable
簡單使用)
Completable
操作實現類 - HelloCompletable.java
package yag;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloCompletable {
public void helloCompletable(){
Completable
.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter completableEmitter) throws Exception {
completableEmitter.onComplete();
}
})
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onComplete() {
System.out.println("執行完成");
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
執行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloCompletable helloCompletable = new HelloCompletable();
helloCompletable.helloCompletable();
}
}
執行結果
執行完成
Process finished with exit code 0
補充
onError()
(發射異常信息), 由CompletableObserver
的onError()
.
示例( Maybe
簡單使用)
Maybe
操作實現類 - HelloMaybe.java
package yag;
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloMaybe {
public void helloMaybe(){
Maybe
.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> maybeEmitter) {
maybeEmitter.onSuccess(1);
maybeEmitter.onComplete();
}
})
.subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("執行完成");
}
});
}
}
執行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloMaybe helloMaybe = new HelloMaybe();
helloMaybe.helloMaybe();
}
}
執行結果
1
Process finished with exit code 0