1. RxJava簡介
Rx(ReactiveX,響應式編程)是一種事件驅動的基於異步數據流的編程模式,整個數據流就像一條河流,它可以被觀測(監聽),過濾,操控或者與其他數據流合並為一條新的數據流。而RxJava是.Net Rx在JVM上的實現。RxJava可以應用於大部分基於JVM的語言,如Scala,Groovy等。整個RxJava+RxAndroid的包大小為(1125kb+10kb)
2.RxJava特點
- 函數響應式編程(Functional Reactive Programming,FRP)
- 異步
- 事件驅動的
- 基於觀察者模式
- 專門的出錯處理,當使用RxJava出現錯誤時,它不會直接拋出異常,而是會執行OnError()方法;
- 並發,可以很容易實現多線程
3.RxJava的基本概念
RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者),Observables發出一系列事件,Subscribers處理這些事件。而RxJava的Observables是擴展自設計模式中的觀察者模式,添加了以下幾個能力:
- onCompleted(),當沒有新的可用數據時,通知
Observables; - onError(),當發生錯誤時,通知
Observables,但不會直接將錯誤或異常直接拋出;
3.1 四個關鍵概念
- Observable,產生事件(事件源)
- Observer, 根據事件作出相應的響應
- Subscriber,實現了Observer的抽象類,
- Subjects,Observable + Observer
3.1.1 Observable
Observable在存活期間,生命周期包含三個可能的事件,與迭代器的生命周期很類似:
| Events | Iterable(pull) | Observable(push) |
|---|---|---|
| 得到數據 | T next() | onNext(T) |
| 發現錯誤 | throws Exception | onError(Throwable) |
| 完成 | !hasNext() | onCompleted() |
與使用迭代器的區別:在使用迭代器的時候,線程會阻塞直到他們需要的數據到來。而使用Observable,是使用異步的方式將數據推送到Observer;
而根據推送機制的不同,Observable分為熱Observable和冷Observable:
- 熱Observable,當他創建時新開始執行它的職責,這樣所有訂閱了這個Observable的Observer就可以直接大中途觀察了(但可能會丟失前面發送的數據(事件));
- 冷Observable,只有等到有訂閱(subscribes)了這個Observable的Observer才開始執行它的職責:發送數據;
下面是一個簡單的創建觀察者的代碼:
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber){}
});
// example
Observable<Integer> ob=
Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber){
for(int i = 0; i < 5; i++){
observer.onNext(i);
}
observer.onCompleted();
}
});
// 並不用關心有多少數據,
Subscription subscriptionPrint =
observableString.subscribe(new Observer<Integer(){
@Override
public void onCompleted(){
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e){
System.out.println("Oh no! Something wrong happened!");
}
@Override
public void onNext(Integer item)
System.out.println("Item is " + item);
})
Observable的構造方法:
- create(subscribe),需要一個subscribe作為參數來構造,
- from(list)// 用來從一個已知的列表的產生數據,和前面的create作用類似;
- just(funnction),用來接收從一個方法的返回值(最多可以有9個參數),如果返回的是List,它不會去逐個遍歷List的Items,而是直接輸出整個List ;
- empty(),不輸出數據,但可以正常結束;
- never(),不輸出數據,並且不會終止;
- throw(),不輸出數據,但在發生錯誤時終止;
- interval(),創建一個按固定間隔發送整數序列的Observable,
- timer(),創建一個Observable在給定的延遲后發送一個特殊的值;
3.1.2 Subject
Subject = Observable + Observer
這意味着一個Subject可以同時是觀察者和被觀察者,事件源(Observable),也就是說Subject可以像觀察者一樣訂閱一個事件源,並且可以像Observable一樣輸出它們收到的事件。RxJava提供了四種不同類型的subjects:
- PublishSubject,
- BehaviorSubject,輸出它觀察到的大部分最近的Items和隨后觀察的Items到所有的訂閱者,初始化時需要一個初始值來做為最近的Items
- ReplaySubject,將它觀察到的所有數據重復發送到所有訂閱了的觀察者;
- AsyncSubject, 在整個Observable完成后,將最后觀察到的Items發送給每一個訂閱者;
4. RxJava的操作符
4.1 過濾
- filter(),過濾掉不需要的數據,只有返回true 的數據才會被使用;
- take(int n),只取返回數據中的前n個,skip(int n)跳過前n個數據;
- takeLast(int n),只取數據的最后n個,skipLast(int n);
- distinct(),會幫助我們處理重復的數據,但如果數據太大的話,內存需要比較大
- distinctUntilChanged(),只有當新數據與先前的不同,才會輸出,
- first(),last();
- firstOrDefault(),lastOrDefault,如果Observable沒有輸出任何數據時,我們可以給一個默認值;
- elementAt(int n),輸出第n個位置上的數據(從0 開始)
- timeout(),如果在給定時間間隔內,沒有輸出有效數據,則會執行onError();
- delay() 用於事件流中,延遲一段時間再發送來自Observable的結果;
4.2 映射
- map(),用來映射簡單的數據
- flatMap(),用來映射隊列等,但可能會改變數據的順序
- concatMap(),解決了fmp的交錯的問題
- flatMapIterable(),將生成的Iterable與Items進行對應起來(類似於key-value);
- switchMap(),
這幾個方法都是將輸入的數據以一種新的形式輸出; - Scan(),類似於一個累加的方法,后一個item是前面item的后再加上原來的item;
- GroupBy(),
- buffer(int n),將數據作為列表(每n個數據作為一個列表)輸出而不是單個的Items;
- cast() 類似於map();
4.3 合並
- merge()可以將多個輸入整合成一個輸出(並不會合並Items);
- zip(),可以將多個輸入整合成一個輸出(會合並Items);
4.3 重試
- retryWhen(),當接收到onError()事件時,觸發重新訂閱(發生某些錯誤時,需要做什么工作);
- repeat(),當接收到onComplete()事件時,觸發重新訂閱
4.4 線程的調度(Schedulers)
RxJava提供了5種類型的調度者:
Schedulers
- .io(), 使用線程池來為IO操作進行調度,但沒有騎士線程池的大小 作限制,因此使用時需要考慮內存的使用
- .computation(),與IO無關的計算型調度,有很多RxJava相關的默認方法:buffer(),debounce(),delay(),interval()等默認是在該類線程中執行;
- .immediate(),在當前線程中快速開始某項操作,是方法:timeout(),timeInterval等的默認調度器;
- .newThread(), 開啟新線程來執行某項操作
- .trampoline(),為一些不需要立即執行的任務進行調度,會依次執行隊列里的任務,是方法:repeat(),retry()的默認調度器;
RxAndroid還提供了一個Android特有的調度器:AndroidSchedulers.mainThread()來讓代碼在UI主線程中執行;
RxJava提供了一個每一個Observables都可以使用的subscribeOn(),ObserveOn()方法,將Scheduler與Observables建立聯系,我們可以這樣來使用:
.subscribeOn(Schedulers.io()) // 讓事件的產生發生在IO線程,多次調用,以最后一次調用的結果為准
.observeOn(AndroidSchedulers.mainThread()) // 讓事件的回調發生在UI主線程中
.subscribe(....)
5. RxAndroid
前面版本的RxAndroid還提供了AppObservable,ViewObservable,WidgetObservable,LifecycleObservable 等,但最新版本的RxAndroid直接刪掉了這些;
6.參考文獻:
1
Grokking RxJava,Part 4
RxJava處理網絡失敗
RxJava Essentials
給Android開發者的RxJava詳解
拆RxJava
