RxJS——Operators


RxJS 的操作符(operators)是最有用的,盡管 Observable 是最基本的。操作符最基本的部分(pieces)就是以申明的方式允許復雜的異步代碼組合簡化。

什么是操作符?

操作符是函數。這里有兩種操作符:

管道操作符(Pipeable Operators)是可以通過使用 observableInstance.pipe(operator()) 管道傳輸到 Observable 對象。這些包括,filter(...),mergeMap(...)。當調用他們時,它們不會改變已存在的 Observable 實例。相反,他們會返回一個新的 Observable,它在第一個 Observable 的基礎上的邏輯的 Observable。

管道操作符是一個函數,它有一個 Observable 作為輸入參數並且返回另一個 Observable。這是純函數(函數式)的操作:上一個 Observable 保留未修改。

管道操作符本質上就是一個純函數,它可以作為獨立函數被調用來創建一個新的 Observable。例如:of(1, 2, 3) 創建了一個 Observable,它會發送 1,2,3 一個接一個的。在下一接將詳細討論操作符的創建。

例如,操作符 map 被調用跟數組的 map 是類似的。就比如 [1, 2 , 3].map(x => x * x) 會返回 [1, 4, 9],被創建的 Observable 就像下面:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1
// value: 4
// value: 9

將會發送 1,4,9。其他操作符的用法 first:

import { of } from 'rxjs';
import { first } from 'rxjs/operators';

first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1

注意 map 邏輯部分必須要動態構造,因為它必須被賦予映射函數來做。通過對比,first 是一個常數,但是還是得動態構造。作為一般實踐,所有的操作符都要被構造,無論他們是否需要參數。

管道(Piping)

管道操作符是函數,所以他們能像原生函數使用:op()(obs) — 但是實際上,他們都互相交織在一起,很快就會演變得不利於閱讀的:op4()(op3()(op2()(op1()(obs))))。所以基於這個原因,Observables 有一個方法 .pipe() 可以調用,它能完成相同的事,並且更易讀的:

obs.pipe(
 op1(),
 op2(),
 op3(),
 op4(),
)

作為一個風格,op()(obs) 永不這樣使用,甚至是只有一個操作符時,obs.pipe(op()) 也是首選的。

創建操作符

什么是創建操作符?為了區別管道操作符,創建操作符是一個函數,能被用來創建通過一些共同的預定義的行為或者關聯其他的 Observables 的 Observables 。

一個典型的創建操作符例子是 interval 函數。它要求一個數字類型作為輸入參數,並返回 Observable 作為輸出:

import { interval } from 'rxjs';

const observable = interval(1000 /* number of milliseconds */);

這里查看所有的靜態創建操作符。

高階 Observables

Observables 大體上是有順序的發送值(數字或字符串),但是也有例外,處理 Observables 的 Observables 是有必要的,叫做高階 Observables。例如,想象你有一個 Observable,它發送文件的地址的字符串,這個字符串恰好是想要看到的。代碼如下:

const fileObservable = urlObservable.pipe(
	map(url => http.get(url)),
);

http.get() 返回每個 URL 的 Observable 對象(可能是字符串或字符串數組)。現在你有一個 Observables 對象了,一個高階 Observable。

但是這個高階 Observable 是怎么工作的呢?通常通過平坦化:通過某種方式將高階 Observable 轉換成一個普通的 Observable。例如:

const fileObservable = urlObservable.pipe(
	map(url => http.get(url)),
    concatAll(),
);

concatAll() 操作訂閱了每個內部的 Observable,它會帶出來一個 Observable 對象,並復制所有的發送的值知道Observable 完成,然后繼續執行下一個。所有的值都是以這種方式串起來的。其他的扁平用處的操作(也叫鏈接操作符(join operators))

  • mergeAll() —— 訂閱每個內部到達的 Observable 對象,然后發送它到達的每一個值
  • switchAll() —— 當內部的 Observable 第一個到達時訂閱(Observable),然后每個到達時發送值,但是當下個內部的 Observable 到達時,上一個訂閱取消,並訂閱這個新的 Observable。
  • exhaust() —— 當 Observable 到達時訂閱第一個,每個在它到達時發送值,在第一個完成之前,丟棄所有新到大的 Observable 對象,完成之后等待下一個 Observable。

就像組合了數組庫中許多的 map()flat()(或者是 flatten()),他們映射就等價於 RxJS 中的 扁平操作符 concatMap(),mergeMap()switchMap() 以及 exhaustMap()

彈珠圖標(Marble diagrams)

為了解釋操作符是如何工作的,本文還不足以描述。太多的操作符相關聯了,他們實例化可能以不同的方式延遲,舉例,節流,降低值發送(頻率)。為了更好的描述,圖標經常是最好的工具。彈珠圖表是可視化的表示 operator 是如何工作的,也包括了 Observable 輸入,operator 以及它們的參數,還有 Observable 的輸出。

在彈珠圖中,時間軸向右移動,並且圖描述了值 “彈珠” 在 Observable 運行的時候是如何發送的。

根據下面你能看到彈珠圖的解析。

貫穿文檔節點,我們普遍使用 marble diagrams 來解釋 operator 是如何工作的。它們在其他的方面也是很有用的,比如在白板甚至是在我們的單元測試中(作為 ASCII 圖表)。

按照圖標的順序,我解釋下上面的意思:

  1. 這個從左到右的時間軸是表示輸入的 Observable 的執行過程
  2. 4,6,a,8 這些值是 Observable 要發送的值
  3. 緊接着的 “|” 是表示 “完成” 通知,表明這個 Observable 已經成功完成。
  4. 中間的盒子表明操作符,它傳遞上面輸入的 Observable 生成一個 Observable 作為輸出(下面一條線)方框內的文本顯示了轉換的性質
  5. 調用操作符輸出 Observable
  6. “X” 表示這個輸出的 Observable 發送錯誤,表明異常終止。至此之后不會有任何值發送。

操作符的分類

有很多操作是使用目的是不同的,他們分這幾類:創建,轉換,過濾,聯合,多播,錯誤處理,公共等等。下面的列表你會發現所有被分類的操作符。

為了完成的概述,你可以看這個 API 文檔

創建操作符

鏈接創建操作

轉化操作符

過濾操作符

連接操作符

你也可以看 連接創建操作符 一節

多播操作符

異常處理操作符

公共操作符

條件布爾操作符

數學和聚合操作符

創建自定義的 Observables

使用 pipe() 函數生成新的 Observable

如果在你的代碼里有常用的操作符序列,用 pipe() 函數來提取到新的操作符。甚至這個序列是不常見的,中斷它到單個的操作符,這樣能提高可讀性。

舉個例子,你可以生成一個函數,這個函數舍棄奇數的值並給偶數值加倍:

import { pipe } from 'rxjs';
import { filter, map } from 'rxjs';

function discardOddDoubleEven()
{
    return pipe(
    	filter(v => !(v % 2)),
        map(v => v +v),
    );
}

pipe() 函數是這樣類比的,但是又與 .pipe() 不是完全一樣)

從零創建新的操作符

這個非常復雜,如果你一定要寫一個不能通過現有的操作符組合而成的操作符(這是極少情況),你可以使用 Observable 構造函數從零寫一個操作符,就像下面這樣:

import { Observable } from 'rxjs';

function delay(delayInMillis)
{
    return (observable) => new Observable(observer => {
        //這個函數將會沒當 Observable 訂閱的時候調用
        const allTimerIDs = new Set();
        const subscription = observable.subscribe({
            next(value){
                const timerID = setTimeout(() => {
                    observer.next(value);
                    allTimeIDs.delete(timerID);
                }, delayInMillis);
                allTimerIDs.add(timerID);
            },
            error(err) {
                observer.error(err);
            },
            complete() {
                observer.complete();
            }
        });
        //返回值是一個卸載函數
        //它當新的 Observable 被取消訂閱時調用
        return () => {
            subscription.unsubscribe();
            allTimerIDs.forEach(timerID => {
                clearTimeout(timerID)
            });
        }
    });
}

你必須要注意以下幾點

  1. 實現所有的 Observer 函數,next()error()complete() 當訂閱這個輸入的 Observer 的時候。
  2. 實現一個卸載函數,它負責清理,當 Observable 完成的時候(在上面的例子是通過取消訂閱以及清除時間定時器函數)
  3. 返回一個從 Observable 構造傳遞過來的函數的卸載函數

當然,這個只是一個例子;delay() 操作符已經存在了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM