RxJava2學習筆記(1)


作為github上star數極高的響應式編程java擴展類庫,rxjava是啥就不多說了,網上能查到一堆介紹,下面是一些學習記錄:

前提依賴:

compile 'io.reactivex.rxjava2:rxjava:2.1.9'

一、Observable

1.1 hello world

rxjava中的核心思路是“生產者-消費者”模型,生產者的java類通常用xxxEmitter命名,字面意思:發射器,可以想象為一個機關槍,一直biu biu biu的向外發射信息,另一端則是靶子(也就是消費者),在不停的接收。不過要注意的是:rxjava中,能接收子彈的靶子,可以同時有多個。

        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onNext("c");
            emitter.onComplete();
        });

        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("subscribe=>");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s + " ");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        };

        observable.subscribe(observer1);

 輸出:

subscribe=>
a 
b 
c 
complete

注:最后一行,也可以改成

observable.subscribe(observer1);
observable.subscribe(observer1);

這樣就相當於2個靶子在接子彈了。 上面這是最正統的寫法,官方推薦使用鏈式編程寫法:

        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onNext("c");
            emitter.onComplete();
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("subscribe=>");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s + " ");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });

1.2 onComplete事件

emitter發送onComplete消息后,挨打的靶子(消費者),就不再繼續處理了,不管后面emitter是否還繼續發送。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    emitter.onNext("c");
    emitter.onComplete(); //這里主動通知消費者complete
    System.out.println("complete后,emitter還繼續發射...");
    emitter.onNext("d");
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println(e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 輸出:

subscribe=>
a 
b 
c 
complete
complete后,emitter還繼續發射...

注:onComplete之后,emitter再次發送的"d",消費者已經不再處理了。

1.3 onError事件

onError即可以在emitter(生產者)端報錯,也可以在靶子(消費者)上報錯,不管哪一端發生error,消費者就停止處理了。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onError(new Throwable("emitter報了個錯!"));
    System.out.println("准備發送c");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println(e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 輸出:

subscribe=>
a 
emitter報了個錯!
准備發送c

下面模擬下消費者處理時,發生異常:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        if (s.equals("b")) {
            int a = 0;
            int b = 1;
            System.out.println((b / a));
        }
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("error:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

輸出:

subscribe=>
a
error:/ by zero

1.4 disposable

如果消費者主動dispose()后,相當於就解除了生產者-消費者的關系。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    System.out.println("准備發送c");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {

    Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        disposable = d;
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        if (s.equals("b")) {
            disposable.dispose();
        }
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("error:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 上面的代碼,消費者在遇到b時,主動切斷了與生產者的關聯,emitter再發送的d,消費者就不處理了,輸出:

subscribe=>
a 
b 
准備發送c

1.5 大道至簡

如果消費者只關心onNext的處理部分,其它無所謂,上面這一堆代碼,可以簡化為一行:

Observable.fromArray("a", "b", "c").subscribe(c -> System.out.println(c + " "));

 輸出:

a 
b 
c 

最后再來一個示例:把3個單詞拼成一句話,而且每個單詞處理成“首字母大寫”的風格。

Observable.fromArray("I", "AM", "CHINESE")
        .map(c -> c.substring(0, 1).toUpperCase() + c.substring(1).toLowerCase())
        .subscribe(c -> System.out.print(c + " "));

輸出:

I Am Chinese 

 

參考:

http://www.vogella.com/tutorials/RxJava/article.html

http://www.cnblogs.com/aademeng/articles/7462540.html

https://www.jianshu.com/c/299d0a51fdd4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM