RxJS 是一個庫,它通過使用Observable序列來編寫異步和基於事件的程序。其中Observable的設計主要運用到的設計模式有觀察者模式(Observer pattern )和迭代器模式(Iterator pattern)。
1.觀察者模式(Observer pattern)
1.1 什么是觀察者模式?
觀察者模式又叫發布訂閱模式(Publish/Subscribe),它是一種一對多的關系,讓多個觀察者(Observer)同時監聽一個主題(Subject),這個主題也就是被觀察者(Observable),被觀察者的狀態發生變化時就會通知所有的觀察者,使得它們能夠接收到更新的內容。
1.2 觀察者模式主要是為了解決什么問題?
1)定義對象之間的一對多依賴關系而不使對象緊密耦合。
2)確保當一個對象改變狀態時,自動更新開放數量的從屬對象。
3)一個對象應該可以通知開放式數量的其他對象。
1.3 RxJS的Observable中觀察模式實現源碼 ("rxjs": "~6.4.0" (TypeScript))
Observable 與 Observer 之間的訂閱發布關系(觀察者模式) 如下:
訂閱:Observer 通過 Observable 提供的 subscribe() 方法訂閱 Observable。
發布:Observable 通過回調 next 方法向 Observer 發布事件。
1.3.1 Observable源碼
// 基於源代碼有刪減 /** *表示任意時間內的任意一組值。這是RxJS最基本的構件 * * @class Observable<T> */ export declare class Observable<T> implements Subscribable<T> { /** * @constructor * @param {Function} 訂閱當被初始訂閱時被調用的函數。這個函數有一個訂閱者,它有新的值 *可以'next',也可以調用'error'方法來引發錯誤,或者'complete'可以被調用來通知成功完成。 */ constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic); /** * 通過調用Observable構造函數創建一個新的cold Observable * @static true * @owner Observable * @method create * @param {Function} subscribe? 要傳遞給Observable構造函數的訂閱函數 * @return {Observable} a new cold observable * @nocollapse * @deprecated use new Observable() instead */ static create: Function; /** * 創建一個新的可觀察對象,將這個可觀察對象作為源,並將傳遞的操作符定義為新可觀察對象的操作符。 * @method lift * @param {Operator} operator the operator defining the operation to take on the observable * @return {Observable} a new observable with the Operator applied */ lift<R>(operator: Operator<T, R>): Observable<R>; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; /** @deprecated This is an internal implementation detail, do not use. */ _trySubscribe(sink: Subscriber<T>): TeardownLogic; /** * @method forEach * @param {Function} next a handler for each value emitted by the observable * @param {PromiseConstructor} [promiseCtor] a constructor function used to instantiate the Promise * @return {Promise} a promise that either resolves on observable completion or * rejects with the handled error */ forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void>; /** @internal This is an internal implementation detail, do not use. */ _subscribe(subscriber: Subscriber<any>): TeardownLogic; //省略重載的方法 pipe<A>(op1: OperatorFunction<T, A>): Observable<A>; //省略重載的方法 toPromise<T>(this: Observable<T>): Promise<T>; } // 基於源代碼有刪減 export interface Subscribable<T> { /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable; //省略重載的方法 }
1.3.2 Subject
// 基於源代碼有刪減
/** * Subject 是一個特殊類型的可觀察對象,允許值被多播給許多觀察者。主題就像事件發射器。 * * 每一個Subject都是可觀察的(Observable)和觀察者(Observer)。您可以訂閱一個主題,還可以調用next來獲取提要值以及error和complete。 * * @class Subject<T> */ export declare class Subject<T> extends Observable<T> implements SubscriptionLike { observers: Observer<T>[]; closed: boolean; isStopped: boolean; hasError: boolean; thrownError: any; constructor(); /**@nocollapse * @deprecated use new Subject() instead */ static create: Function; lift<R>(operator: Operator<T, R>): Observable<R>; next(value?: T): void; error(err: any): void; complete(): void; unsubscribe(): void; /** @deprecated This is an internal implementation detail, do not use. */ _trySubscribe(subscriber: Subscriber<T>): TeardownLogic; /** @deprecated This is an internal implementation detail, do not use. */ _subscribe(subscriber: Subscriber<T>): Subscription; /** * 創建一個以這個主題為源的新觀察對象。您可以這樣做來創建主題的自定義observer邏輯,並對使用Observable的代碼隱藏它。code that uses the Observable. * @return {Observable} Observable that the Subject casts to */ asObservable(): Observable<T>; } /** SUBSCRIPTION INTERFACES */ export interface Unsubscribable { unsubscribe(): void; } export interface SubscriptionLike extends Unsubscribable { unsubscribe(): void; readonly closed: boolean; }
1.3.3 Observer
export interface Observer<T> { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }
Observable 與 Observer 之間的訂閱發布關系(觀察者模式) 如下:
訂閱:Observer 通過 Observable 提供的 subscribe() 方法訂閱 Observable。
發布:Observable 通過回調 next 方法向 Observer 發布事件。
查看Subject源代碼,我們發現Subject既可以作為Observable又可以作為Observer, 在angular項目里面,組件之間的異步通訊可以使用這一特性去實現。
1.4 觀察者模式一些優秀的框架的應用場景:Zookeeper事件通知節點、Spring事件驅動、消息訂閱通知、分布式配置中心等等。
2. 迭代器模式(Iterator pattern)
2.1 什么是迭代器模式?
迭代器(Iterator)模式又叫游標(Sursor)模式,迭代器具有 next 方法,可以順序訪問一個聚合對象中的各個元素,而不需要暴露該對象的內部表現。
2.2 迭代器模式解決了什么問題?
迭代器模式可以把迭代的過程從業務邏輯中分離出來,迭代器將使用者和目標對象隔離開來,即使不了解對象的內部構造,也可以通過迭代器提供的方法順序訪問其每個元素。
2.3 RxJS的Observable中迭代器模式實現源碼
在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還可以提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。
export interface Observer<T> { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }
// 結合迭代器Iterator進行理解:
next() => Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是通過調用 iterator.next() 來拿到值,是一種 pull 的形式。
complete() => 當不再有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則需要在 next 的返回結果中,當返回元素 done 為 true 時,則表示 complete。
error() => 當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則需要進行 try catch 包裹來處理可能出現的錯誤。
2.4 基於ES6實現Iterator
"use strict"; class Iterator { index = 0; list = []; constructor(container){ this.list = container.list; this.index = 0; } next() { if(this.hasNext()){ return { value: this.list[this.index++], done: false } } return {value: null,done: true} } hasNext() { if(this.index >= this.list.length){ return false; } return true; } } class Container { list = []; constructor(list) { this.list = list; } getIterator() { return new Iterator(this); } } let container = new Container([1,2,3,4,5]); let iter = container.getIterator(); console.log(iter.next()); console.log(iter.next()); console.log(iter.next()); console.log(iter.next()); console.log(iter.next()); console.log(iter.next()); console.log(iter.next()); console.log(iter.next());
執行結果:
通過上邊的示例代碼我們可以得知,我們不了解對象的內部構造,但是可以通過調用迭代器提供的 next() 方法就能順序訪問其每個元素。
3.那么基於觀察者模式+迭代器模式的組合是什么?
Observable 與 Observer實現觀察者 + 迭代器模式,數據的逐漸傳遞,傳遞與影響其實就是流的表現。RxJS 提供 of 的方法來自定義創建一個 Observable,可以使用 next 來發出流。
import { of } from 'rxjs'; of(10, 20, 30) .subscribe( next => console.log('next:', next), err => console.log('error:', err), () => console.log('the end'), ); // result: // 'next: 10' // 'next: 20' // 'next: 30'
以上全部就是對RxJS中Observable運用的設計的模式的分析,參考來源:
1. RxJS -API List https://rxjs.dev/api
2. 從觀察者模式到迭代器模式系統講解 RxJS Observable(一) https://www.ucloud.cn/yun/104556.html
3. Rx.js實現原理淺析 https://www.cnblogs.com/tangzhirong/p/7424777.html