RxJS-Observable設計思想中運用的設計模式


RxJS 是一個庫,它通過使用Observable序列來編寫異步和基於事件的程序。其中Observable的設計主要運用到的設計模式有觀察者模式(Observer pattern )和迭代器模式(Iterator pattern)。

1.觀察者模式(Observer pattern)

1.1 什么是觀察者模式?

觀察者模式又叫發布訂閱模式(Publish/Subscribe),它是一種一對多的關系,讓多個觀察者(Observer)同時監聽一個主題(Subject),這個主題也就是被觀察者(Observable),被觀察者的狀態發生變化時就會通知所有的觀察者,使得它們能夠接收到更新的內容。 

1.2 觀察者模式主要是為了解決什么問題?

1)定義對象之間的一對多依賴關系而不使對象緊密耦合。
2)確保當一個對象改變狀態時,自動更新開放數量的從屬對象。
3)一個對象應該可以通知開放式數量的其他對象。

1.3 RxJS的Observable中觀察模式實現源碼 ("rxjs": "~6.4.0" (TypeScript))

Observable 與 Observer 之間的訂閱發布關系(觀察者模式) 如下:
訂閱:Observer 通過 Observable 提供的 subscribe() 方法訂閱 Observable。
發布:Observable 通過回調 next 方法向 Observer 發布事件。

1.3.1 Observable源碼

// 基於源代碼有刪減
/**
 *表示任意時間內的任意一組值。這是RxJS最基本的構件
 *
 * @class Observable<T>
 */
export declare class Observable<T> implements Subscribable<T> {
    
    /**
     * @constructor
     * @param {Function} 訂閱當被初始訂閱時被調用的函數。這個函數有一個訂閱者,它有新的值
     *可以'next',也可以調用'error'方法來引發錯誤,或者'complete'可以被調用來通知成功完成。
     */
    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);
    /**
     * 通過調用Observable構造函數創建一個新的cold Observable
     * @static true
     * @owner Observable
     * @method create
     * @param {Function} subscribe? 要傳遞給Observable構造函數的訂閱函數
     * @return {Observable} a new cold observable
     * @nocollapse
     * @deprecated use new Observable() instead
     */
    static create: Function;
    /**
     * 創建一個新的可觀察對象,將這個可觀察對象作為源,並將傳遞的操作符定義為新可觀察對象的操作符。
     * @method lift
     * @param {Operator} operator the operator defining the operation to take on the observable
     * @return {Observable} a new observable with the Operator applied
     */
    lift<R>(operator: Operator<T, R>): Observable<R>;
    
    /** @deprecated Use an observer instead of a complete callback */
    subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
    
    /** @deprecated This is an internal implementation detail, do not use. */
    _trySubscribe(sink: Subscriber<T>): TeardownLogic;
    /**
     * @method forEach
     * @param {Function} next a handler for each value emitted by the observable
     * @param {PromiseConstructor} [promiseCtor] a constructor function used to instantiate the Promise
     * @return {Promise} a promise that either resolves on observable completion or
     *  rejects with the handled error
     */
    forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void>;
    /** @internal This is an internal implementation detail, do not use. */
    _subscribe(subscriber: Subscriber<any>): TeardownLogic;
    //省略重載的方法
    pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
    //省略重載的方法
    toPromise<T>(this: Observable<T>): Promise<T>;
   
}
// 基於源代碼有刪減
export interface Subscribable<T> {
    
    /** @deprecated Use an observer instead of a complete callback */
    subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable;
    //省略重載的方法
}

1.3.2  Subject

// 基於源代碼有刪減
/*
* * Subject 是一個特殊類型的可觀察對象,允許值被多播給許多觀察者。主題就像事件發射器。 * * 每一個Subject都是可觀察的(Observable)和觀察者(Observer)。您可以訂閱一個主題,還可以調用next來獲取提要值以及error和complete。 * * @class Subject<T> */ export declare class Subject<T> extends Observable<T> implements SubscriptionLike { observers: Observer<T>[]; closed: boolean; isStopped: boolean; hasError: boolean; thrownError: any; constructor(); /**@nocollapse * @deprecated use new Subject() instead */ static create: Function; lift<R>(operator: Operator<T, R>): Observable<R>; next(value?: T): void; error(err: any): void; complete(): void; unsubscribe(): void; /** @deprecated This is an internal implementation detail, do not use. */ _trySubscribe(subscriber: Subscriber<T>): TeardownLogic; /** @deprecated This is an internal implementation detail, do not use. */ _subscribe(subscriber: Subscriber<T>): Subscription; /** * 創建一個以這個主題為源的新觀察對象。您可以這樣做來創建主題的自定義observer邏輯,並對使用Observable的代碼隱藏它。code that uses the Observable. * @return {Observable} Observable that the Subject casts to */ asObservable(): Observable<T>; } /** SUBSCRIPTION INTERFACES */ export interface Unsubscribable { unsubscribe(): void; } export interface SubscriptionLike extends Unsubscribable { unsubscribe(): void; readonly closed: boolean; }

1.3.3 Observer

export interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

Observable 與 Observer 之間的訂閱發布關系(觀察者模式) 如下:
訂閱:Observer 通過 Observable 提供的 subscribe() 方法訂閱 Observable。
發布:Observable 通過回調 next 方法向 Observer 發布事件。 

查看Subject源代碼,我們發現Subject既可以作為Observable又可以作為Observer, 在angular項目里面,組件之間的異步通訊可以使用這一特性去實現。

1.4 觀察者模式一些優秀的框架的應用場景:Zookeeper事件通知節點、Spring事件驅動、消息訂閱通知、分布式配置中心等等。

2. 迭代器模式(Iterator pattern)

2.1 什么是迭代器模式?

 迭代器(Iterator)模式又叫游標(Sursor)模式,迭代器具有 next 方法,可以順序訪問一個聚合對象中的各個元素,而不需要暴露該對象的內部表現。

2.2 迭代器模式解決了什么問題?

 迭代器模式可以把迭代的過程從業務邏輯中分離出來,迭代器將使用者和目標對象隔離開來,即使不了解對象的內部構造,也可以通過迭代器提供的方法順序訪問其每個元素。

 2.3 RxJS的Observable中迭代器模式實現源碼

在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還可以提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。

export interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

// 結合迭代器Iterator進行理解:
next() => Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是通過調用 iterator.next() 來拿到值,是一種 pull 的形式。
complete() => 當不再有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則需要在 next 的返回結果中,當返回元素 done 為 true 時,則表示 complete。
error() => 當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則需要進行 try catch 包裹來處理可能出現的錯誤。

 2.4 基於ES6實現Iterator 

"use strict";

class Iterator {
    index = 0;
    list = [];
    constructor(container){
      this.list = container.list;
      this.index = 0;
    }
  
    next() {
      if(this.hasNext()){
        return {
          value: this.list[this.index++],
          done: false
        }
      }
      return {value: null,done: true}
    }
  
    hasNext() {
      if(this.index >= this.list.length){
        return false;
      }
      return true;
    }
  }
  
  
  class Container {
    list = [];
    constructor(list) {
      this.list = list;
    }
  
    getIterator() {
      return new Iterator(this);
    }
  }

  let container = new Container([1,2,3,4,5]);
  let iter = container.getIterator();

  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());

執行結果:

 通過上邊的示例代碼我們可以得知,我們不了解對象的內部構造,但是可以通過調用迭代器提供的 next() 方法就能順序訪問其每個元素。

3.那么基於觀察者模式+迭代器模式的組合是什么?

Observable 與 Observer實現觀察者 + 迭代器模式,數據的逐漸傳遞,傳遞與影響其實就是流的表現。RxJS 提供 of 的方法來自定義創建一個 Observable,可以使用 next 來發出流。

import { of } from 'rxjs';

of(10, 20, 30)
.subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
// result:
// 'next: 10'
// 'next: 20'
// 'next: 30'

以上全部就是對RxJS中Observable運用的設計的模式的分析,參考來源:

1. RxJS -API List https://rxjs.dev/api

2. 從觀察者模式到迭代器模式系統講解 RxJS Observable(一) https://www.ucloud.cn/yun/104556.html

3. Rx.js實現原理淺析  https://www.cnblogs.com/tangzhirong/p/7424777.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM