Reactive Extensions for Javascript 誕生於幾年前,隨着angular2正式版的發布,它將會被更多開發者所認知。RxJs提供的核心是Observable對象,它是一個使用可觀察數據序列實現組合異步和事件編程。
跟這個很類似的異步編程模型是Promise,Promise是基於狀態變化的異步模型,一旦由等待狀態進入成功或失敗狀態便不能再次修改,當狀態變化時訂閱者只能夠拿到一個值;而Observable是基於序列的異步編程模型,隨着序列的變化,訂閱者可以持續不斷的獲取新的值。而且Promise只提供回話機制,並沒有更多的操作來支持對結果的復雜處理,而Observable提供了多種多樣的操作符,來處理運算結果,以滿足復雜的應用邏輯。
在實際編程中,我們主要與三個對象打交道:Observable、observer、Subscription:


以一個元素的click事件為例看看如何使用Observable:
var clickStream = new Rx.Observable(observer => { var handle = evt => observer.next(evt); element.addEventListener('click', handle); return () => element.removeEventListener('click', handle); }); subscription = clickStream.subscribe(evt => { console.log('onNext: ' + evt.id); }, err => { console.error('onError'); }, () => { console.log('onComplete'); }); setTimeout(() => { subscription.unsubscribe(); }, 1000);
如果每個事件都需要這么包裝一下,豈不是太麻煩了,所以RxJs為我們提供了一個便利函數:Observable.fromEvent來方便的銜接事件。
常見的鏈接操作符:concat、merge、combineLates等
投影操作:map、flatMap,flatMap需要重點介紹
過濾:filter、distinctUltilChanges、
操作符分類:
Operators by Categories
錯誤處理:catch、retry、finally
減壓:debounce、throttle、sample、pausable
減少:buffer、bufferWithCount、bufferWithTime
想要掌握Observable的操作符先要學會看懂序列圖:

箭頭代表着隨時間變化的序列,比如在一個元素上不斷點擊鼠標,圓圈代表序列對外產生的影響,如每一次點擊元素都會觸發一次事件回調,圓圈中的數字是對外發射的信息,如每一次事件的觸發都會有一個事件對象,代表本次操作的一些信息。
想要靈活的運用Observable處理復雜的邏輯,就要學會使用它提供的操作符。我將操作符分為兩類,單序列操作和復合序列操作,單序列操作是指的針對一個序列進行的操作運算,復合序列操作指的是對兩個或者多個序列進行處理的操作符,復合序列操作相對更難懂一些。
下面先看單序列操作,以map操作為例:

map操作是將一個序列中每次對外發射的信息做轉換,如上圖map將每次的發射值乘以十,那么當訂閱者訂閱之后每次得到的訂閱值就不再是原始的123而是經過轉換后的10 20 30。通過序列圖能更方便的理解Observable的運算。
下面我們來看一個復合序列操作,以merge為例

merge操作的目的是將兩個獨立序列,合成一個序列。原本序列1隨着時間的前進,在100ms時發射a,在200ms時發射b,300ms時發射c,它的訂閱者在400ms將受到abc三個值;序列2在150ms時發射d,250ms時發射e,350ms時發射f,它的訂閱者在400ms內收到def三個值。而merge之后的新序列將在400ms內收到abcdef(注意順序)。
常用操作符的理解:
Observable.range:發射一定數量值得序列。
Observable.toArray: 在序列完成時將所有發射值轉換為一個數組。
Observable.flatMap: 將原始序列流中的元素轉化為一個新的序列流,並將這個新的序列流merge到原來的序列中元素的位置。
Observable.startWith: 它會設置Observable序列的第一個值
Observable.combineLatest: 類似於promiseAll,在所有序列有結果后才會執行
Observable.scan: 將序列中每次發射的值可以做聚合,與reduce類似,reduce會將整個序列的值聚合起來,在序列完成時發送一個最終值
Observable.sample: 從持續的序列中取得一定的樣品
Observable.merge:將多個序列合並成一個,可以做OR來使用
Observable.timestamp: 能夠得到每個發射值的發射時的時間
Observable.distinctUntilChanged(compare, selector): selector取出用來比較的key,compare用來比較兩個key
Observable.takeWhile() 當參數為false時停止發射數據