RxJS 簡介:可觀察對象、觀察者與操作符
對於響應式編程來說,RxJS 是一個不可思議的工具。今天我們將深入探討什么是 Observable(可觀察對象)和 observer(觀察者),然后了解如何創建自己的 operator(操作符)。
如果你之前用過 RxJS,想了解它的內部工作原理,以及 Observable、operator 是如何運作的,這篇文章將很適合你閱讀。
什么是 Observable(可觀察對象)?
可觀察對象其實就是一個比較特別的函數,它接受一個“觀察者”(observer)對象作為參數(在這個觀察者對象中有 “next”、“error”、“complete”等方法),以及它會返回一種解除與觀察者關系的邏輯。例如我們自己實現的時候會使用一個簡單的 “unsubscribe” 函數來實現退訂功能(即解除與觀察者綁定關系的邏輯)。而在 RxJS 中, 它是一個包含 unsubsribe
方法的訂閱對象(Subscription)。
可觀察對象會創建觀察者對象(稍后我們將詳細介紹它),並將它和我們希望獲取數據值的“東西”連接起來。這個“東西”就是生產者(producer),它可能來自於 click
或者 input
之類的 DOM 事件,是數據值的來源。當然,它也可以是一些更復雜的情況,比如通過 HTTP 與服務器交流的事件。
我們稍后將要自己寫一個可觀察對象,以便更好地理解它!在此之前,讓我們先看看一個訂閱對象的例子:
const node = document.querySelector('input[type=text]'); const input$ = Rx.Observable.fromEvent(node, 'input'); input$.subscribe({ next: (event) => console.log(`你剛剛輸入了 ${event.target.value}!`), error: (err) => console.log(`Oops... ${err}`), complete: () => console.log(`完成!`) });
這個例子使用了一個 <input type="text">
節點,並將其傳入 Rx.Observable.fromEvent()
中。當我們觸發指定的事件名時,它將會返回一個輸入的 Event
的可觀察對象。(因此我們在 console.log 中用 ${event.target.value}
可以獲取輸入值)
當輸入事件被觸發的時候,可觀察對象會將它的值傳給觀察者。
什么是 Observer(觀察者)?
觀察者相當容易理解。在前面的例子中,我們傳入 .subscribe()
中的對象字面量就是觀察者(訂閱對象將會調用我們的可觀察對象)。
.subscribe(next, error, complete)
也是一種合法的語法,但是我們現在研究的是對象字面量的情況。
當一個可觀察對象產生數據值的時候,它會通知觀察者,當新的值被成功捕獲的時候調用 .next()
,發生錯誤的時候調用 .error()
。
當我們訂閱一個可觀察對象的時候,它會持續不斷地將值傳遞給觀察者,直到發生以下兩件事:一種是生產者告知沒有更多的值需要傳遞了,這種情況它會調用觀察者的 .complete()
;一種是我們(“消費者”)對之后的值不再感興趣,決定取消訂閱(unsubsribe)。
如果我們想要對可觀察對象傳來的值進行組成構建(compose),那么在值傳達最終的 .subscribe()
代碼塊之前,需要經過一連串的可觀察對象(也就是操作符)處理。這個一連串的“鏈”也就是我們所說的可觀察對象序列。鏈中的每個操作符都會返回一個新的可觀察對象,讓我們的序列能夠持續進行下去——這也就是我們所熟知的“流”。
什么是 Operator(操作符)?
我們前面提到,可觀察對象能夠進行鏈式調用,也就是說我們可以像這樣寫代碼:
const input$ = Rx.Observable.fromEvent(node, 'input') .map(event => event.target.value) .filter(value => value.length >= 2) .subscribe(value => { // use the `value` });
這段代碼做了下面一系列事情:
- 我們先假定用戶輸入了一個“a”
- 可觀察對象將會對這個輸入事件作出反應,將值傳給下一個觀察者
- “a”被傳給了訂閱了我們初始可觀察對象的
.map()
.map()
會返回一個event.target.value
的新可觀察對象,然后調用它觀察者對象中的.next()
.next()
將會調用訂閱了.map()
的.filter()
,並將.map()
處理后的值傳遞給它.filter()
將會返回另一個可觀察對象,.filter()
過濾后留下.length
大於等於 2 的值,並將其傳給.next()
- 我們通過
.subscribe()
獲得了最終的數據值
這短短的幾行代碼做了這么多的事!如果你還覺得弄不清,只需要記住:
每當返回一個新的可觀察對象,都會有一個新的觀察者掛載到前一個可觀察對象上,這樣就能通過觀察者的“流”進行傳值,對觀察者生產的值進行處理,然后調用 .next()
方法將處理后的值傳遞給下一個觀察者。
簡單來說,操作符將會不斷地依次返回新的可觀察對象,讓我們的流能夠持續進行。作為用戶而言,我們不需要關心什么時候、什么情況下需要創建與使用可觀察對象與觀察者,我們只需要用我們的訂閱對象進行鏈式調用就行了。
創建我們自己的 Observable(可觀察對象)
現在,讓我們開始寫自己的可觀察對象的實現吧。盡管它不會像 Rx 的實現那么高級,但我們還是對完善它充滿信心。
Observable 構造器
首先,我們需要創建一個 Observable 構造函數,此構造函數接受且僅接受 subscribe
函數作為其唯一的參數。每個 Observable 實例都存儲 subscribe 屬性,稍后可以由觀察者對象調用它:
function Observable(subscribe) { this.subscribe = subscribe; }
每個分配給 this.subscribe
的 subscribe
回調都將會被我們或者其它的可觀察對象調用。這樣我們下面做的事情就有意義了。
Observer 示例
在深入探討實際情況之前,我們先看一看基礎的例子。
現在我們已經配好了可觀察對象函數,可以調用我們的觀察者,將 1
這個值傳給它並訂閱它:
const one$ = new Observable((observer) => { observer.next(1); observer.complete(); }); one$.subscribe({ next: (value) => console.log(value) // 1 });
我們訂閱了 Observable 實例,將我們的 observer(對象字面量)傳入構造器中(之后它會被分配給 this.subscribe
)。
Observable.fromEvent
現在我們已經完成了創建自己的 Observable 的基礎步驟。下一步是為 Observable 添加 static
方法:
Observable.fromEvent = (element, name) => { };
我們將像使用 RxJS 一樣使用我們的 Observable:
const node = document.querySelector('input'); const input$ = Observable.fromEvent(node, 'input');
這意味着我們需要返回一個新的 Observable,然后將函數作為參數傳遞給它:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { }); };
這段代碼將我們的函數傳入了構造器中的 this.subscribe
。接下來,我們需要將事件監聽設置好:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => {}, false); }); };
那么這個 observer
參數是什么呢?它又是從哪里來的呢?
這個 observer
其實就是攜帶 next
、error
、complete
的對象字面量。
這塊其實很有意思。
observer
在.subscribe()
被調用之前都不會被傳遞,因此addEventListener
在 Observable 被“訂閱”之前都不會被執行。
一旦調用 subscribe,也就會調用 Observable 構造器內的 this.subscribe
。它將會調用我們傳入 new Observable(callback)
的 callback,同時也會依次將值傳給我們的觀察者。這樣,當 Observable 做完一件事的時候,它就會用更新過的值調用我們觀察者中的 .next()
方法。
那么之后呢?我們已經得到了初始化好的事件監聽器,但是還沒有調用 .next()
。下面完成它:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => { observer.next(event); }, false); }); };
我們都知道,可觀察對象在被銷毀前需要一個“處理后事”的函數,在我們這個例子中,我們需要移除事件監聽:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); };
因為這個 Observable 還在處理 DOM API 和事件,因此我們還不會去調用 .complete()
。這樣在技術上就有無限的可用性。
試一試吧!下面是我們已經寫好的完整代碼:
const node = document.querySelector('input'); const p = document.querySelector('p'); function Observable(subscribe) { this.subscribe = subscribe; } Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); }; const input$ = Observable.fromEvent(node, 'input'); const unsubscribe = input$.subscribe({ next: (event) => { p.innerHTML = event.target.value; } }); // 5 秒之后自動取消訂閱 setTimeout(unsubscribe, 5000);
在線示例:
創造我們自己的 Operator(操作符)
在我們理解了可觀察對象與觀察者對象的概念之后,我們可以更輕松地去創造我們自己的操作符了。我們在 Observable
對象原型中加上一個新的方法:
Observable.prototype.map=function(mapFn){ };
這個方法將會像 JavaScript 中的 Array.prototype.map
一樣使用,不過它可以對任何值用:
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value);
所以我們要取得回調函數,並調用它,返回我們期望得到的數據。在這之前,我們需要拿到流中最新的數據值。
下面該做什么就比較明了了,我們要得到調用了這個 .map()
操作符的 Observable 實例的引用入口。我們是在原型鏈上編程,因此可以直接這么做:
Observable.prototype.map = function (mapFn) { const input = this; };
找找樂子吧!現在我們可以在返回的 Obeservable 中調用 subscribe:
Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe(); }); };
我們要返回
input.subscribe()
,因為在我們退訂的時候,非訂閱對象將會順着鏈一直轉下去,解除每個 Observable 的訂閱。
這個訂閱對象將允許我們把之前 Observable.fromEvent
傳來的值傳遞下去,因為它返回了構造器中含有 subscribe
原型的新的 Observable 對象。我們可以輕松地訂閱它對數據值做出的任何更新!最后,完成通過 map 調用我們的 mapFn()
的功能:
Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe({ next: (value) => observer.next(mapFn(value)), error: (err) => observer.error(err), complete: () => observer.complete() }); }); };
現在我們可以進行鏈式調用了!
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value); input$.subscribe({ next: (value) => { p.innerHTML = value; } });
注意到最后一個 .subscribe()
不再和之前一樣傳入 Event
對象,而是傳入了一個 value
了嗎?這說明你成功地創建了一個可觀察對象流。
再試試:
希望這篇文章對你來說還算有趣~:)