歡迎指錯與討論 : )
當前RxJS版本:5.0.0-beta.10。更詳細的內容盡在RxJS官網http://reactivex.io/rxjs/manual/overview.html。文章比較長,可以通過快捷鍵 command+f 或者 ctrl+f 搜索主要內容。
- 前言
RxJS在ng2、redux-observable或者前端數據層中扮演一個重要角色,因此筆者想學好RxJS,提前做好准備。本文95%非原創,而是筆者對RxJS官網基礎篇的翻譯,如有錯漏請指出。本文主要內容:簡介和六大概念(Observable、Observer、Subscription、Subject、Operators、Scheduler)。
具體Api請本博客的另外幾篇中文api翻譯文章:
transform(轉換)、filter (過濾)、combination/multicasting (組合、廣播)、ErrorHanding/Condition/Mathematical(錯誤處理、情況處理、數學方法)
- 安裝
import {Observable} from 'rxjs/Observable' // 1. 按需打包,減輕bundle.js大小
import 'rxjs/add/observable/merge'; // 2. 按需導入函數,如merge
- 一些概念
- Observable( 可被觀察的 ) : 是一個包含來自未來、可以被使用的值( value )或事件( event )的集合
- Observe( 觀察者 ):是一個知道如何監聽、處理來自Obervable的值的函數集合
- Subscription( 訂閱 ):代表着Observable的執行動作,我們可以使用它來停止Obervable繼續執行
- Operators( 操作 ):一系列可以操作集合的pure function,像是過濾( filter )、轉換( map )等等
- Subject( ):相當於一個事件發射器,是唯一能夠向多個Observer廣播值( value )的唯一手段
- Schedulers( 調度 ):是一個中央調度員,幫助我們控制並發,協調計算( setTimeout、requestAnimationFrame等 )
- Observable( 被觀察者 )
通常而言,Observable都會延遲產生值 ,比如當我們subscribe一個observable的時候它才會向我們發送這些值 let observable = Rx.Observable.range(1,3)
- pull( 拉取 ) 每個函數都是一個數據的生產者,每個調用函數的那個'人',都會希望從這個函數中能夠獲得(pull) 唯一的返回值
- push( 推送 ) 在數據生產者中( 如函數 ),會在特定時候把數據推送至消費者,消費者在獲得數據之前啥也不會做
- Observable與函數、promsise的對比:函數是當調用才同步計算,並最終只返回一個值的;promise是會或者不會返回一個值;Observable是當調用才同步或者異步地計算,並可能產生0到無窮多個值的。Observable就像一個沒有參數的函數,並不斷生成一些值供我們使用,因此它也像是一個事件發射機( EventEmitters )。在Observable中subscribe就像call一個函數,你訂閱它,它才會被'啟動'。同一個Observable對於不同的subscribe,是不會共享結果的( 通常情況下這樣子的,但可以通過調用api來共享 )。
- Observable四大核心:創建 、訂閱 、 執行 、銷毀 。
訂閱( subscribe )。當對一個Observable調用多個subscribe函數並創建多個observe時,observe之間不會共享任何東西,因為在Observable.create內部是對observe列表調用各自的回調的 Observable.create(function subscribe(observe){...})
執行( Executing )。Next函數能夠將數據傳遞給Observer,同時在執行期間,能在Observable內部調用多個Next( )函數。同時建議在Observabl內部使用try/catch語法。
銷毀Observe
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // Later:
subscription.unsubscribe();
銷毀Observable
var observable = Rx.Observable.create(function subscribe(observer) { var intervalID = setInterval(() => { ... }, 1000); return function unsubscribe() { clearInterval(intervalID); }; });
- Observer( 觀察者 )
什么是觀察者?觀察者其實是數據的消費者,把來自Observble的數據拿過來使用。同時,Observer的本質是一系列的回調函數,是來自於Observable傳遞數據后的回調函數。我們可以直接通過subscribe函數創建觀察者
observable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification') );
- Subscription( 訂閱 )
什么是Subscription?它其實是代表着Observable的'執行'的對象,我們可以通過它的 unsubscribe 方法銷毀Observable的執行。同時我們能使用 add 方法,一次銷毀多個
var subscription = observable1.subscribe(x => console.log('first: ' + x)); var childSubscription = observable2.subscribe(x => console.log('second: ' + x)); subscription.add(childSubscription); setTimeout(() => { subscription.unsubscribe(); }, 1000);
- Subject( )
什么是Subject?它是在Rx中一種比較特殊的Observable( 同時它也是Observer ),它能夠讓值( value )同時向多個Observer傳播( 廣播 )。而一般的Observable都是' 單播 '形式,即:每一個訂閱了同一個Observable的observer,實際上是擁有不同的、獨立的Observable的執行( 原文:each subscribed Observer owns an independent execution of the Observable ),而Subject是多播的。
// Observable對比Subject
let source = Rx.Observable.create((observer)=>{ observer.next('1'); observer.next('2'); }); source.subscribe((x)=>{console.log(x);}); source.subscribe((x)=>{console.log(x);}); // 輸出 1 2 1 2
let subject = new Rx.Subject(); subject.subscribe((x)=>{console.log(`${x}`);}); subject.subscribe((x)=>{console.log(`${x}`);}); subject.next(1); subject.next(2); // 輸出 1 1 2 2
// Subject可以在'多播'情景下對Observable進行優化 // 明顯看到在subject下,Observable只執行了一次
var source = Rx.Observable.create((observer)=>{ console.log(`source was called`); observer.next(1);observer.next(2);observer.next(3); }); source.subscribe({next: (v) => console.log('observerA: ' + v)}); source.subscribe({next: (v) => console.log('observerB: ' + v)}); // 輸出為 'source was called' observerA: 1 observerA: 2 // 'source was called' observerB: 1 observerB: 2
var source = Rx.Observable.create((o)=>{ console.log(`source was called`); o.next(1);o.next(2); }); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 原理是`subject.subscribe({...})`:返回的multicasted是一個connectableObservable
multicasted.subscribe({next: (v) => console.log('observerA: ' + v)}); multicasted.subscribe({next: (v) => console.log('observerB: ' + v)}); // 原理是 `source.subscribe(subject)`:
multicasted.connect(); // 輸出 ‘source was called’ observerA: 1 observerB: 1 // observerA: 2 observerB: 2
可以單個取消訂閱狀態 observer.subscribe( ) ,也可以直接銷毀Subject subscriptionConnect.unsubscribe( )
- refCount( ) 該api能監聽當前connectableObservable的'連接狀態',當有大於0個subscribe掛在它上面它會自動執行 connect ,無subscribtion的時候會自動執行 unsubscribe( )
var refCounted = source.multicast(subject).refCount();
- BehaviorSubject() 它能儲存上一次從Observable發過來的值,每當有新的observer的時,會把改值立即發送給它
var subject = new Rx.BehaviorSubject(0)// 初始值
- ReplaySubject( 重現 ) 是BehaviorSubject的加強版,它能儲存從Observable發送過來的一系列值,當有新的observer連接時,會把這些值發送給它。
var subject = new Rx.ReplaySubject(3); // 緩存空間為3
- AsyncSubject(異步)僅將從Observable發送過來的最后一個值發送給Observe
var subject = new Rx.AsyncSubject();
- Operators
什么是Operators?Operators是Rx中最有用的一系列函數,它們建立在Observable之上,並能優雅地以同步代碼的寫法,將不同的異步流代碼鏈式組合到一起。同時,Operator都是pure的,因為這些函數不會直接修改傳入進來的Observable,而是經過修飾之后返回一個新Observable。
在鏈式操作中,下一個operator會根據上一個operator修改后的Observable繼續工作。
- 具體請本博客的另外幾篇中文api翻譯文章:
transform(請點擊)、filter (請點擊)、combination/multicasting (請點擊)、ErrorHanding/Condition/Mathematical(請點擊)
- Scheduler
Scheduler有咩用?Scheduler能控制一個訂閱的開始、數據的傳遞。它由三部分組成:
- scheduler是一種數據結構,它知道如何基於優先級或者其他標准對任務隊列進行儲存
- scheduler是一個執行環境,它指何時何地地執行任務
- scheduler有一個' 時鍾 '的概念,它能讓我們自己定義,當Observable向Observer時會處於什么環境下( context )
// observeOn( RX.Scheduler.async )
var observable = Rx.Observable.create(function (observer) { observer.next(1);observer.next(2); }) .observeOn(Rx.Scheduler.async); console.log('just before subscribe'); observable.subscribe({next: x => console.log('got value ' + x)}); console.log('just after subscribe'); // 輸出 // just before subscribe just after subscribe // got value 1 got value 2
- Scheduler類型
- Scheduler.queue 基於當前的事件框架在一個隊列上工作,當我們需要遍歷操作時可以使用它
- Scheduler.asap
- Scheduler.async 在內部,Schedules會使用setInterval,當我們需要以時間為基線時就使用它
- 基礎
- 創建流: Observable.create、of、from、fromEvent( target,eventType )、fromPromise、bindCallback( 把callback寫法轉換為鏈式寫法 )
let exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe( exist=>console.log(`exist? : ${exist} `) )
- 事件流: filter、delay( 延時 )、throttleTime( 時間間隔 )、debounceTime( 事件暫停x毫秒后 )、take( 執行x次后停止 )、takeUtil( 取消訂閱 )
let inputStream = Rx.Observable.fromEvent(document.querySelector('input'), 'keyup')
let stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click') inputStream.throttleTime(200) // 每隔200毫秒才釋放一次 .takeUtil(stopStream) // 當觸發時,停止訂閱 .subscribe(($event)=> {console.log(`${$event.target.value}`)}) // 事件對象
- 轉換流、過濾流:map、distinct( 過濾重復 )、distince( 過濾連續重復 )
// 輸入hello world
input.pluck('target', 'value').distinctUntilChanged()
.subscribe(value => console.log(value)); // "helo wrd"
input.pluck('target', 'value').distinct() .subscribe(value => console.log(value)); // "helo world"
- 創建一個應用
- RxJS的工具函數都是pure、無狀態的一類函數。但我們的應用中往往需要記錄大量的state,那在Rx如何記錄狀態呢?類似於redux,rx中的scan函數能夠使用reduce函數,將流的結果合並。我們需要做的僅僅是保存這個狀態( state )值
// 基礎模式
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click') // 內部的reduce函數能夠積累(此處是相加,若是對象可以是Object.assign),並相應變化 .scan(count => count + 1, 0) // Set the count on an element each time it changes .subscribe(count => document.querySelector('#count').innerHTML = count);
- 一個更好的模式
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click') // 返回了一個, 類似redux中 經過switch(action.type)的reducer 與函數 .map(() => state => Object.assign({}, state, {count: state.count + 1})) // 直接把函數作用於初始化state,scan同時能保存這個更新后的state .scan((state, changeFn) => changeFn(state), {count: 0}) .subscribe(state => document.querySelector('#count').innerHTML = state.count);
- 對於某一個組件還可以
var state = Rx.Observable.merge(
increase,
decrease,
input
).scan((state, changeFn) => changeFn(state), { count: 0, inputValue: '' });