Android RxJava2 淺析


原文地址:http://blog.csdn.net/maplejaw_/article/details/52442065 

Observable

在RxJava1.x中,最熟悉的莫過於Observable這個類了,筆者剛使用RxJava2.x時,創建一個Observable后,頓時是懵逼的。因為我們熟悉的Subscriber居然沒影了,取而代之的是ObservableEmitter,俗稱發射器。此外,由於沒有了Subscriber的蹤影,我們創建觀察者時需使用Observer。而Observer也不是我們熟悉的那個Observer,其回調的Disposable參數更是讓人摸不到頭腦。

廢話不多說,從會用開始,還記得使用RxJava的三部曲嗎? 
第一步:初始化一個Observable

  1.        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
  2.  
  3.             @Override
  4.             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  5.                 e.onNext(1);
  6.                 e.onNext(2);
  7.                 e.onComplete();
  8.             }
  9.         });

第二步:初始化一個Observer

  1.         Observer<Integer> observer= new Observer<Integer>() {
  2.  
  3.             @Override
  4.             public void onSubscribe(Disposable d) {
  5.  
  6.             }
  7.  
  8.             @Override
  9.             public void onNext(Integer value) {
  10.  
  11.  
  12.             }
  13.  
  14.             @Override
  15.             public void onError(Throwable e) {
  16.  
  17.             }
  18.  
  19.             @Override
  20.             public void onComplete() {
  21.             }
  22.         }

第三部:建立訂閱關系

  1.     observable.subscribe(observer); //建立訂閱關系

不難看出,與RxJava1.x還是存在着一些區別的。首先,創建Observable時,回調的是ObservableEmitter,字面意思即發射器,用於發射數據(onNext)和通知(onError/onComplete)。其次,創建的Observer中多了一個回調方法onSubscribe,傳遞參數為Disposable ,Disposable相當於RxJava1.x中的Subscription,用於解除訂閱。你可能納悶為什么不像RxJava1.x中訂閱時返回Disposable,而是選擇回調出來呢。官方說是為了設計成Reactive-Streams架構。不過仔細想想這么一個場景還是很有用的,假設Observer需要在接收到異常數據項時解除訂閱,在RxJava2.x中則非常簡便,如下操作即可。

  1.   Observer<Integer> observer = new Observer<Integer>() {
  2.             private Disposable disposable;
  3.  
  4.             @Override
  5.             public void onSubscribe(Disposable d) {
  6.                 disposable = d;
  7.             }
  8.  
  9.             @Override
  10.             public void onNext(Integer value) {
  11.                 Log.d("JG", value.toString());
  12.                 if (value > 3) {   // >3 時為異常數據,解除訂閱
  13.                     disposable.dispose();
  14.                 }
  15.             }
  16.  
  17.             @Override
  18.             public void onError(Throwable e) {
  19.  
  20.             }
  21.  
  22.             @Override
  23.             public void onComplete() {
  24.  
  25.  
  26.  
  27.             }
  28.         };

此外,RxJava2.x中仍然保留了其他簡化訂閱方法,我們可以根據需求,選擇相應的簡化訂閱。只不過傳入的對象改為了Consumer。`

  1.    Disposable disposable = observable.subscribe(new Consumer<Integer>() {
  2.             @Override
  3.             public void accept(Integer integer) throws Exception {
  4.                   //這里接收數據項
  5.             }
  6.         }, new Consumer<Throwable>() {
  7.             @Override
  8.             public void accept(Throwable throwable) throws Exception {
  9.               //這里接收onError
  10.             }
  11.         }, new Action() {
  12.             @Override
  13.             public void run() throws Exception {
  14.               //這里接收onComplete。
  15.             }
  16.         });

不同於RxJava1.x,RxJava2.x中沒有了一系列的Action/Func接口,取而代之的是與Java8命名類似的函數式接口,如下圖: 

image_1arse84b03rfoo89mr1m53151a9.png

其中Action類似於RxJava1.x中的Action0,區別在於Action允許拋出異常。

  1. public interface Action {
  2.     /**
  3.      * Runs the action and optionally throws a checked exception
  4.      * @throws Exception if the implementation wishes to throw a checked exception
  5.      */
  6.     void run() throws Exception;
  7. }

而Consumer即消費者,用於接收單個值,BiConsumer則是接收兩個值,Function用於變換對象,Predicate用於判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這里也就不再贅述了。

線程調度

關於線程切換這點,RxJava1.x和RxJava2.x的實現思路是一樣的。這里就簡單看下相關源碼。

subscribeOn

同RxJava1.x一樣,subscribeOn用於指定subscribe()時所發生的線程,從源碼角度可以看出,內部線程調度是通過ObservableSubscribeOn來實現的。

  1.    public final Observable<T> subscribeOn(Scheduler scheduler) {
  2.         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  3.         return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
  4.     }

ObservableSubscribeOn的核心源碼在subscribeActual方法中,通過代理的方式使用SubscribeOnObserver包裝Observer后,設置Disposable來將subscribe切換到Scheduler線程中

  1.     @Override
  2.     public void subscribeActual(final Observer<? super T> s) {
  3.         final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
  4.  
  5.         s.onSubscribe(parent); //回調Disposable
  6.  
  7.         parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設置`Disposable`
  8.             @Override
  9.             public void run() {
  10.                 source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中
  11.             }
  12.         }));
  13.     }

observeOn

observeOn方法用於指定下游Observer回調發生的線程。

  1.     public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  2.          //..
  3.          //驗證安全
  4.         return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  5.     }

主要實現在ObservableObserveOn中的subscribeActual,可以看出,不同於subscribeOn,沒有將suncribe操作全部切換到Scheduler中,而是通過ObserveOnSubscriber與Scheduler配合,通過schedule()達到切換下游Observer回調發生的線程,這一點與RxJava1.x實現幾乎相同。關於ObserveOnSubscriber的源碼這里不再重復描述了,有興趣的可以查看本人RxJava源碼解讀這篇文章

  1.     @Override
  2.     protected void subscribeActual(Observer<? super T> observer) {
  3.         if (scheduler instanceof TrampolineScheduler) {
  4.             source.subscribe(observer);
  5.         } else {
  6.             Scheduler.Worker w = scheduler.createWorker();
  7.  
  8.             source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize));
  9.         }
  10.     }

Flowable

Flowable是RxJava2.x中新增的類,專門用於應對背壓(Backpressure)問題,但這並不是RxJava2.x中新引入的概念。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,比如在Android中常見的點擊事件,點擊過快則會造成點擊兩次的效果。 
我們知道,在RxJava1.x中背壓控制是由Observable完成的,使用如下:

  1.   Observable.range(1,10000)
  2.             .onBackpressureDrop()
  3.             .subscribe(integer -> Log.d("JG",integer.toString()));

而在RxJava2.x中將其獨立了出來,取名為Flowable。因此,原先的Observable已經不具備背壓處理能力。 
通過Flowable我們可以自定義背壓處理策略。 

image_1arsktnsv1uk810abcki15ljp91m.png

測試Flowable例子如下:

  1.   Flowable.create(new FlowableOnSubscribe<Integer>() {
  2.  
  3.             @Override
  4.             public void subscribe(FlowableEmitter<Integer> e) throws Exception {
  5.  
  6.                 for(int i=0;i<10000;i++){
  7.                     e.onNext(i);
  8.                 }
  9.                 e.onComplete();
  10.             }
  11.         }, FlowableEmitter.BackpressureMode.ERROR) //指定背壓處理策略,拋出異常
  12.                 .subscribeOn(Schedulers.computation())
  13.                 .observeOn(Schedulers.newThread())
  14.                 .subscribe(new Consumer<Integer>() {
  15.                     @Override
  16.                     public void accept(Integer integer) throws Exception {
  17.                         Log.d("JG", integer.toString());
  18.                         Thread.sleep(1000);
  19.                     }
  20.                 }, new Consumer<Throwable>() {
  21.                     @Override
  22.                     public void accept(Throwable throwable) throws Exception {
  23.                         Log.d("JG",throwable.toString());
  24.                     }
  25.                 });

或者可以使用類似RxJava1.x的方式來控制。

  1.   Flowable.range(1,10000)
  2.                 .onBackpressureDrop()
  3.                 .subscribe(integer -> Log.d("JG",integer.toString()));

其中還需要注意的一點在於,Flowable並不是訂閱就開始發送數據,而是需等到執行Subscription#request才能開始發送數據。當然,使用簡化subscribe訂閱方法會默認指定Long.MAX_VALUE。手動指定的例子如下:

  1.         Flowable.range(1,10).subscribe(new Subscriber<Integer>() {
  2.             @Override
  3.             public void onSubscribe(Subscription s) {
  4.                 s.request(Long.MAX_VALUE);//設置請求數
  5.             }
  6.  
  7.             @Override
  8.             public void onNext(Integer integer) {
  9.  
  10.             }
  11.  
  12.             @Override
  13.             public void onError(Throwable t) {
  14.  
  15.             }
  16.  
  17.             @Override
  18.             public void onComplete() {
  19.  
  20.             }
  21.         });

Single

不同於RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一個回調方法onSubscribe。

  1. interface SingleObserver<T> {
  2.     void onSubscribe(Disposable d);
  3.     void onSuccess(T value);
  4.     void onError(Throwable error);
  5. }

Completable

同Single,Completable也被重新設計為Reactive-Streams架構,RxJava1.x的CompletableSubscriber改為CompletableObserver,源碼如下:

  1. interface CompletableObserver<T> {
  2.     void onSubscribe(Disposable d);
  3.     void onComplete();
  4.     void onError(Throwable error);
  5. }

Subject/Processor

Processor和Subject的作用是相同的。關於Subject部分,RxJava1.x與RxJava2.x在用法上沒有顯著區別,這里就不介紹了。其中Processor是RxJava2.x新增的,繼承自Flowable,所以支持背壓控制。而Subject則不支持背壓控制。使用如下:

  1.         //Subject
  2.         AsyncSubject<String> subject = AsyncSubject.create();
  3.         subject.subscribe(-> Log.d("JG",o));//three
  4.         subject.onNext("one");
  5.         subject.onNext("two");
  6.         subject.onNext("three");
  7.         subject.onComplete();
  8.  
  9.        //Processor
  10.         AsyncProcessor<String> processor = AsyncProcessor.create();
  11.         processor.subscribe(-> Log.d("JG",o)); //three
  12.         processor.onNext("one");
  13.         processor.onNext("two");
  14.         processor.onNext("three");
  15.         processor.onComplete();

操作符

關於操作符,RxJava1.x與RxJava2.x在命名和行為上大多數保持了一致,部分操作符請查閱文檔。

最后

RxJava1.x 如何平滑升級到RxJava2.x? 
由於RxJava2.x變化較大無法直接升級,幸運的是,官方提供了RxJava2Interop這個庫,可以方便地將RxJava1.x升級到RxJava2.x,或者將RxJava2.x轉回RxJava1.x。地址:https://github.com/akarnokd/RxJava2Interop


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM