RxJava 2.x 使用最佳實踐


轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/76443347
本文出自【趙彥軍的博客】

以前寫過 Rxjava 系列教程, 如下所示

上面的這些教程覆蓋了 rxjava 的方方面面,很詳細。只是當時寫的時候是基於 rxjava 1.X 的版本寫的,后來 rxjava 進入了快速迭代的時期,很快就出現了 2.x 版本。根據 Rxjava 官方的GitHub 來看,2.x 相對於 1.x 做了很多改進,刪除了不少的類,同時也增加了一些新的類。基於以上背景,以前的這些文章,就顯得有些不足,為了緊跟 rxjava 的步伐,下面的這篇博客,就是對 rxjava 的重新認識。

Rxjava、RxAndroid

Rxjava : https://github.com/ReactiveX/RxJava

RxAndroid : https://github.com/ReactiveX/RxAndroid

添加依賴

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.2'

create() :創建

create操作符應該是最常見的操作符了,主要用於產生一個Obserable被觀察者對象,為了方便大家的認知,以后的教程中統一把被觀察者Observable稱為發射器(上游事件),觀察者Observer稱為接收器(下游事件)。

Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
           e.onNext(1);
           e.onNext(2);
           e.onNext(3);
           e.onComplete(); //結束
           e.onNext( 4 );
          }
        })
          .subscribe(new Observer<Integer>() {
                  @Override
                  public void onSubscribe(@NonNull Disposable d) {
                      Log.e("zhao", "onSubscribe: " + d.isDisposed());
                  }

                  @Override
                  public void onNext(@NonNull Integer integer) {
                      Log.e("zhao", "onNext: " + integer);
                  }

                  @Override
                  public void onError(@NonNull Throwable e) {
                      Log.e("zhao", "onError: ");
                  }

                  @Override
                  public void onComplete() {
                      Log.e("zhao", "onComplete: ");
                  }
            });

結果是:

E/zhao: onSubscribe: false
E/zhao: onNext: 1
E/zhao: onNext: 2
E/zhao: onNext: 3
E/zhao: onComplete: 

需要注意的幾點是:

1)在發射完 3 之后, 調用 e.onComplete() 方法,結束 發射數據。4 沒有發射出來。

  1. 另外一個值得注意的點是,在RxJava 2.x中,可以看到發射事件方法相比1.x多了一個throws Excetion,意味着我們做一些特定操作再也不用try-catch了。

  2. 並且2.x 中有一個Disposable概念,這個東西可以直接調用切斷,可以看到,當它的isDisposed()返回為false的時候,接收器能正常接收事件,但當其為true的時候,接收器停止了接收。所以可以通過此參數動態控制接收事件了。

在上面接收數據的時候,我們用了 Observer 對象,需要實現 4 個 方法。這顯得過於累贅,我們可以用 Consumer 對象來代替 Observer 對象,代碼如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            e.onNext(4);
        }
    })
        .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(Integer integer) throws Exception {
                      Log.e("zhao", "accept: " + integer);
                  }
          });

效果如下:

 E/zhao: accept: 1
 E/zhao: accept: 2
 E/zhao: accept: 3

需要注意的是:

1)、Consumer 對象完全代替了Observer ,效果是一樣的。Consumer 顧名思義是消費者的意思,是消費數據的對象。Consumer 對象是 Rxjava 2.x 才出現的,老版本沒有。

map 操作符

map基本算是 RxJava 中一個最簡單的操作符了,熟悉 RxJava 1.x 的知道,它的作用是對發射時間發送的每一個事件應用一個函數,是的每一個事件都按照指定的函數去變化,而在2.x中它的作用幾乎一致。

Observable.create(new ObservableOnSubscribe<Integer>() {
       @Override
       public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
           e.onNext(1);
           e.onNext(2);
           e.onNext(3);
       }
   })
   .map(new Function<Integer, String>() {
       @Override
       public String apply(@NonNull Integer integer) throws Exception {
           // map 操作符,就是轉換輸入、輸出 的類型;本例中輸入是 Integer , 輸出是 String 類型
           Log.e("zhao", "apply: " + integer + "  線程:" + Thread.currentThread().getName());
           return "This is result " + integer;
       }
   })
   .subscribeOn(Schedulers.io()) //在子線程發射
   .observeOn(AndroidSchedulers.mainThread())  //在主線程接收
   .subscribe(new Consumer<String>() {
         @Override
         public void accept(@NonNull String s) throws Exception {
          Log.e("zhao", "accept: " + s + "  線程:" + Thread.currentThread().getName());
   }
});

