作為github上star數極高的響應式編程java擴展類庫,rxjava是啥就不多說了,網上能查到一堆介紹,下面是一些學習記錄:
前提依賴:
compile 'io.reactivex.rxjava2:rxjava:2.1.9'
一、Observable
1.1 hello world
rxjava中的核心思路是“生產者-消費者”模型,生產者的java類通常用xxxEmitter命名,字面意思:發射器,可以想象為一個機關槍,一直biu biu biu的向外發射信息,另一端則是靶子(也就是消費者),在不停的接收。不過要注意的是:rxjava中,能接收子彈的靶子,可以同時有多個。
Observable<String> observable = Observable.create(emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }); Observer observer1 = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } }; observable.subscribe(observer1);
輸出:
subscribe=> a b c complete
注:最后一行,也可以改成
observable.subscribe(observer1); observable.subscribe(observer1);
這樣就相當於2個靶子在接子彈了。 上面這是最正統的寫法,官方推薦使用鏈式編程寫法:
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
1.2 onComplete事件
emitter發送onComplete消息后,挨打的靶子(消費者),就不再繼續處理了,不管后面emitter是否還繼續發送。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); //這里主動通知消費者complete System.out.println("complete后,emitter還繼續發射..."); emitter.onNext("d"); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
輸出:
subscribe=> a b c complete complete后,emitter還繼續發射...
注:onComplete之后,emitter再次發送的"d",消費者已經不再處理了。
1.3 onError事件
onError即可以在emitter(生產者)端報錯,也可以在靶子(消費者)上報錯,不管哪一端發生error,消費者就停止處理了。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onError(new Throwable("emitter報了個錯!")); System.out.println("准備發送c"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
輸出:
subscribe=> a emitter報了個錯! 准備發送c
下面模擬下消費者處理時,發生異常:
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { if (s.equals("b")) { int a = 0; int b = 1; System.out.println((b / a)); } System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
輸出:
subscribe=> a error:/ by zero
1.4 disposable
如果消費者主動dispose()后,相當於就解除了生產者-消費者的關系。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); System.out.println("准備發送c"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { Disposable disposable; @Override public void onSubscribe(@NonNull Disposable d) { disposable = d; System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { if (s.equals("b")) { disposable.dispose(); } System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
上面的代碼,消費者在遇到b時,主動切斷了與生產者的關聯,emitter再發送的d,消費者就不處理了,輸出:
subscribe=> a b 准備發送c
1.5 大道至簡
如果消費者只關心onNext的處理部分,其它無所謂,上面這一堆代碼,可以簡化為一行:
Observable.fromArray("a", "b", "c").subscribe(c -> System.out.println(c + " "));
輸出:
a b c
最后再來一個示例:把3個單詞拼成一句話,而且每個單詞處理成“首字母大寫”的風格。
Observable.fromArray("I", "AM", "CHINESE") .map(c -> c.substring(0, 1).toUpperCase() + c.substring(1).toLowerCase()) .subscribe(c -> System.out.print(c + " "));
輸出:
I Am Chinese
參考:
http://www.vogella.com/tutorials/RxJava/article.html