RxJS 簡介:可觀察對象、觀察者與操作符


RxJS 簡介:可觀察對象、觀察者與操作符

對於響應式編程來說,RxJS 是一個不可思議的工具。今天我們將深入探討什么是 Observable(可觀察對象)和 observer(觀察者),然后了解如何創建自己的 operator(操作符)。

如果你之前用過 RxJS,想了解它的內部工作原理,以及 Observable、operator 是如何運作的,這篇文章將很適合你閱讀。

什么是 Observable(可觀察對象)?

可觀察對象其實就是一個比較特別的函數,它接受一個“觀察者”(observer)對象作為參數(在這個觀察者對象中有 “next”、“error”、“complete”等方法),以及它會返回一種解除與觀察者關系的邏輯。例如我們自己實現的時候會使用一個簡單的 “unsubscribe” 函數來實現退訂功能(即解除與觀察者綁定關系的邏輯)。而在 RxJS 中, 它是一個包含 unsubsribe 方法的訂閱對象(Subscription)。

可觀察對象會創建觀察者對象(稍后我們將詳細介紹它),並將它和我們希望獲取數據值的“東西”連接起來。這個“東西”就是生產者(producer),它可能來自於 click 或者 input之類的 DOM 事件,是數據值的來源。當然,它也可以是一些更復雜的情況,比如通過 HTTP 與服務器交流的事件。

我們稍后將要自己寫一個可觀察對象,以便更好地理解它!在此之前,讓我們先看看一個訂閱對象的例子:

const node = document.querySelector('input[type=text]'); const input$ = Rx.Observable.fromEvent(node, 'input'); input$.subscribe({ next: (event) => console.log(`你剛剛輸入了 ${event.target.value}!`), error: (err) => console.log(`Oops... ${err}`), complete: () => console.log(`完成!`) });

這個例子使用了一個 <input type="text"> 節點,並將其傳入 Rx.Observable.fromEvent() 中。當我們觸發指定的事件名時,它將會返回一個輸入的 Event 的可觀察對象。(因此我們在 console.log 中用 ${event.target.value} 可以獲取輸入值)

當輸入事件被觸發的時候,可觀察對象會將它的值傳給觀察者。

什么是 Observer(觀察者)?

觀察者相當容易理解。在前面的例子中,我們傳入 .subscribe() 中的對象字面量就是觀察者(訂閱對象將會調用我們的可觀察對象)。

.subscribe(next, error, complete) 也是一種合法的語法,但是我們現在研究的是對象字面量的情況。

當一個可觀察對象產生數據值的時候,它會通知觀察者,當新的值被成功捕獲的時候調用 .next(),發生錯誤的時候調用 .error()

當我們訂閱一個可觀察對象的時候,它會持續不斷地將值傳遞給觀察者,直到發生以下兩件事:一種是生產者告知沒有更多的值需要傳遞了,這種情況它會調用觀察者的 .complete();一種是我們(“消費者”)對之后的值不再感興趣,決定取消訂閱(unsubsribe)。

如果我們想要對可觀察對象傳來的值進行組成構建(compose),那么在值傳達最終的 .subscribe() 代碼塊之前,需要經過一連串的可觀察對象(也就是操作符)處理。這個一連串的“鏈”也就是我們所說的可觀察對象序列。鏈中的每個操作符都會返回一個新的可觀察對象,讓我們的序列能夠持續進行下去——這也就是我們所熟知的“流”。

什么是 Operator(操作符)?

我們前面提到,可觀察對象能夠進行鏈式調用,也就是說我們可以像這樣寫代碼:

const input$ = Rx.Observable.fromEvent(node, 'input') .map(event => event.target.value) .filter(value => value.length >= 2) .subscribe(value => { // use the `value` });

這段代碼做了下面一系列事情:

  • 我們先假定用戶輸入了一個“a”
  • 可觀察對象將會對這個輸入事件作出反應,將值傳給下一個觀察者
  • “a”被傳給了訂閱了我們初始可觀察對象的 .map()
  • .map() 會返回一個 event.target.value 的新可觀察對象,然后調用它觀察者對象中的 .next()
  • .next() 將會調用訂閱了 .map() 的 .filter(),並將 .map() 處理后的值傳遞給它
  • .filter() 將會返回另一個可觀察對象,.filter() 過濾后留下 .length 大於等於 2 的值,並將其傳給 .next()
  • 我們通過 .subscribe() 獲得了最終的數據值

這短短的幾行代碼做了這么多的事!如果你還覺得弄不清,只需要記住:

每當返回一個新的可觀察對象,都會有一個新的觀察者掛載到前一個可觀察對象上,這樣就能通過觀察者的“流”進行傳值,對觀察者生產的值進行處理,然后調用 .next() 方法將處理后的值傳遞給下一個觀察者。

簡單來說,操作符將會不斷地依次返回新的可觀察對象,讓我們的流能夠持續進行。作為用戶而言,我們不需要關心什么時候、什么情況下需要創建與使用可觀察對象與觀察者,我們只需要用我們的訂閱對象進行鏈式調用就行了。

創建我們自己的 Observable(可觀察對象)

現在,讓我們開始寫自己的可觀察對象的實現吧。盡管它不會像 Rx 的實現那么高級,但我們還是對完善它充滿信心。

Observable 構造器

首先,我們需要創建一個 Observable 構造函數,此構造函數接受且僅接受 subscribe 函數作為其唯一的參數。每個 Observable 實例都存儲 subscribe 屬性,稍后可以由觀察者對象調用它:

function Observable(subscribe) { this.subscribe = subscribe; }

每個分配給 this.subscribe 的 subscribe 回調都將會被我們或者其它的可觀察對象調用。這樣我們下面做的事情就有意義了。

Observer 示例

在深入探討實際情況之前,我們先看一看基礎的例子。

現在我們已經配好了可觀察對象函數,可以調用我們的觀察者,將 1 這個值傳給它並訂閱它:

const one$ = new Observable((observer) => { observer.next(1); observer.complete(); }); one$.subscribe({ next: (value) => console.log(value) // 1 });

我們訂閱了 Observable 實例,將我們的 observer(對象字面量)傳入構造器中(之后它會被分配給 this.subscribe)。

Observable.fromEvent

現在我們已經完成了創建自己的 Observable 的基礎步驟。下一步是為 Observable 添加 static 方法:

Observable.fromEvent = (element, name) => { };

我們將像使用 RxJS 一樣使用我們的 Observable:

const node = document.querySelector('input'); const input$ = Observable.fromEvent(node, 'input');

這意味着我們需要返回一個新的 Observable,然后將函數作為參數傳遞給它:

Observable.fromEvent = (element, name) => { return new Observable((observer) => { }); };

這段代碼將我們的函數傳入了構造器中的 this.subscribe。接下來,我們需要將事件監聽設置好:

Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => {}, false); }); };

那么這個 observer 參數是什么呢?它又是從哪里來的呢?

這個 observer 其實就是攜帶 nexterrorcomplete 的對象字面量。

這塊其實很有意思。observer 在 .subscribe() 被調用之前都不會被傳遞,因此 addEventListener 在 Observable 被“訂閱”之前都不會被執行。

一旦調用 subscribe,也就會調用 Observable 構造器內的 this.subscribe 。它將會調用我們傳入 new Observable(callback) 的 callback,同時也會依次將值傳給我們的觀察者。這樣,當 Observable 做完一件事的時候,它就會用更新過的值調用我們觀察者中的 .next() 方法。

那么之后呢?我們已經得到了初始化好的事件監聽器,但是還沒有調用 .next()。下面完成它:

Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => { observer.next(event); }, false); }); };

我們都知道,可觀察對象在被銷毀前需要一個“處理后事”的函數,在我們這個例子中,我們需要移除事件監聽:

Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); };

因為這個 Observable 還在處理 DOM API 和事件,因此我們還不會去調用 .complete()。這樣在技術上就有無限的可用性。

試一試吧!下面是我們已經寫好的完整代碼:

const node = document.querySelector('input'); const p = document.querySelector('p'); function Observable(subscribe) { this.subscribe = subscribe; } Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); }; const input$ = Observable.fromEvent(node, 'input'); const unsubscribe = input$.subscribe({ next: (event) => { p.innerHTML = event.target.value; } }); // 5 秒之后自動取消訂閱 setTimeout(unsubscribe, 5000);

在線示例:

創造我們自己的 Operator(操作符)

在我們理解了可觀察對象與觀察者對象的概念之后,我們可以更輕松地去創造我們自己的操作符了。我們在 Observable 對象原型中加上一個新的方法:

Observable.prototype.map=function(mapFn){ };

這個方法將會像 JavaScript 中的 Array.prototype.map 一樣使用,不過它可以對任何值用:

const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value);

所以我們要取得回調函數,並調用它,返回我們期望得到的數據。在這之前,我們需要拿到流中最新的數據值。

下面該做什么就比較明了了,我們要得到調用了這個 .map() 操作符的 Observable 實例的引用入口。我們是在原型鏈上編程,因此可以直接這么做:

Observable.prototype.map = function (mapFn) { const input = this; };

找找樂子吧!現在我們可以在返回的 Obeservable 中調用 subscribe:

Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe(); }); };

我們要返回 input.subscribe() ,因為在我們退訂的時候,非訂閱對象將會順着鏈一直轉下去,解除每個 Observable 的訂閱。

這個訂閱對象將允許我們把之前 Observable.fromEvent 傳來的值傳遞下去,因為它返回了構造器中含有 subscribe 原型的新的 Observable 對象。我們可以輕松地訂閱它對數據值做出的任何更新!最后,完成通過 map 調用我們的 mapFn() 的功能:

Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe({ next: (value) => observer.next(mapFn(value)), error: (err) => observer.error(err), complete: () => observer.complete() }); }); };

現在我們可以進行鏈式調用了!

const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value); input$.subscribe({ next: (value) => { p.innerHTML = value; } });

注意到最后一個 .subscribe() 不再和之前一樣傳入 Event 對象,而是傳入了一個 value 了嗎?這說明你成功地創建了一個可觀察對象流。

再試試:

希望這篇文章對你來說還算有趣~:)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM