這可能是最好的RxJava 2.x入門教程系列專欄
文章鏈接:
這可能是最好的RxJava 2.x 入門教程(完結版)【強力推薦】
這可能是最好的RxJava 2.x 入門教程(一)
這可能是最好的RxJava 2.x 入門教程(二)
這可能是最好的RxJava 2.x 入門教程(三)
GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples
為了滿足大家的飢渴難耐,GitHub將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x所有操作符應用場景介紹和實際應用場景,后期除了RxJava可能還會增添其他東西,總之,GitHub上的Demo專為大家傾心打造。傳送門:https://github.com/nanchen2251/RxJava2Examples
一、前言
很快我們就迎來了第二期,上一期我們主要講解了 RxJava 1.x 到 2.x 的變化概覽,相信各位熟練掌握RxJava 1.x的老司機們隨便看一下變化概覽就可以上手RxJava 2.x了,但為了滿足更廣大的年輕一代司機(未來也是老司機),在本節中,我們將學習RxJava 2.x 強大的操作符章節。
【注】以下所有操作符標題都可直接點擊進入官方doc查看。
二、正題
1、Create
create操作符應該是最常見的操作符了,主要用於產生一個Obserable被觀察者對象,為了方便大家的認知,以后的教程中統一把被觀察者Observable稱為發射器(上游事件),觀察者Observer稱為接收器(下游事件)。
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4 mRxOperatorsText.append("Observable emit 1" + "\n"); 5 Log.e(TAG, "Observable emit 1" + "\n"); 6 e.onNext(1); 7 mRxOperatorsText.append("Observable emit 2" + "\n"); 8 Log.e(TAG, "Observable emit 2" + "\n"); 9 e.onNext(2); 10 mRxOperatorsText.append("Observable emit 3" + "\n"); 11 Log.e(TAG, "Observable emit 3" + "\n"); 12 e.onNext(3); 13 e.onComplete(); 14 mRxOperatorsText.append("Observable emit 4" + "\n"); 15 Log.e(TAG, "Observable emit 4" + "\n" ); 16 e.onNext(4); 17 } 18 }).subscribe(new Observer<Integer>() { 19 private int i; 20 private Disposable mDisposable; 21 22 @Override 23 public void onSubscribe(@NonNull Disposable d) { 24 mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n"); 25 Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" ); 26 mDisposable = d; 27 } 28 29 @Override 30 public void onNext(@NonNull Integer integer) { 31 mRxOperatorsText.append("onNext : value : " + integer + "\n"); 32 Log.e(TAG, "onNext : value : " + integer + "\n" ); 33 i++; 34 if (i == 2) { 35 // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件 36 mDisposable.dispose(); 37 mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n"); 38 Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n"); 39 } 40 } 41 42 @Override 43 public void onError(@NonNull Throwable e) { 44 mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n"); 45 Log.e(TAG, "onError : value : " + e.getMessage() + "\n" ); 46 } 47 48 @Override 49 public void onComplete() { 50 mRxOperatorsText.append("onComplete" + "\n"); 51 Log.e(TAG, "onComplete" + "\n" ); 52 } 53 });
輸出:
需要注意的幾點是:
1)在發射事件中,我們在發射了數值3之后,直接調用了e.onComlete(),雖然無法接收事件,但發送事件還是繼續的。
2) 另外一個值得注意的點是,在RxJava 2.x中,可以看到發射事件方法相比1.x多了一個throws Excetion,意味着我們做一些特定操作再也不用try-catch了。
3) 並且2.x 中有一個Disposable概念,這個東西可以直接調用切斷,可以看到,當它的isDisposed()返回為false的時候,接收器能正常接收事件,但當其為true的時候,接收器停止了接收。所以可以通過此參數動態控制接收事件了。
2、Map
Map基本算是RxJava中一個最簡單的操作符了,熟悉RxJava 1.x的知道,它的作用是對發射時間發送的每一個事件應用一個函數,是的每一個事件都按照指定的函數去變化,而在2.x中它的作用幾乎一致。
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4 e.onNext(1); 5 e.onNext(2); 6 e.onNext(3); 7 } 8 }).map(new Function<Integer, String>() { 9 @Override 10 public String apply(@NonNull Integer integer) throws Exception { 11 return "This is result " + integer; 12 } 13 }).subscribe(new Consumer<String>() { 14 @Override 15 public void accept(@NonNull String s) throws Exception { 16 mRxOperatorsText.append("accept : " + s +"\n"); 17 Log.e(TAG, "accept : " + s +"\n" ); 18 } 19 });
輸出:
是的,map基本作用就是將一個Observable通過某種函數關系,轉換為另一種Observable,上面例子中就是把我們的Integer數據變成了String類型。從Log日志顯而易見。
3、Zip
zip專用於合並事件,該合並不是連接(連接操作符后面會說),而是兩兩配對,也就意味着,最終配對出的Observable發射事件數目只和少的那個相同。
1 Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() { 2 @Override 3 public String apply(@NonNull String s, @NonNull Integer integer) throws Exception { 4 return s + integer; 5 } 6 }).subscribe(new Consumer<String>() { 7 @Override 8 public void accept(@NonNull String s) throws Exception { 9 mRxOperatorsText.append("zip : accept : " + s + "\n"); 10 Log.e(TAG, "zip : accept : " + s + "\n"); 11 } 12 });
1 private Observable<String> getStringObservable() { 2 return Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { 5 if (!e.isDisposed()) { 6 e.onNext("A"); 7 mRxOperatorsText.append("String emit : A \n"); 8 Log.e(TAG, "String emit : A \n"); 9 e.onNext("B"); 10 mRxOperatorsText.append("String emit : B \n"); 11 Log.e(TAG, "String emit : B \n"); 12 e.onNext("C"); 13 mRxOperatorsText.append("String emit : C \n"); 14 Log.e(TAG, "String emit : C \n"); 15 } 16 } 17 }); 18 } 19 20 private Observable<Integer> getIntegerObservable() { 21 return Observable.create(new ObservableOnSubscribe<Integer>() { 22 @Override 23 public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 24 if (!e.isDisposed()) { 25 e.onNext(1); 26 mRxOperatorsText.append("Integer emit : 1 \n"); 27 Log.e(TAG, "Integer emit : 1 \n"); 28 e.onNext(2); 29 mRxOperatorsText.append("Integer emit : 2 \n"); 30 Log.e(TAG, "Integer emit : 2 \n"); 31 e.onNext(3); 32 mRxOperatorsText.append("Integer emit : 3 \n"); 33 Log.e(TAG, "Integer emit : 3 \n"); 34 e.onNext(4); 35 mRxOperatorsText.append("Integer emit : 4 \n"); 36 Log.e(TAG, "Integer emit : 4 \n"); 37 e.onNext(5); 38 mRxOperatorsText.append("Integer emit : 5 \n"); 39 Log.e(TAG, "Integer emit : 5 \n"); 40 } 41 } 42 }); 43 }
輸出:
需要注意的是:
1) zip 組合事件的過程就是分別從發射器A和發射器B各取出一個事件來組合,並且一個事件只能被使用一次,組合的順序是嚴格按照事件發送的順序來進行的,所以上面截圖中,可以看到,1永遠是和A 結合的,2永遠是和B結合的。
2) 最終接收器收到的事件數量是和發送器發送事件最少的那個發送器的發送事件數目相同,所以如截圖中,5很孤單,沒有人願意和它交往,孤獨終老的單身狗。
4、Concat
對於單一的把兩個發射器連接成一個發射器,雖然 zip 不能完成,但我們還是可以自力更生,官方提供的 concat 讓我們的問題得到了完美解決。
1 Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6)) 2 .subscribe(new Consumer<Integer>() { 3 @Override 4 public void accept(@NonNull Integer integer) throws Exception { 5 mRxOperatorsText.append("concat : "+ integer + "\n"); 6 Log.e(TAG, "concat : "+ integer + "\n" ); 7 } 8 });
輸出:
如圖,可以看到。發射器B把自己的三個孩子送給了發射器A,讓他們組合成了一個新的發射器,非常懂事的孩子,有條不紊的排序接收。
5、FlatMap
FlatMap 是一個很有趣的東西,我堅信你在實際開發中會經常用到。它可以把一個發射器Observable 通過某種方法轉換為多個Observables,然后再把這些分散的Observables裝進一個單一的發射器Observable。但有個需要注意的是,flatMap並不能保證事件的順序,如果需要保證,需要用到我們下面要講的ConcatMap。
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4 e.onNext(1); 5 e.onNext(2); 6 e.onNext(3); 7 } 8 }).flatMap(new Function<Integer, ObservableSource<String>>() { 9 @Override 10 public ObservableSource<String> apply(@NonNull Integer integer) throws Exception { 11 List<String> list = new ArrayList<>(); 12 for (int i = 0; i < 3; i++) { 13 list.add("I am value " + integer); 14 } 15 int delayTime = (int) (1 + Math.random() * 10); 16 return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS); 17 } 18 }).subscribeOn(Schedulers.newThread()) 19 .observeOn(AndroidSchedulers.mainThread()) 20 .subscribe(new Consumer<String>() { 21 @Override 22 public void accept(@NonNull String s) throws Exception { 23 Log.e(TAG, "flatMap : accept : " + s + "\n"); 24 mRxOperatorsText.append("flatMap : accept : " + s + "\n"); 25 } 26 });
輸出:
一切都如我們預期中的有意思,為了區分concatMap(下一個會講),我在代碼中特意動了一點小手腳,我采用一個隨機數,生成一個時間,然后通過delay(后面會講)操作符,做一個小延時操作,而查看Log日志也確認驗證了我們上面的說法,它是無序的。
6、concatMap
上面其實就說了,concatMap 與 FlatMap 的唯一區別就是 concatMap 保證了順序,所以,我們就直接把 flatMap 替換為 concatMap 驗證吧。
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4 e.onNext(1); 5 e.onNext(2); 6 e.onNext(3); 7 } 8 }).concatMap(new Function<Integer, ObservableSource<String>>() { 9 @Override 10 public ObservableSource<String> apply(@NonNull Integer integer) throws Exception { 11 List<String> list = new ArrayList<>(); 12 for (int i = 0; i < 3; i++) { 13 list.add("I am value " + integer); 14 } 15 int delayTime = (int) (1 + Math.random() * 10); 16 return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS); 17 } 18 }).subscribeOn(Schedulers.newThread()) 19 .observeOn(AndroidSchedulers.mainThread()) 20 .subscribe(new Consumer<String>() { 21 @Override 22 public void accept(@NonNull String s) throws Exception { 23 Log.e(TAG, "flatMap : accept : " + s + "\n"); 24 mRxOperatorsText.append("flatMap : accept : " + s + "\n"); 25 } 26 });
輸出:
結果的確和我們預想的一樣。
三、寫在最后
好了,這一節就先介紹到這里,下一節我們將學習其它的一些操作符,在操作符講完后再帶大家進入實際情景,希望持續關注,代碼傳送門。