結果是:

E/zhao: apply: 1  線程:RxCachedThreadScheduler-1
E/zhao: apply: 2  線程:RxCachedThreadScheduler-1
E/zhao: apply: 3  線程:RxCachedThreadScheduler-1
E/zhao: accept: This is result 1  線程:main
E/zhao: accept: This is result 2  線程:main
E/zhao: accept: This is result 3  線程:main

flatMap 操作符

FlatMap 是一個很有趣的東西,我堅信你在實際開發中會經常用到。它可以把一個發射器Observable 通過某種方法轉換為多個Observables,然后再把這些分散的Observables裝進一個單一的發射器Observable。但有個需要注意的是,flatMap並不能保證事件的順序,如果需要保證,需要用到我們下面要講的ConcatMap。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            }
        })
           .flatMap(new Function<Integer, ObservableSource<String>>() {
               @Override
               public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                   List<String> list = new ArrayList<>();
                   for (int i = 0; i < 3; i++) {
                       list.add("I am value " + integer);
                   }
                   //隨機生成一個時間
                   int delayTime = (int) (1 + Math.random() * 10);
                   return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
               }
           })
           .subscribe(new Consumer<String>() {
              @Override
               public void accept(String s) throws Exception {
                   Log.e("zhao", "accept: " + s);
               }
           });

效果如下:

E/zhao: accept: I am value 1
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2

一切都如我們預期中的有意思,為了區分concatMap(下一個會講),我在代碼中特意動了一點小手腳,我采用一個隨機數,生成一個時間,然后通過delay(后面會講)操作符,做一個小延時操作,而查看Log日志也確認驗證了我們上面的說法,它是無序的。

concatMap 操作符

上面其實就說了,concatMap 與 FlatMap 的唯一區別就是 concatMap 保證了順序,所以,我們就直接把 flatMap 替換為 concatMap 驗證吧。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            }
        })
           .concatMap(new Function<Integer, ObservableSource<String>>() {
               @Override
               public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                   List<String> list = new ArrayList<>();
                   for (int i = 0; i < 3; i++) {
                       list.add("I am value " + integer);
                   }
                   //隨機生成一個時間
                   int delayTime = (int) (1 + Math.random() * 10);
                   return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
               }
           })
           .subscribe(new Consumer<String>() {
              @Override
               public void accept(String s) throws Exception {
                   Log.e("zhao", "accept: " + s);
               }
           });

效果如下:

E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3

zip 操作符

構建一個 String 發射器 和 Integer 發射器

  //創建 String 發射器
private Observable<String> getStringObservable() {
      return Observable.create(new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                e.onNext("B");
                e.onNext("C");
            }
        });
    }

//創建 String 發射器
private Observable<Integer> getIntegerObservable() {
      return Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
              e.onNext(1);
              e.onNext(2);
              e.onNext(3);
              e.onNext(4);
              e.onNext(5);
          }
      });
  }

使用 zip 操作符

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
       @Override
       public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
           return s + integer;
          }
      })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.e("zhao", "accept: " + s);
                }
            });

效果如下:

E/zhao: accept: A1
E/zhao: accept: B2
E/zhao: accept: C3

需要注意的是:

  1. zip 組合事件的過程就是分別從發射器A和發射器B各取出一個事件來組合,並且一個事件只能被使用一次,組合的順序是嚴格按照事件發送的順序來進行的,所以上面截圖中,可以看到,1永遠是和A 結合的,2永遠是和B結合的。

  2. 最終接收器收到的事件數量是和發送器發送事件最少的那個發送器的發送事件數目相同,所以如截圖中,5很孤單,沒有人願意和它交往,孤獨終老的單身狗。

interval 操作符

interval操作符是每隔一段時間就產生一個數字,這些數字從0開始,一次遞增1直至無窮大

//方法1 
Flowable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
             @Override
             public void accept(@NonNull Long aLong) throws Exception {
                 Log.e("zhao", "accept11>: " + aLong);
              }
      });

//方法2 
Observable.interval(1, TimeUnit.SECONDS)
          .subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  Log.e("zhao", "accept:22> " + aLong);
              }
      });

效果如下:


E/zhao: accept11>: 0
E/zhao: accept11>: 1
E/zhao: accept11>: 2
E/zhao: accept11>: 3
E/zhao: accept11>: 4

倒計時

既然 interval 操作符會產生從 0 到無窮大的序列,那么我們我們會返回來思考一下,如果倒過來想, 就會發現可以用 interval 方法,實現一個倒計時的功能。

創建一個倒計時的 Observable

/**
  * 產生一個倒計時的 Observable
  * @param time
  * @return
  */
  
public Observable<Long> countdown(final long time) {
      return Observable.interval(1, TimeUnit.SECONDS)
             .map(new Function<Long, Long>() {
                 @Override
                 public Long apply(@NonNull Long aLong) throws Exception {
                     return time - aLong;
                 }
             }).take( time + 1 );
  }

實現倒計時的功能

countdown(4).subscribe(new Consumer<Long>() {
       @Override
       public void accept(Long aLong) throws Exception {
            Log.e("zhao", "accept: 倒計時: " + aLong);
        }
    });

效果如下:

E/zhao: accept: 倒計時: 4
E/zhao: accept: 倒計時: 3
E/zhao: accept: 倒計時: 2
E/zhao: accept: 倒計時: 1
E/zhao: accept: 倒計時: 0

repeat 操作符:重復的發射數據

repeat 重復地發射數據

  • repeat( ) //無限重復
  • repeat( int time ) //設定重復的次數
Observable
         .just(1, 2)
         .repeat( 3 ) //重復3次
         .subscribe(new Consumer<Integer>() {
               @Override
               public void accept(Integer integer) throws Exception {
                   Log.e("zhao", "accept: " + integer);
               }
          });

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2

range :發射特定的整數序列

range 發射特定整數序列的 Observable

  • range( int start , int end ) //start :開始的值 , end :結束的值

要求: end >= start

 Observable
           .range( 1 , 5 )
           .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("zhao", "accept: " + integer);
                }
           });

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5

fromArray : 遍歷數組

Integer[] items = {0, 1, 2, 3, 4, 5};

Observable
        .fromArray(items)
        .subscribe(new Consumer<Integer>() {
             @Override
             public void accept(Integer integer) throws Exception {
                 Log.e("zhao", "accept: " + integer
                );
              }
          });

效果是:

E/zhao: accept: 0
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5

fromIterable : 遍歷集合

List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");

Observable
        .fromIterable(list)
        .subscribe(new Consumer<String>() {
             @Override
             public void accept(String s) throws Exception {
                  Log.e("zhao", "accept: " + s);
         }
   });

效果

E/zhao: accept: a
E/zhao: accept: b
E/zhao: accept: c

toList : 把數據轉換成 List 集合

Observable
          .just(1, 2, 3, 4)
          .toList()
          .subscribe(new Consumer<List<Integer>>() {
               @Override
               public void accept(List<Integer> integers) throws Exception {
                 Log.e("zhao", "accept: " + integers);
           }
     });

效果是

accept: [1, 2, 3, 4]

**把數組轉化成 List 集合 **

Integer[] items = {0, 1, 2, 3, 4, 5};
        
Observable
         .fromArray( items )  //遍歷數組
         .toList()  //把遍歷后的數組轉化成 List 
         .subscribe(new Consumer<List<Integer>>() {
               @Override
               public void accept(List<Integer> integers) throws Exception {
                  Log.e("zhao", "accept: " + integers);
            }
      });

效果是:

 accept: [0, 1, 2, 3, 4, 5]

delay : 延遲發射數據

這里寫圖片描述

Observable
          .just(1, 2, 3)
          .delay(3, TimeUnit.SECONDS)  //延遲3秒鍾,然后在發射數據
          .subscribe(new Consumer<Integer>() {
               @Override
               public void accept(Integer integer) throws Exception {
                  Log.e("zhao", "accept: " + integer);
              }
      });

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3

背壓 BackPressure

背壓產生的原因: 被觀察者發送消息太快以至於它的操作符或者訂閱者不能及時處理相關的消息。在 Rxjava 1.x 版本很容易就會報錯,使程序發生崩潰。

...
    Caused by: rx.exceptions.MissingBackpressureException
...
...

為了解決這個問題,在RxJava2里,引入了Flowable這個類:Observable不包含 backpressure 處理,而 Flowable 包含。

下面我們來模擬一個觸發背壓的實例 , 發射器每1毫秒發射一個數據,接收器每一秒處理一個數據。數據產生是數據處理的1000 倍。

首先用 RxJava 2.x 版本的 Observable 來實現。

Observable.interval(1, TimeUnit.MILLISECONDS)
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Thread.sleep(1000);
                    Log.e("zhao", "onNext: " + aLong);
                }
       });

經過測試,app 很健壯,沒有發生崩潰,日志每1秒打印一次。在上面我們說到 2.x 版本中 Observable 不再支持背壓,發神器生成的數據全部緩存在內存中。

Observable :

  • 不支持 backpressure 處理,不會發生 MissingBackpressureException 異常。

  • 所有沒有處理的數據都緩存在內存中,等待被訂閱者處理。

  • 壞處是:當產生的數據過快,內存中緩存的數據越來越多,占用大量內存。

然后用 RxJava 2.x 版本的 Flowable 來實現。

Flowable.interval(1, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Long>() {
              @Override
               public void accept(Long aLong) throws Exception {
                    Thread.sleep(1000);
                    Log.e("zhao", "onNext: " + aLong);
                }
         });

運行起來發生崩潰,崩潰日志如下:

io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
...
...
  Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests

很明顯發生了 MissingBackpressureException 異常 , 128 代表是 Flowable 最多緩存 128 個數據,緩存次超過 128 個數據,就會報錯。可喜的是,Rxjava 已經給我們提供了解決背壓的策略。

**onBackpressureDrop **

onBackpressureDrop() :當緩沖區數據滿 128 個時候,再新來的數據就會被丟棄,如果此時有數據被消費了,那么就會把當前最新產生的數據,放到緩沖區。簡單來說 Drop 就是直接把存不下的事件丟棄。

onBackpressureDrop 測試

Flowable.interval( 1 , TimeUnit.MILLISECONDS)
        .onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 后面否則不會生效
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Long>() {
              @Override
               public void accept(Long aLong) throws Exception {
                   Thread.sleep(1000);
                   Log.e("zhao", "onNext: " + aLong);
               }
       });

效果如下:

E/zhao: onNext: 0
E/zhao: onNext: 1
...
E/zhao: onNext: 126
E/zhao: onNext: 127
E/zhao: onNext: 96129
E/zhao: onNext: 96130
E/zhao: onNext: 96131

從日志上分析來看,發射器發射的 0 ~ 127 總共 128 個數據是連續的,下一個數據就是 96129 , 128 ~ 96128 的數據被丟棄了。

注意事項

1、onBackpressureDrop 一定要放在 interval 后面否則不會生效

onBackpressureLatest

onBackpressureLatest 就是只保留最新的事件。

onBackpressureBuffer

  • onBackpressureBuffer:默認情況下緩存所有的數據,不會丟棄數據,這個方法可以解決背壓問題,但是它有像 Observable 一樣的缺點,緩存數據太多,占用太多內存。

  • onBackpressureBuffer(int capacity) :設置緩存隊列大小,但是如果緩沖數據超過了設置的值,就會報錯,發生崩潰。

onBackpressureBuffer(int capacity) 測試

Flowable.interval( 1 , TimeUnit.MILLISECONDS)
        .onBackpressureBuffer( 1000 ) //設置緩沖隊列大小為 1000
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Long>() {
              @Override
               public void accept(Long aLong) throws Exception {
                  Thread.sleep(1000);
                  Log.e("zhao", "onNext: " + aLong);
               }
          });

運行起來后,過了幾秒鍾,發生崩潰,日志如下:

io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
···
Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full

通過日志可以看出,緩沖區已經滿了。

注意事項

1、onBackpressureBuffer 一定要放在 interval 后面否則不會生效

參考資料

RxJava2 源碼分析

如何形象地描述 RxJava 中的背壓和流控機制?

給初學者的RxJava2.0教程(八): Flowable緩存


個人微信號:zhaoyanjun125 , 歡迎關注


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM