RxJava2|Single, Maybe 和 Completable


RxJava2 Single, Maybe 和 Completable

原文: https://www.jianshu.com/p/66a55abbadef

參考而做的二次實現. 並重排版一下他的總結, 在此做個記錄好回顧. 本文沒有原文那么詳盡, 建議閱讀原文.

前述

java-1.8

maven-3

rxjava-2.2.3

ObservaleFlowable都是用來發射數據流的(0..N), 然而出現了三個只有1個數據的基類: (https://www.jianshu.com/p/66a55abbadef)

  • Single - 只發射一條單一的數據,或者一條異常通知, 不能發射完成通知,其中數據與通知只能發射一個。
  • Compoletable - 只發射一條完成通知,或者一條異常通知,不能發射數據,其中完成通知與異常通知只能發射一個
  • Maybe - 可發射一條單一的數據,以及發射一條完成通知,或者一條異常通知,其中完成通知和異常通知只能發射一個,發射數據只能在發射完成通知或者異常通知之前,否則發射數據無效。

示例(Single簡單使用)

Single操作實現類 - HelloSingle.java

package yag;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloSingle {

    public void helloSingle(){
        Single
                .create((SingleOnSubscribe<Integer>) singleEmitter -> {
                    // 發射
                    singleEmitter.onSuccess(1);
                    singleEmitter.onSuccess(2);
                })
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloSingle helloSingle = new HelloSingle();
        helloSingle.helloSingle();
    }
}

執行結果

1

Process finished with exit code 0

只接收第一條信息.

小結

發送異常信息 - 使用onError()方法.

發射器接口SingleEmitter:

1、方法void onSuccess(T t)用來發射一條單一的數據,且一次訂閱只能調用一次,不同於Observale的發射器ObservableEmitter中的void onNext(@NonNull T value)方法,在一次訂閱中,可以多次調用多次發射。
2、方法void onError(Throwable t)等同於ObservableEmitter中的void onError(@NonNull Throwable error)用來發射一條錯誤通知
3、SingleEmitter中沒有用來發射完成通知的void onComplete()方法。
方法onSuccessonError只可調用一個,若先調用onError則會導致onSuccess無效,若先調用onSuccess,則會拋出io.reactivex.exceptions.UndeliverableException異常。

觀察者SingleObserver:

方法void onSubscribe(Disposable d)等同於Observer中的void onSubscribe(Disposable d)
方法void onSuccess(T t)類似於Observer中的onNext(T t)用來接收Single發的數據。
方法void onError(Throwable e)等同於Observer中的void onError(Throwable e)用來處理異常通知。
沒有用來處理完成通知的方法void onComplete()

示例(Completable簡單使用)

Completable操作實現類 - HelloCompletable.java

package yag;

import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloCompletable {

    public void helloCompletable(){
        Completable
                .create(new CompletableOnSubscribe() {
                    @Override
                    public void subscribe(CompletableEmitter completableEmitter) throws Exception {
                        completableEmitter.onComplete();
                    }
                })
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("執行完成");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloCompletable helloCompletable = new HelloCompletable();
        helloCompletable.helloCompletable();
    }
}

執行結果

執行完成

Process finished with exit code 0

補充

onError() (發射異常信息), 由CompletableObserveronError().

示例( Maybe簡單使用)

Maybe操作實現類 - HelloMaybe.java

package yag;

import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloMaybe {
    
    public void helloMaybe(){
        Maybe
                .create(new MaybeOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(MaybeEmitter<Integer> maybeEmitter) {
                        maybeEmitter.onSuccess(1);
                        maybeEmitter.onComplete();
                    }
                })
                .subscribe(new MaybeObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("執行完成");
                    }
                });
    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){

        HelloMaybe helloMaybe = new HelloMaybe();
        helloMaybe.helloMaybe();
    }
}

執行結果

1

Process finished with exit code 0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM