可觀察對象支持在應用中的發布者和訂閱者之間傳遞消息。在需要進行事件處理,異步編程和處理多值的時候,可觀察對象相對其他技術有顯著的優點。
可觀察對象是聲明式的 —— 也就是說,雖然你定義了一個用於發布值的函數,但是在有消費者訂閱它之前,這個函數並不會實際執行。 訂閱之后,當這個函數執行完或取消訂閱時,訂閱者就會收到通知。
創建 Observable(可觀察對象)
Rx.Observable.create
是 Observable
構造函數的別名,它接收一個參數:subscribe
函數。
使用 Observable 構造函數可以創建任何類型的可觀察流。 當執行可觀察對象的 subscribe() 方法時,這個構造函數就會把它接收到的參數作為訂閱函數來運行。 訂閱函數會接收一個 Observer 對象,並把值發布給觀察者的 next() 方法。
如下:創建一個Observable,並每隔一秒向觀察者發送‘hi’:
var observable = Rx.Observable.create(function subscribe(observer) { var id = setInterval(() => { observer.next('hi') }, 1000); });
注意:Observables 可以使用 create
來創建, 但通常我們使用所謂的創建操作符, 像 of
、from
、interval
、等等。
如下:
Observable.of(...items) —— 返回一個 Observable 實例,它用同步的方式把參數中提供的這些值發送出來。
Observable.from(iterable) —— 把它的參數轉換成一個 Observable 實例。 該方法通常用於把一個數組轉換成一個(發送多個值的)可觀察對象。
執行Observable
Observable 執行可以傳遞三種類型的值:
- "next" :用來處理每個送達值。在開始執行后可能執行零次或多次。比如數字、字符串、對象,等等。
- "error" :可選。用來處理錯誤通知,發送一個 JavaScript 錯誤 或 異常。錯誤會中斷這個可觀察對象實例的執行過程 。
- "complete" :用來處理執行完畢(complete)通知。
"Next" 通知是最重要,也是最常見的類型:它們表示傳遞給觀察者的實際數據。
"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,並且只會執行其中的一個,一旦發送,那么之后不會再發送任何通知了。
下面是 Observable 執行的示例,它發送了三個 "Next" 通知,然后是 "Complete" 通知:
//用try..catch 捕獲異常
var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete();
observer.next(4); // 因為違反規約,所以不會發送
} catch (err) { observer.error(err); // 如果捕獲到異常會發送一個錯誤 } });
清理 Observable 執行
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源。
當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要調用 unsubscribe()
方法就可以取消執行。
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // 稍后: subscription.unsubscribe();
當我們使用 create()
方法創建 Observable 時,Observable 必須定義如何清理執行的資源。你可以通過在 function subscribe()
中返回一個自定義的 unsubscribe
函數。
var observable = Rx.Observable.create(function subscribe(observer) { // 追蹤 interval 資源 var intervalID = setInterval(() => { observer.next('hi'); }, 1000); // 提供取消和清理 interval 資源的方法 return function unsubscribe() { clearInterval(intervalID); }; }); //這里返回一個函數聲明 var subscription=observable.subscribe((x)=>console.log(x)); //執行取消訂閱 subscription.unsubscribe();
Observer (觀察者)
觀察者是由 Observable 發送的值的消費者。觀察者只是一組回調函數的集合,每個回調函數對應一種 Observable 發送的通知類型:next
、error
和 complete
。下面的示例是一個典型的觀察者對象:
var observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), }; observable.subscribe(observer); //觀察者只是有三個回調函數的對象,每個回調函數對應一種 Observable 發送的通知類型。
Subscription (訂閱)
Subscription 基本上只有一個 unsubscribe()
函數,它不需要任何參數,這個函數用來釋放資源或去取消 Observable 執行占用的資源。
Subscription 還可以合在一起,這樣一個 Subscription 調用 unsubscribe()
方法,可能會有多個 Subscription 取消訂閱 。你可以通過把一個 Subscription 添加到另一個上面來做這件事:
var observable1 = Rx.Observable.interval(400); var observable2 = Rx.Observable.interval(300); var subscription = observable1.subscribe(x => console.log('first: ' + x)); var childSubscription = observable2.subscribe(x => console.log('second: ' + x)); subscription.add(childSubscription); setTimeout(() => { // subscription 和 childSubscription 都會取消訂閱 subscription.unsubscribe(); }, 1000);
執行時,我們在控制台中看到:
second: 0 first: 0 second: 1 first: 1 second: 2
Subscriptions 還有一個 remove(otherSubscription)
方法,用來撤銷一個已添加的子 Subscription 。
Subject (主體)
Subject 是一種特殊類型的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行)。
每個 Subject 都是 Observable 。 - 對於 Subject,你可以提供一個觀察者並使用 subscribe
方法,就可以開始正常接收值。從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject 。
在 Subject 的內部,subscribe
不會調用發送值的新執行。它只是將給定的觀察者注冊到觀察者列表中,類似於其他庫或語言中的 addListener
的工作方式。
每個 Subject 都是觀察者。 - Subject 是一個有如下方法的對象: next(v)
、error(e)
和 complete()
。要給 Subject 提供新值,只要調用 next(theValue)
,它會將值多播給已注冊監聽該 Subject 的觀察者們。
在下面的示例中,我們為 Subject 添加了兩個觀察者,然后給 Subject 提供一些值:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(1); subject.next(2);
下面是控制台的輸出:
observerA: 1 observerB: 1 observerA: 2 observerB: 2
因為 Subject 是觀察者,這也就在意味着你可以把 Subject 作為參數傳給任何 Observable 的 subscribe
方法,如下面的示例所展示的:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // 你可以提供一個 Subject 進行訂閱
執行結果:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
使用上面的方法,我們基本上只是通過 Subject 將單播的 Observable 執行轉換為多播的。這也說明了 Subjects 是將任意 Observable 執行共享給多個觀察者的唯一方式。
Subject的三種變體類型:
- BehaviorSubject :主要用來表示隨着時間的流逝,值會改變的情況。比如生日是Subject,而生日就是BehaviorSubject。
- ReplaySubject :當有新的訂閱者訂閱時,可以指定回放的值得個數,或者以 window time (以毫秒為單位) 規定范圍內發生的值。
- AsyncSubject : 當被觀察者的值推送完成時,才將最后一個值發送給觀察者。類似於last ()操作符。
多播的 Observables
多播 Observable 在底層是通過使用 Subject 使得多個觀察者可以看見同一個 Observable 執行。
多播 Observable” 通過 Subject 來發送通知,這個 Subject 可能有多個訂閱者,然而普通的 “單播 Observable” 只發送通知給單個觀察者。
在底層,這就是 multicast
操作符的工作原理:觀察者訂閱一個基礎的 Subject,然后 Subject 訂閱源 Observable 。下面的示例與前面使用 observable.subscribe(subject)
的示例類似:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 在底層使用了 `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); // 在底層使用了 `source.subscribe(subject)`: multicasted.connect();
multicast
操作符返回一個 Observable,它看起來和普通的 Observable 沒什么區別,但當訂閱時就像是 Subject 。multicast
返回的是 ConnectableObservable
,它只是一個有 connect()
方法的 Observable 。
connect()
方法十分重要,它決定了何時啟動共享的 Observable 執行。因為 connect()
方法在底層執行了 source.subscribe(subject)
,所以它返回的是 Subscription,你可以取消訂閱以取消共享的 Observable 執行。