RxJava/RxAndroid 使用實例實踐


原文地址

RxAndroid Tutorial
響應式編程(Reactive programming)不是一種API,而是一種新的非常有用的范式,而RxJava就是一套基於此思想的框架,在Android開發中我們通過這個框架就能探索響應式的世界,同時結合另一個庫,RxAndroid,這是一個擴展庫,更好的兼容了Android特性,比如主線程,UI事件等。
在這篇指南中,你將會學習到以下這些內容:

  • 什么是響應式編程
  • 什么是observable
  • 如何將異步事件比如按鈕點擊或者EditText字符變化轉換成observables
  • observable變換
  • observable 過濾攔截
  • 如何指定鏈式中的代碼執行線程
  • 如何合並多個observables

前言

the starter project for this tutorial 可以下載這篇文章中項目的所有代碼, 可以直接在Android Studio中打開。
大部分的代碼都在 CheeseActivity.java 這個類里面,繼承於 BaseSearchActivity;里面有一些基礎方法:
showProgressBar(): 顯示一個進度條
hideProgressBar(): 隱藏一個進度條
showResult(List<String> result): 顯示一個列表數據
mCheeseSearchEngine: CheeseSearchEngine類的一個對象,內部有一個search方法,接收一個數據查詢並返回一個匹配的列表list。
直接運行的話,跑出來是這樣子,就是一個查詢的界面:


 

什么是響應式編程

在創建第一個observable之前,先看一下響應式編程的理論 :]

一般的程序是這樣的,表達式只會計算一次,然后把賦值給變量

int a = 2;
int b = 3;
int c = a * b; // c is 6

a = 10;
// c is still 6

在a重新賦值后,前面的c並不會變化,而響應式編程會對值的變化做出響應。
有時候很有可能你已經做過一些響應式編程,但是並沒有意識到這一點。
比如Excel中的表格,我們可以在表格里面填上一些值,同時將某個格子的值設為一個表達式,就像下面這樣


 

設置這個表格里面 B1區域的值為2,B2區域的值為3,B3是一個表達式,B3 = B1* B2,當其中一個值改變的時候,這個觀察者B3也會變化,如圖把B1改成10,B3就會自動計算成30。


 

RxJava Observable

RxJava使用的是觀察者模式,其中有兩個關鍵的接口:Observable 和 Observer,當Observable(被觀察的對象)狀態改變,所有subscribed(訂閱)的Observer(觀察者)會收到一個通知。
在Observable的接口中有一個方法 subscribe() ,這樣Observer 可以調用來進行訂閱。
同樣,在Observer 接口中有三個方法,會被Observable 回調:

  • onNext(T value) 提供了一個 T 類型的item給Observer
  • onComplete() 在Observable發送items結束后通知Observer
  • onError(Throwable e) 當Observable發生錯誤時通知Observer

作為一個表現良好的Observable,發射0到多個數據時后面都會跟上一個completion 或是error的回調。
聽起來有點復雜,但是一些例子可以很清晰的解釋。

一個網絡請求observable 通常只發射一個數據並且立刻completes。


 


每一個圓代表了從observable 發射出去的item數據,黑色的block代表了結束或是錯誤。
一個鼠標的移動observable 將會不斷的發送鼠標當前坐標,並且從不會結束。


 

在一個observable 已經結束后不能再發射新的item數據,下面這個就是一個不好的示范,違反了Observable 的准則


 


在已經發信號結束后依然發射了一個item。

怎么創建一個Observable

你可以直接通過 Observable.create() 創建一個Observable

Observable<T> create(ObservableOnSubscribe<T> source)

 

看起來十分的簡潔,但是這段代碼是什么意思呢?這個 “source” 又是什么? 想要理解這個,只需要知道 ObservableOnSubscribe 是什么。 這是一個接口,其中有一個方法:

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> emitter) throws Exception;
}

 

這個你創建Observable 時的一個“source” 需要暴露一個 subscribe() 方法,從這里又引出來另一個 emitter(發射器),那么什么又是emitter?
RxJava中的 Emitter 接口和 Observer 比較相似,都有以下方法

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

 

ObservableEmitter 提供了一個方法用來取消訂閱,用一個實際場景來形容一下。想象一個水龍頭和水流,這個管道就相當於Observable,從里面能放出水,ObservableEmitter 就相當於是水龍頭,控制開關,而水龍頭連接到管道就是 Observable.create()。
舉個例子免得前面描述太過於抽象,先來看看第一個例子

觀察按鈕點擊事件

CheeseActivity 類中有這么一段代碼

// 1
private Observable<String> createButtonClickObservable() {

  // 2
  return Observable.create(new ObservableOnSubscribe<String>() {

    // 3
    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
      // 4
      mSearchButton.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
          // 5
          emitter.onNext(mQueryEditText.getText().toString());
        }
      });

      // 6
      emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
          // 7
          mSearchButton.setOnClickListener(null);
        }
      });
    }
  });
}

 

上面這段代碼做了以下幾件事情

  1. 定義了一個方法會返回一個Observable,泛型是String類型。
  2. 通過 Observable.create() 創建了一個observable ,並提供了一個ObservableOnSubscribe。
  3. 在參數的內部類中覆寫了 subscribe() 方法。
  4. 給搜索按鈕mSearchButton添加了一個點擊事件。
  5. 當點擊事件觸發時,調用emitter 的onNext 方法,並傳遞了當前mQueryEditText的值。
  6. 在Java中保持引用容易造成內存泄漏,在不再需要的時候及時移除listeners是一個好習慣,那么這里怎么移除呢?ObservableEmitter 有一個 setCancellable() 方法。通過重寫cancel()方法,然后當Observable 被處理的時候這個實現會被回調,比如已經結束或者是所有的觀察者都解除了訂閱。
  7. 通過setOnClickListener(null) 來移除監聽。

現在被觀察者Observable 已經有了,還需要觀察者來進行訂閱,在此之前,我們先看看另一個接口, Consumer ,它可以十分簡單的從emitter 接收到數據。

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

 

如果僅是想要簡單的訂閱一下Observable,這個接口是很方便的。
Observable 的接口方法 subscribe() 可以接收很多類型的參數,你可以訂閱一個全參數的版本,只要你實現其中所有的方法就可以。如果只是想要接收一下發射的數據,可以使用單一的 Consumer 的版本,這樣只需要實現一個方法,而且也是 onNext
我們可以直接在Activity的OnStart方法中來實現這個

@Override
protected void onStart() {
  super.onStart();
  // 1
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 2
      .subscribe(new Consumer<String>() {
        //3
        @Override
        public void accept(String query) throws Exception {
          // 4
          showResult(mCheeseSearchEngine.search(query));
        }
      });
}

 

其中Consumer需要導的包是

import io.reactivex.functions.Consumer;

 

依次解釋一下上面每一步

  1. 創建一個Observable 基於前面寫的事件監聽代碼
  2. 通過subscribe方法來訂閱這個Observable ,並提供一個單一的 Consumer
  3. 重寫Consumer 方法,這會在按鈕點擊的時候接收到發射出來的EditText的值
  4. 搜索並展示結果

這樣一個簡單的實現也寫完了,運行一下APP,跑出來的結果就像下面這樣


 

RxJava線程模型

雖然已經像模像樣的寫了一個小程序,但其實存在一些問題。當按鈕按下去后這個UI線程實際上被阻塞住了
如果在控制台可能可以看到這樣的提示

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!  
The application may be doing too much work on its main thread.

這是由於search 發生在主線程,如果是一個網絡請求的話,Android會直接crash,拋出一個NetworkOnMainThreadException 的異常。如果不指定線程,那么RxJava的操作會一直在一個線程上。
通過 subscribeOnobserveOn 兩個操作符能改變線程的執行狀態。
subscribeOn 在操作鏈上最好只調用一次,如果多次調用,依然只有第一次生效
subscribeOn 用來指定 observable 在哪個線程上創建執行操作,如果想要通過observables 發射事件給Android的View,那么需要保證訂閱者在Android的UI線程上執行操作。
另一方面, observeOn 可以在鏈上調用多次,它主要是用來指定下一個操作在哪一個線程上執行,來個例子:

myObservable // observable will be subscribed on i/o thread
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .map(/* this will be called on main thread... */)
  .doOnNext(/* ...and everything below until next observeOn */)
  .observeOn(Schedulers.io())
  .subscribe(/* this will be called on i/o thread */);

 

主要用到三種schedulers:
Schedulers.io(): 適合I/O類型的操作,比如網絡請求,磁盤操作。
Schedulers.computation(): 適合計算任務,比如事件循環或者回調處理。
AndroidSchedulers.mainThread() : 回調主線程,比如UI操作。

Map 操作符

map操作符通過運用一個方法把從一個observable 發射的數據再返回成另一個observable給那些調用的。
比如你有一個observable稱之為numbers,並且會發射一系列的值,如下所示


 


通過map操作符的apply方法

numbers.map(new Function<Integer, Integer>() {
  @Override
  public Integer apply(Integer number) throws Exception {
    return number * number;
  }
}

 

然后結果就像下面這樣


 


再來個實例,我們用這個操作符能夠把前面的代碼拆分一下

@Override
protected void onStart() {
  super.onStart();
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 1
      .observeOn(Schedulers.io())
      // 2
      .map(new Function<String, List<String>>() {
        @Override
        public List<String> apply(String query) {
          return mCheeseSearchEngine.search(query);
        }
      })
      // 3
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> result) {
          showResult(result);
        }
      });
}

 

簡述一下代碼,首先,指定下一次操作在I/O線程上,然后通過給的String,執行search返回一個結果列表,
再將線程從I/O上變更為主線程,showResult ,展示返回的數據。

通過doOnNext顯示進度條

為了用戶體驗,我們需要一個進度條
這里可以引入 doOnNext 操作符,doOnNext 有一個 Consumer ,並且在每次observable 發射數據的時候都會被調用,再改一下前面的代碼

@Override
protected void onStart() {
  super.onStart();
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 1
      .observeOn(AndroidSchedulers.mainThread())
      // 2
      .doOnNext(new Consumer<String>() {
        @Override
        public void accept(String s) {
          showProgressBar();
        }
      })
      .observeOn(Schedulers.io())
      .map(new Function<String, List<String>>() {
        @Override
        public List<String> apply(String query) {
          return mCheeseSearchEngine.search(query);
        }
      })
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> result) {
          // 3
          hideProgressBar();
          showResult(result);
        }
      });
}

 

每次在點擊按鈕的時候就能收到一個事件
首先把線程切換到主線程,然后在 doOnNext 里面來顯示進度條,再把線程切換到子線程,來進行請求數據,最后在切換回來關閉進度條,展示數據。RxJava非常適合這種需求,代碼也很清晰。
把這個例子跑起來的效果就像下面這樣,點擊的時候就顯示進度條:


 

觀察EditText變化

除了通過點擊按鈕來搜索,更好的方式就是根據EditText的text內容變化自動的搜索。
首先,就需要對EditText的內容變化進行訂閱觀察,先看代碼實例:

//1
private Observable<String> createTextChangeObservable() {
  //2
  Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
      //3
      final TextWatcher watcher = new TextWatcher() {
        @Override
        public void beforeTextChanged(CharSequence s, int start, int count, int after) {}

        @Override
        public void afterTextChanged(Editable s) {}

        //4
        @Override
        public void onTextChanged(CharSequence s, int start, int before, int count) {
          emitter.onNext(s.toString());
        }
      };

      //5
      mQueryEditText.addTextChangedListener(watcher);

      //6
      emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
          mQueryEditText.removeTextChangedListener(watcher);
        }
      });
    }
  });

  // 7
  return textChangeObservable;
}

 

分析一下上面這幾步代碼:

  1. 定義一個方法返回一個EditText變化的observable
  2. 通過 Observable.create 創建一個textChangeObservable ,傳入一個ObservableOnSubscribe 對象
  3. 在subscribe 方法中,創建一個TextWatcher,這是用來監聽值變化的
  4. 這里不用管 beforeTextChanged()afterTextChanged(),在onTextChanged 里面,把這個數據通過emitter.onNext 發射出去,這樣訂閱的觀察者就能接收到
  5. 通過addTextChangedListener將Edittext綁定上這個watcher監聽
  6. 最后在emitter的setCancellable中去移除這個監聽,防止內存泄漏

實現了這個Observable后就可以把前面的給替換掉

Observable<String> searchTextObservable = createTextChangeObservable();

 

再跑一次程序,就可以邊輸入邊搜索了


 

內容長度攔截過濾

現在可能有一個需求是在輸入長度比較短的時候不進行搜索,達到一定字符后才搜索,RxJava引入了一個 filter 操作符。
filter只會通過那些滿足條件的item,filter通過一個 Predicate,這個接口內部有一個 test 方法用來決定是否滿足條件,最后會返回一個boolean 值。
這里,Predicate 拿到的是一個輸入字符String,如果長度大於或等於2,就返回true,表示滿足條件。

return textChangeObservable
    .filter(new Predicate<String>() {
      @Override
      public boolean test(String query) throws Exception {
        return query.length() >= 2;
      }
    });

 

注意Predicate需要導的包是:

import io.reactivex.functions.Predicate;

再前面創建Observable的代碼后面加一個 filter 后,當query的長度不足2時,那這個值就不會被發射出去,然后訂閱的就收不到這個消息。
跑起來就像這樣,只輸一個數,返回false,不會觸發搜索。


 


再輸一個字符就通過了filter的過濾。


 

Debounce 操作符

有時我們對於EditText內容頻繁變化的場景並不想每次變化都去新發送一個請求,所以,這里又引入了一個新的操作符 debounce ,意思就是防抖動,這個和filter比較類似,也是一種攔截的策略。
這個操作符是根據item被發射的時間來進行過濾。每次在一個item被發射后,debounce 會等待一段指定長度的時間,然后才去發射下一個item。
如果在這段時間內都沒有一個item發生,那么上一個最后的item會被發射出去,這樣能保證起碼有一個item能被發射成功。


 


從圖里看到,2,3,4,5觸發的時間非常的接近,所以這一段時間內前三個都被過濾了,只留下了5。
在前面的 createTextChangeObservable() 中,我們再添加一個 debounce 操作符在 filter 的后面

return textChangeObservable
    .filter(new Predicate<String>() {
      @Override
      public boolean test(String query) throws Exception {
        return query.length() >= 2;
      }
    }).debounce(1000, TimeUnit.MILLISECONDS);  // add this line

 

再跑一下APP,可以看到中間階段直接省略了,最后搜索了一下結果值


 

Merge 操作符

一開始我們實現了一個observable 是監聽點擊按鈕的事件,然后又實現了一個observable 是監聽EditText的內容變化,那么怎么把這兩個合二為一呢。
RxJava提供了很多的操作符來聯合observables,但是其中最有用和簡單的就是 merge
merge 可以將兩個或更多的observable 聯合起來,合成一個單一的observable。


 


這里我們把前面兩個observable 綁定起來

Observable<String> buttonClickStream = createButtonClickObservable();
Observable<String> textChangeStream = createTextChangeObservable();

Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);

 

現在的效果就是前面的兩種效果的結合體,無論是自動搜索還是手動搜索都是可以觸發的。

RxJava和Activity/Fragment生命周期

前面我們實現過 setCancellable 方法,這個方法會在解除訂閱的時候回調。
Observable.subscribe() 會返回一個Disposable,Disposable是一個接口,其中有兩個方法:

public interface Disposable {
  void dispose();  // ends a subscription
  boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}

 

我們先在 CheeseActivity 中定義一個Disposable

private Disposable mDisposable;

onStart() 中,把 subscribe() 的返回值賦給mDisposable

mDisposable = searchTextObservable // change this line
  .observeOn(AndroidSchedulers.mainThread())
  .doOnNext(new Consumer<String>() {
    @Override
    public void accept(String s) {
      showProgressBar();
    }
  })
  .observeOn(Schedulers.io())
  .map(new Function<String, List<String>>() {
    @Override
    public List<String> apply(String query) {
      return mCheeseSearchEngine.search(query);
    }
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> result) {
      hideProgressBar();
      showResult(result);
    }
  });

 

最后我們就能在 onStop() 中去解除這個訂閱,代碼如下:

@Override
protected void onStop() {
  super.onStop();
  if (!mDisposable.isDisposed()) {
    mDisposable.dispose();
  }
}

 

這樣就解除了訂閱。

后記

你可以下載這篇文章中的代碼程序,下載地址
當然這篇文章只是講到了RxJava世界的一小點,比如,JakeWharton大神的庫 RxBinding ,這個庫里面包括大量的Android View的API,你可以通過調用 RxView.clicks(viewVariable) 來創建一個點擊事件observable 。
除此之外,學習更多有關RxJava的知識,可以看 官方文檔



作者:sheepm
鏈接:http://www.jianshu.com/p/031745744bfa
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM