RxJS快速入門


內容導航

RxJS是什么

RxJS 是一個庫,它通過使用 observable 序列來編寫異步和基於事件的程序。它提供了一個核心類型 Observable,附屬類型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 啟發的操作符 (map、filter、reduce、every, 等等),這些數組操作符可以把異步事件作為集合來處理。

可以把 RxJS 當做是用來處理事件的 Lodash

ReactiveX 結合了 觀察者模式迭代器模式使用集合的函數式編程,以滿足以一種理想方式來管理事件序列所需要的一切。

RxJS的主要成員

  • Observable (可觀察對象): 表示一個概念,這個概念是一個可調用的未來值或事件的集合。
  • Observer (觀察者): 一個回調函數的集合,它知道如何去監聽由 Observable 提供的值。
  • Subscription (訂閱): 表示 Observable 的執行,主要用於取消 Observable 的執行。
  • Operators (操作符): 采用函數式編程風格的純函數 (pure function),使用像 mapfilterconcatflatMap 等這樣的操作符來處理集合。
  • Subject (主體): 相當於 EventEmitter,並且是將值或事件多路推送給多個 Observer 的唯一方式。
  • Schedulers (調度器): 用來控制並發並且是中央集權的調度員,允許我們在發生計算時進行協調,例如 setTimeoutrequestAnimationFrame 或其他。

Observable (可觀察對象)

RxJS 是基於觀察者模式和迭代器模式以函數式編程思維來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 作為被觀察者,是一個值或事件的流集合;而 Observer 則作為觀察者,根據 Observables 進行處理。Observables 是多個值的惰性推送集合。

  • of():用於創建簡單的Observable,該Observable只發出給定的參數,在發送完這些參數后發出完成通知.
  • from():從一個數組、類數組對象、promise、迭代器對象或者類Observable對象創建一個Observable.
  • fromEvent(),:把event轉換成Observable.
  • range():在指定起始值返回指定數量數字.
  • interval():基於給定時間間隔發出數字序列。返回一個發出無限自增的序列整數,可以選擇固定的時間間隔進行發送。
  • timer():創建一個Observable,該Observable在初始延時之后開始發送並且在每個時間周期后發出自增的數字

創建 Observable

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';

	const Observable1 = new Observable(subscriber => {
    try{
          subscriber.next(1);
          subscriber.next(2);
          subscriber.next(3);
          setTimeout(() => {
            subscriber.next(4);
            subscriber.complete();
          }, 1000);
        } catch (err) {
        	subscriber.error(err);	//傳遞一個錯誤對象,如果捕捉到異常的話。
    	}
    });
    const Observable2 = from([
      { name: 'Dave', age: 34, salary: 2000 },
      { name: 'Nick', age: 37, salary: 32000 },
      { name: 'Howie', age: 40, salary: 26000 },
      { name: 'Brian', age: 40, salary: 30000 },
      { name: 'Kevin', age: 47, salary: 24000 },
    ]);
	const Observable3 = of("Dave","Nick");//把所有參數組合到數組,逐個提供給消費者
	const Observable4 = range(1,10);
	const Observable5 = interval(3000);//從零開始每3000毫秒自增並提供給消費者
	const Observable6 = timer(3000,1000);//等待3000毫秒后,從零開始每1000毫秒自增並提供給消費者

訂閱 Observables

因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源。

當調用了 observable.subscribe ,觀察者會被附加到新創建的 Observable 執行中。這個調用還返回一個對象,即 Subscription (訂閱):

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
	const observable1 = range(1,10);
    observable1.subscribe(
          num => {
            console.log(num);
          },
          err => console.log(err),
          () => console.log("Streaming is over.")
        );

執行 Observables

Observable 執行可以傳遞三種類型的值:

  • "Next" 通知: 發送一個值,比如數字、字符串、對象,等等。
  • "Error" 通知: 發送一個 JavaScript 錯誤 或 異常。
  • "Complete" 通知: 不再發送任何值。

"Next" 通知是最重要,也是最常見的類型:它們表示傳遞給觀察者的實際數據。"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,並且只會執行其中的一個。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';

	const observable = new Observable(subscriber => {
    try{
          subscriber.next(1);
          subscriber.next(2);
          subscriber.next(3);
          subscriber.complete();
          subscriber.next(4); // 因為違反規約,所以不會發送
        } catch (err) {
        	subscriber.error(err);	//傳遞一個錯誤對象,如果捕捉到異常的話。
    	}
    });

清理 Observable 執行

因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源

當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要調用 unsubscribe() 方法就可以取消執行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
    const observable = new Observable(subscriber => {
      let intervalID = setInterval(() => {
        subscriber.next('hi');
      }, 1000);
      // 提供取消和清理 interval 資源的方法
      return function unsubscribe() {
        clearInterval(intervalID);
      };
    });
    let subscription = observable.subscribe(x => console.log(x));
    subscription.unsubscribe();

Observer (觀察者)

觀察者是由 Observable 發送的值的消費者。觀察者只是一組回調函數的集合,每個回調函數對應一種 Observable 發送的通知類型:nexterrorcomplete 。下面的示例是一個典型的觀察者對象:

觀察者只是有三個回調函數的對象,每個回調函數對應一種 Observable 發送的通知類型。

observable.subscribe(
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
);

Subscription (訂閱)

Subscription 是表示可清理資源的對象,通常是 Observable 的執行。Subscription 有一個重要的方法,即 unsubscribe,它不需要任何參數,只是用來清理由 Subscription 占用的資源。在上一個版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理對象)。

Subscription 基本上只有一個 unsubscribe() 函數,這個函數用來釋放資源或去取消 Observable 執行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
	var observable1 = interval(1000);
    var subscription1 = observable1.subscribe(x => console.log(x));
    // 稍后:
    // 這會取消正在進行中的 Observable 執行
    // Observable 執行是通過使用觀察者調用 subscribe 方法啟動的
    subscription1.unsubscribe();

    var observable2 = interval(400);
    var observable3 = interval(300);
    var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
    var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
    subscription2.add(childSubscription);
    setTimeout(() => {
      // subscription 和 childSubscription 都會取消訂閱
      subscription2.unsubscribe();
    }, 1000);

Subject (主體)

RxJS Subject 是一種特殊類型的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行)。

Subject 像是 Observable,但是可以多播給多個觀察者。Subject 還像是 EventEmitters,維護着多個監聽器的注冊表。

還有一些特殊類型的 Subject:BehaviorSubjectReplaySubjectAsyncSubject

每個 Subject 都是 Observable 。 - 對於 Subject,你可以提供一個觀察者並使用 subscribe 方法,就可以開始正常接收值。從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject 。

在 Subject 的內部,subscribe 不會調用發送值的新執行。它只是將給定的觀察者注冊到觀察者列表中,類似於其他庫或語言中的 addListener 的工作方式。

每個 Subject 都是觀察者。 - Subject 是一個有如下方法的對象: next(v)error(e)complete() 。要給 Subject 提供新值,只要調用 next(theValue),它會將值多播給已注冊監聽該 Subject 的觀察者們。

import { Subject,from } from 'rxjs';
	//我們為 Subject 添加了兩個觀察者,然后給 Subject 提供一些值
 	var subject1 = new Subject();
    subject1.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject1.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    subject1.next(1);
    subject1.next(2);
    //因為 Subject 是觀察者,這也就在意味着你可以把 Subject 作為參數傳給任何 Observable 的 subscribe 方法
    var subject2 =new Subject();
    subject2.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject2.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    var observable = from([1, 2, 3]);
    observable.subscribe(subject2); // 你可以提供一個 Subject 進行訂閱

多播的 Observables

“多播 Observable” 通過 Subject 來發送通知,這個 Subject 可能有多個訂閱者,然而普通的 “單播 Observable” 只發送通知給單個觀察者。

多播 Observable 在底層是通過使用 Subject 使得多個觀察者可以看見同一個 Observable 執行。

在底層,這就是 multicast 操作符的工作原理:觀察者訂閱一個基礎的 Subject,然后 Subject 訂閱源 Observable 。

import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators'; 	
    const source = timer(1000, 2500).pipe(take(5));
    const subject = new Subject();
    subject.subscribe({
      next: (v) => console.log('observerC: ' + v)
    });
    subject.subscribe({
      next: (v) => console.log('observerD: ' + v)
    });
    const multicasted = source.pipe(multicast(subject));
    multicasted.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    multicasted.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
	source.subscribe(subject);

BehaviorSubject

Subject 的其中一個變體就是 BehaviorSubject,它有一個“當前值”的概念。它保存了發送給消費者的最新值。並且當有新的觀察者訂閱時,會立即從 BehaviorSubject 那接收到“當前值”。

BehaviorSubjects 適合用來表示“隨時間推移的值”。舉例來說,生日的流是一個 Subject,但年齡的流應該是一個 BehaviorSubject 。

import { BehaviorSubject } from 'rxjs';
	//BehaviorSubject 使用值0進行初始化,當第一個觀察者訂閱時會得到0。第二個觀察者訂閱時會得到值2,盡管它是在值2發送之后訂閱的。
	const subject = new BehaviorSubject(0); // 0是初始值
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });

    subject.next(1);
    subject.next(2);

    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });

    subject.next(3);

ReplaySubject

ReplaySubject 類似於 BehaviorSubject,它可以發送舊值給新的訂閱者,但它還可以記錄 Observable 執行的一部分。

ReplaySubject 記錄 Observable 執行中的多個值並將其回放給新的訂閱者。

除了緩沖數量,你還可以指定 window time (以毫秒為單位)來確定多久之前的值可以記錄。

import { ReplaySubject } from 'rxjs';
	const subject = new ReplaySubject(3); // 為新的訂閱者緩沖最后3個值
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    subject.next(5);

	//我們緩存數量100,但 window time 參數只設置了120毫秒
    const subject = new ReplaySubject(100, 120 /* windowTime */);

    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    let i = 1;
    setInterval(() => subject.next(i++), 200);
    setTimeout(() => {
      subject.subscribe({
        next: (v) => console.log('observerB: ' + v)
      });
    }, 1000);	

AsyncSubject

AsyncSubject 是另一個 Subject 變體,只有當 Observable 執行完成時(執行 complete()),它才會將執行的最后一個值發送給觀察者。

AsyncSubject 和 last() 操作符類似,因為它也是等待 complete 通知,以發送一個單個值。

import { AsyncSubject } from 'rxjs'; 
   const subject = new AsyncSubject();

    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });

    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);

    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });

    subject.next(5);
    subject.complete();

Scheduler (調度器)

調度器控制着何時啟動 subscription 和何時發送通知。它由三部分組成:

  • 調度器是一種數據結構。 它知道如何根據優先級或其他標准來存儲任務和將任務進行排序。
  • 調度器是執行上下文。 它表示在何時何地執行任務(舉例來說,立即的,或另一種回調函數機制(比如 setTimeout 或 process.nextTick),或動畫幀)。
  • 調度器有一個(虛擬的)時鍾。 調度器功能通過它的 getter 方法 now() 提供了“時間”的概念。在具體調度器上安排的任務將嚴格遵循該時鍾所表示的時間。

調度器可以讓你規定 Observable 在什么樣的執行上下文中發送通知給它的觀察者。

import { asyncScheduler, Observable } from 'rxjs';
	//我們使用普通的 Observable ,它同步地發出值`1`、`2`、`3`,並使用操作符 `observeOn` 來指定 `async` 調度器發送這些值。
	const observable = new Observable(subscriber => {
      subscriber.next(1);
      subscriber.next(2);
      subscriber.next(3);
      subscriber.complete();
    })
      .pipe(
        observeOn(asyncScheduler)
      );

    console.log('just before subscribe');
    observable.subscribe({
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    });
    console.log('just after subscribe');
    //你會發現"just after subscribe"在"got value..."之前就出現了
    //just before subscribe
    //just after subscribe
    //got value 1
    //got value 2
    //got value 3
    //done
    

調度器類型

async 調度器是 RxJS 提供的內置調度器中的一個。可以通過使用 Scheduler 對象的靜態屬性創建並返回其中的每種類型的調度器。

調度器 目的
null 不傳遞任何調度器的話,會以同步遞歸的方式發送通知。用於定時操作或尾遞歸操作。
queueScheduler 當前事件幀中的隊列調度(蹦床調度器)。用於迭代操作。
asapScheduler 微任務的隊列調度,它使用可用的最快速的傳輸機制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用於異步轉換。
asyncScheduler 使用 setInterval 的調度。用於基於時間的操作符。
animationFrameScheduler 計划將在下一次瀏覽器內容重新繪制之前發生的任務。 可用於創建流暢的瀏覽器動畫。

Pipeable(操作符)

操作符就是函數,管道操作符本質上是一個純函數,它將一個Observable作為輸入並生成另一個Observable作為輸出。訂閱輸出Observable也將訂閱輸入Observable。 操作符有兩種:

管道操作符是一個將Observable作為其輸入並返回另一個Observable的函數。這是一個純粹的操作:以前的Observable保持不變。

  1. 管道操作符是可以使用語法observableInstance.pipe(operator())傳遞給Observable的類型。 這些包括filter()mergeMap()。 調用時,它們不會更改現有的Observable實例。 相反,它們返回一個新的Observable,其訂閱邏輯基於第一個Observable。

  2. 創建運算符是另一種運算符,可以稱為獨立函數來創建新的Observable。例如:of(1,2,3)創建一個observable ,該對象將依次發射1、2和3。創建運算符將在后面的部分中詳細討論。

obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

常用的操作符

finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:

返回原始Observable,但在Observable完成或發生錯誤終止時將調用指定的函數。

創建操作符

連接創建操作符

These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.

轉換操作符

過濾操作符

組合操作符

Also see the Join Creation Operators section above.

多播操作符

錯誤處理操作符

工具操作符

條件和布爾操作符

數學和聚合操作符


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM