我們日常發現的一些最常用的 RxJs 操作符是 RxJs 高階映射操作符:switchMap、mergeMap、concatMap 和exhaustMap。
例如,我們程序中的大部分網絡調用都將使用這些運算符之一完成,因此熟悉它們對於編寫幾乎所有反應式程序至關重要。
知道在給定情況下使用哪個運算符(以及為什么)可能有點令人困惑,我們經常想知道這些運算符是如何真正工作的,以及為什么它們會這樣命名。
這些運算符可能看起來不相關,但我們真的很想一口氣學習它們,因為選擇錯誤的運算符可能會意外地導致我們程序中的微妙問題。
Why are the mapping operators a bit confusing?
這樣做是有原因的:為了理解這些操作符,我們首先需要了解每個內部使用的 Observable 組合策略。
與其試圖自己理解switchMap,不如先了解什么是Observable切換; 我們需要先學習 Observable 連接等,而不是直接深入 concatMap。
這就是我們在這篇文章中要做的事情,我們將按邏輯順序學習 concat、merge、switch 和exhaust 策略及其對應的映射運算符:concatMap、mergeMap、switchMap 和exhaustMap。
我們將結合使用 marble 圖和一些實際示例(包括運行代碼)來解釋這些概念。
最后,您將確切地知道這些映射運算符中的每一個是如何工作的,何時使用,為什么使用,以及它們名稱的原因。
The RxJs Map Operator
讓我們從頭開始,介紹這些映射運算符的一般作用。
正如運算符的名稱所暗示的那樣,他們正在做某種映射:但究竟是什么被映射了? 我們先來看看 RxJs Map 操作符的彈珠圖:
How the base Map Operator works
使用 map 運算符,我們可以獲取輸入流(值為 1、2、3),並從中創建派生的映射輸出流(值為 10、20、30)。
底部輸出流的值是通過獲取輸入流的值並將它們應用到一個函數來獲得的:這個函數只是將這些值乘以 10。
所以 map 操作符就是映射輸入 observable 的值。 以下是我們如何使用它來處理 HTTP 請求的示例:
const http$ : Observable<Course[]> = this.http.get('/api/courses');
http$
.pipe(
tap(() => console.log('HTTP request executed')),
map(res => Object.values(res['payload']))
)
.subscribe(
courses => console.log("courses", courses)
);
在這個例子中,我們正在創建一個 HTTP observable 來進行后端調用,我們正在訂閱它。 observable 將發出后端 HTTP 響應的值,它是一個 JSON 對象。
在這種情況下,HTTP 響應將數據包裝在有效負載屬性中,因此為了獲取數據,我們應用了 RxJs 映射運算符。 然后映射函數將映射 JSON 響應負載並提取負載屬性的值。
既然我們已經回顧了基本映射的工作原理,現在讓我們來談談高階映射。
What is Higher-Order Observable Mapping?
在高階映射中,我們不是將像 1 這樣的普通值映射到另一個像 10 這樣的值,而是將一個值映射到一個 Observable 中!
結果是一個高階的 Observable。 它只是一個 Observable,但它的值本身也是 Observable,我們可以單獨訂閱。
這聽起來可能有些牽強,但實際上,這種類型的映射一直在發生。 讓我們舉一個這種類型映射的實際例子。 假設例如,我們有一個 Angular Reactive Form,它通過 Observable 隨時間發出有效的表單值:
@Component({
selector: 'course-dialog',
templateUrl: './course-dialog.component.html'
})
export class CourseDialogComponent implements AfterViewInit {
form: FormGroup;
course:Course;
@ViewChild('saveButton') saveButton: ElementRef;
constructor(
private fb: FormBuilder,
private dialogRef: MatDialogRef<CourseDialogComponent>,
@Inject(MAT_DIALOG_DATA) course:Course) {
this.course = course;
this.form = fb.group({
description: [course.description,
Validators.required],
category: [course.category, Validators.required],
releasedAt: [moment(), Validators.required],
longDescription: [course.longDescription,
Validators.required]
});
}
}
Reactive Form 提供了一個 Observable this.form.valueChanges,它在用戶與表單交互時發出最新的表單值。 這將是我們的源 Observable。
我們想要做的是在這些值隨着時間的推移發出時至少保存其中一些值,以實現表單草稿預保存功能。 這樣,隨着用戶填寫表單,數據會逐漸保存,從而避免由於意外重新加載而丟失整個表單數據。
Why Higher-Order Observables?
為了實現表單草稿保存功能,我們需要獲取表單值,然后創建第二個執行后端保存的 HTTP observable,然后訂閱它。
我們可以嘗試手動完成所有這些,但是我們會陷入嵌套的訂閱反模式:
this.form.valueChanges
.subscribe(
formValue => {
const httpPost$ =
this.http.put(`/api/course/${courseId}`, formValue);
httpPost$.subscribe(
res => ... handle successful save ...
err => ... handle save error ...
);
}
);
正如我們所見,這會導致我們的代碼很快在多個級別嵌套,這是我們在使用 RxJs 時首先要避免的問題之一。
讓我們稱這個新的 httpPost$ Observable 為內部 Observable,因為它是在內部嵌套代碼塊中創建的。
Avoiding nested subscriptions
我們希望以更方便的方式完成所有這些過程:我們希望獲取表單值,並將其映射到保存 Observable 中。 這將有效地創建一個高階 Observable,其中每個值對應一個保存請求。
然后我們希望透明地訂閱這些網絡 Observable 中的每一個,並且一次性直接接收網絡響應,以避免任何嵌套。
如果我們有某種更高階的 RxJs 映射運算符,我們就可以做到這一切! 那為什么我們需要四個不同的操作符呢?
為了理解這一點,想象一下如果 valueChanges observable 快速連續發出多個表單值並且保存操作需要一些時間來完成,會發生什么情況:
- 我們應該等待一個保存請求完成后再進行另一次保存嗎?
- 我們應該並行進行多次保存嗎?
- 我們應該取消正在進行的保存並開始新的保存嗎?
- 當一個已經在進行中時,我們應該忽略新的保存嘗試嗎?
在探索這些用例中的每一個之前,讓我們回到上面的嵌套訂閱代碼。
在嵌套訂閱示例中,我們實際上是並行觸發保存操作,這不是我們想要的,因為沒有強有力的保證后端將按順序處理保存,並且最后一個有效的表單值確實是存儲在 后端。
讓我們看看如何確保僅在上一次保存完成后才完成保存請求。
Understanding Observable Concatenation
為了實現順序保存,我們將引入 Observable 連接的新概念。 在此代碼示例中,我們使用 concat() RxJs 函數連接兩個示例 observable:
const series1$ = of('a', 'b');
const series2$ = of('x', 'y');
const result$ = concat(series1$, series2$);
result$.subscribe(console.log);
在使用 of 創建函數創建了兩個 Observables series1$ 和 series2$ 之后,我們創建了第三個 result$ Observable,它是串聯 series1$ 和 series2$ 的結果。
這是該程序的控制台輸出,顯示了結果 Observable 發出的值:
a
b
x
y
如我們所見,這些值是將 series1$ 的值與 series2$ 的值連接在一起的結果。 但這里有一個問題:這個例子能工作的原因是因為這些 Observable 正在完成!!
of() 函數將創建 Observables,它發出傳遞給 of() 的值,然后在發出所有值后完成 Observables。
Observable Concatenation Marble Diagram
你注意到第一個 Observable 的值 b 后面的豎線了嗎?這標志着第一個具有值 a 和 b (series1$) 的 Observable 完成的時間點。
讓我們按照時間表逐步分解這里發生的事情:
- 兩個 Observables series1$ 和 series2$ 被傳遞給 concat() 函數
- concat() 然后將訂閱第一個 Observable series1$,但不會訂閱第二個 Observable series2$(這對於理解串聯至關重要)
- source1$ 發出值 a,該值立即反映在輸出 result$ Observable 中
- 注意 source2$ Observable 還沒有發出值,因為它還沒有被訂閱
- 然后 source1$ 將發出 b 值,該值反映在輸出中
- 然后 source1$ 將完成,只有在此之后 concat() 現在訂閱 source2$
- 然后 source2$ 值將開始反映在輸出中,直到 source2$ 完成
- 當 source2$ 完成時, result$ Observable 也將完成
- 請注意,我們可以將任意數量的 Observable 傳遞給 concat(),而不僅僅是本示例中的兩個
The key point about Observable Concatenation
正如我們所看到的,Observable 連接就是關於 Observable 的完成! 我們取第一個 Observable 並使用它的值,等待它完成,然后我們使用下一個 Observable,依此類推,直到所有 Observable 完成。
回到我們的高階 Observable 映射示例,讓我們看看串聯的概念如何幫助我們。
Using Observable Concatenation to implement sequential saves
正如我們所見,為了確保我們的表單值按順序保存,我們需要獲取每個表單值並將其映射到 httpPost$ Observable。
然后我們需要訂閱它,但我們希望在訂閱下一個 httpPost$ Observable 之前完成保存。
In order to ensure sequentiality, we need to concatenate the multiple httpPost$ Observables together!
然后我們將訂閱每個 httpPost$ 並按順序處理每個請求的結果。 最后,我們需要的是一個混合了以下內容的運算符:
-
一個高階映射操作(獲取表單值並將其轉換為 httpPost$ Observable)
-
使用 concat() 操作,將多個 httpPost$ Observables 連接在一起以確保在前一個正在進行的保存首先完成之前不會進行下一個 HTTP 保存。
我們需要的是恰當命名的 RxJs concatMap Operator,它混合了高階映射和 Observable 連接。
The RxJs concatMap Operator
代碼如下:
this.form.valueChanges
.pipe(
concatMap(formValue => this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);
正如我們所見,使用像 concatMap 這樣的高階映射運算符的第一個好處是現在我們不再有嵌套訂閱。
通過使用 concatMap,現在所有表單值都將按順序發送到后端,如 Chrome DevTools Network 選項卡中所示:
Breaking down the concatMap network log diagram
正如我們所見,只有在上一次保存完成后才會啟動一個保存 HTTP 請求。 以下是 concatMap 運算符如何確保請求始終按順序發生:
-
concatMap 正在獲取每個表單值並將其轉換為保存的 HTTP Observable,稱為內部 Observable
-
concatMap 然后訂閱內部 Observable 並將其輸出發送到結果 Observable
第二個表單值可能比在后端保存前一個表單值更快 -
如果發生這種情況,新的表單值將不會立即映射到 HTTP 請求
-
相反, concatMap 將等待先前的 HTTP Observable 完成,然后將新值映射到 HTTP Observable,訂閱它並因此觸發下一次保存
Observable Merging
將 Observable 串聯應用於一系列 HTTP 保存操作似乎是確保保存按預期順序發生的好方法。
但是在其他情況下,我們希望並行運行,而不需要等待前一個內部 Observable 完成。
為此,我們有合並 Observable 組合策略! 與 concat 不同,Merge 不會在訂閱下一個 Observable 之前等待 Observable 完成。
相反,merge 同時訂閱每個合並的 Observable,然后隨着多個值隨着時間的推移到達,它將每個源 Observable 的值輸出到組合結果 Observable。
Practical Merge Example
為了明確合並不依賴於完成,讓我們合並兩個從未完成的 Observables,因為它們是 interval Observables:
const series1$ = interval(1000).pipe(map(val => val*10));
const series2$ = interval(1000).pipe(map(val => val*100));
const result$ = merge(series1$, series2$);
result$.subscribe(console.log);
使用 interval() 創建的 Observable 將每隔一秒發出值 0、1、2 等,並且永遠不會完成。
請注意,我們對這些區間 Observable 應用了幾個 map 運算符,只是為了更容易在控制台輸出中區分它們。
以下是控制台中可見的前幾個值:
0
0
10
100
20
200
30
300
Merging and Observable Completion
正如我們所見,合並的源 Observable 的值在發出時立即顯示在結果 Observable 中。 如果合並的 Observable 之一完成,merge 將繼續發出其他 Observable 隨着時間到達的值。
請注意,如果源 Observables 完成,合並仍會以相同的方式工作。
The Merge Marble Diagram
看另一個例子:
正如我們所見,合並的源 Observables 的值立即顯示在輸出中。 直到所有合並的 Observable 完成后,結果 Observable 才會完成。
現在我們了解了合並策略,讓我們看看它如何在高階 Observable 映射的上下文中使用。
回到我們之前的表單草稿保存示例,很明顯在這種情況下我們需要 concatMap 而不是 mergeMap,因為我們不希望保存並行發生。
讓我們看看如果我們不小心選擇了 mergeMap 會發生什么:
this.form.valueChanges
.pipe(
mergeMap(formValue =>
this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);
現在假設用戶與表單交互並開始相當快地輸入數據。 在這種情況下,我們現在會在網絡日志中看到多個並行運行的保存請求:
正如我們所看到的,請求是並行發生的,在這種情況下是一個錯誤! 在高負載下,這些請求可能會被亂序處理。
Observable Switching
現在我們來談談另一個 Observable 組合策略:切換。 切換的概念更接近於合並而不是串聯,因為我們不等待任何 Observable 終止。
但是在切換時,與合並不同,如果一個新的 Observable 開始發出值,我們將在訂閱新的 Observable 之前取消訂閱之前的 Observable。
Observable 切換就是為了確保未使用的 Observables 的取消訂閱邏輯被觸發,從而可以釋放資源!
Switch Marble Diagram
注意對角線,這些不是偶然的! 在切換策略的情況下,在圖中表示高階 Observable 很重要,它是圖像的頂行。
這個高階 Observable 發出的值本身就是 Observable。
對角線從高階 Observable 頂線分叉的那一刻,是 Observable 值被 switch 發出和訂閱的那一刻。
Breaking down the switch Marble Diagram
這是這張圖中發生的事情:
-
高階 Observable 發出它的第一個內部 Observable (a-b-c-d),它被訂閱(通過 switch 策略實現)
-
第一個內部 Observable (a-b-c-d) 發出值 a 和 b,它們立即反映在輸出中
-
但隨后第二個內部 Observable (e-f-g) 被發射,這觸發了第一個內部 Observable (a-b-c-d) 的取消訂閱,這是切換的關鍵部分
-
然后第二個內部 Observable (e-f-g) 開始發出新值,這些值反映在輸出中
-
但請注意,第一個內部 Observable (a-b-c-d) 同時仍在發出新值 c 和 d
-
然而,這些后來的值沒有反映在輸出中,那是因為我們同時取消了第一個內部 Observable (a-b-c-d) 的訂閱
我們現在可以理解為什么必須以這種不尋常的方式繪制圖表,用對角線:這是因為我們需要在每個內部 Observable 被訂閱(或取消訂閱)時直觀地表示,這發生在對角線從源高階 Observable。
The RxJs switchMap Operator
然后讓我們采用切換策略並將其應用於高階映射。 假設我們有一個普通的輸入流,它發出值 1、3 和 5。
然后我們將每個值映射到一個 Observable,就像我們在 concatMap 和 mergeMap 的情況下所做的那樣,並獲得一個更高階的 Observable。
如果我們現在在發出的內部 Observable 之間切換,而不是連接或合並它們,我們最終會得到 switchMap 運算符:
Breaking down the switchMap Marble Diagram
這是該運算符的工作原理:
- 源 observable 發出值 1、3 和 5
- 然后通過應用映射函數將這些值轉換為 Observable
- 映射的內部 Observable 被 switchMap 訂閱
- 當內部 Observables 發出一個值時,該值會立即反映在輸出中
- 但是如果在前一個 Observable 有機會完成之前發出了像 5 這樣的新值,則前一個內部 Observable (30-30-30) 將被取消訂閱,並且它的值將不再反映在輸出中
- 注意上圖中紅色的 30-30-30 內部 Observable:最后 30 個值沒有發出,因為 30-30-30 內部 Observable 被取消訂閱
如我們所見,Observable 切換就是確保我們從未使用的 Observable 觸發取消訂閱邏輯。 現在讓我們看看 switchMap 的運行情況!
Search TypeAhead - switchMap Operator Example
switchMap 的一個非常常見的用例是搜索 Typeahead。 首先讓我們定義源 Observable,其值本身將觸發搜索請求。
這個源 Observable 將發出值,這些值是用戶在輸入中鍵入的搜索文本:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith('')
)
.subscribe(console.log);
此源 Observable 鏈接到用戶鍵入其搜索的輸入文本字段。 當用戶輸入單詞“Hello World”作為搜索時,這些是 searchText$ 發出的值:
Debouncing and removing duplicates from a Typeahead
請注意重復值,要么是由於使用兩個單詞之間的空格,要么是使用 Shift 鍵將字母 H 和 W 大寫。
為了避免將所有這些值作為單獨的搜索請求發送到后端,讓我們使用 debounceTime 運算符等待用戶輸入穩定:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith(''),
debounceTime(400)
)
.subscribe(console.log);
使用此運算符,如果用戶以正常速度鍵入,則 searchText$ 的輸出中現在只有一個值:Hello World
這已經比我們之前的要好得多,現在只有在穩定至少 400 毫秒時才會發出值!
但是如果用戶在考慮搜索時輸入緩慢,以至於兩個值之間需要超過 400 毫秒,那么搜索流可能如下所示:
此外,用戶可以鍵入一個值,按退格鍵並再次鍵入,這可能會導致重復的搜索值。 我們可以通過添加 distinctUntilChanged 操作符來防止重復搜索的發生。
Cancelling obsolete searches in a Typeahead
但更重要的是,我們需要一種方法來取消以前的搜索,因為新的搜索開始了。
我們在這里要做的是將每個搜索字符串轉換為后端搜索請求並訂閱它,並在兩個連續的搜索請求之間應用切換策略,如果觸發新的搜索,則取消之前的搜索。
這正是 switchMap 運算符將要做的! 這是使用它的 Typeahead 邏輯的最終實現:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith(''),
debounceTime(400),
distinctUntilChanged()
);
const lessons$: Observable<Lesson[]> = searchText$
.pipe(
switchMap(search => this.loadLessons(search))
)
.subscribe();
function loadLessons(search:string): Observable<Lesson[]> {
const params = new HttpParams().set('search', search);
return this.http.get(`/api/lessons/${coursesId}`, {params});
}
switchMap Demo with a Typeahead
現在讓我們看看 switchMap 操作符的作用! 如果用戶在搜索欄上輸入,然后猶豫並輸入其他內容,我們通常會在網絡日志中看到以下內容:
正如我們所看到的,之前的一些搜索在進行時已被取消,這很棒,因為這將釋放可用於其他事情的服務器資源。
The Exhaust Strategy
switchMap 操作符是預輸入場景的理想選擇,但在其他情況下,我們想要做的是忽略源 Observable 中的新值,直到前一個值被完全處理。
例如,假設我們正在觸發后端保存請求以響應單擊保存按鈕。 我們可能首先嘗試使用 concatMap 運算符來實現這一點,以確保保存操作按順序發生:
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(
concatMap(() => this.saveCourse(this.form.value))
)
.subscribe();
這確保保存按順序完成,但是如果用戶多次單擊保存按鈕會發生什么? 以下是我們將在網絡日志中看到的內容:
正如我們所見,每次點擊都會觸發自己的保存:如果我們點擊 20 次,我們會得到 20 次保存! 在這種情況下,我們想要的不僅僅是確保按順序進行保存。
我們還希望能夠忽略點擊,但前提是保存已經在進行中。 排氣 Observable 組合策略將允許我們做到這一點。
Exhaust Marble Diagram
就像以前一樣,我們在第一行有一個更高階的 Observable,它的值本身就是 Observable,從第一行分叉出來。這是這張圖中發生的事情:
- 就像 switch 的情況一樣,exhaust 訂閱第一個內部 Observable (a-b-c)
像往常一樣,值 a、b 和 c 會立即反映在輸出中 - 然后發出第二個內部 Observable (d-e-f),而第一個 Observable (a-b-c) 仍在進行中
- 第二個 Observable 被排放策略丟棄,並且不會被訂閱(這是排放的關鍵部分)
只有在第一個 Observable (a-b-c) 完成后,排氣策略才會訂閱新的 Observable - 當第三個 Observable (g-h-i) 發出時,第一個 Observable (a-b-c) 已經完成,所以第三個 Observable 不會被丟棄,會被訂閱
- 然后,第三個 Observable 的值 g-h-i 將顯示在結果 Observable 的輸出中,與輸出中不存在的值 d-e-f 不同
就像 concat、merge 和 switch 的情況一樣,我們現在可以在高階映射的上下文中應用 exhaust 策略。
The RxJs exhaustMap Operator
現在讓我們看看exhaustMap 操作符的彈珠圖。 讓我們記住,與上圖的第一行不同,源 Observable 1-3-5 發出的值不是 Observable。
相反,這些值可以是例如鼠標點擊:
所以這是在排放地圖圖的情況下發生的事情:
- 發出值 1,並創建內部 Observable 10-10-10
- Observable 10-10-10 發出所有值並在源 Observable 中發出值 3 之前完成,因此所有 10-10-10 值在輸出中發出
- 在輸入中發出一個新值 3,觸發一個新的 30-30-30 內部 Observable
- 但是現在,雖然 30-30-30 仍在運行,但我們在源 Observable 中得到了一個新值 5
- 這個值 5 被排氣策略丟棄,這意味着從未創建 50-50-50 Observable,因此 50-50-50 值從未出現在輸出中
A Practical Example for exhaustMap
現在讓我們將這個新的exhaustMap Operator 應用到我們的保存按鈕場景中:
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(
exhaustMap(() => this.saveCourse(this.form.value))
)
.subscribe();
如果我們現在點擊保存,假設連續 5 次,我們將獲得以下網絡日志:
正如我們所看到的,我們在保存請求仍在進行時所做的點擊被忽略了,正如預期的那樣!
請注意,如果我們連續點擊例如 20 次,最終正在進行的保存請求將完成,然后第二個保存請求將開始。
How to choose the right mapping Operator?
concatMap、mergeMap、switchMap 和exhaustMap 的行為相似,因為它們都是高階映射運算符。
但它在許多微妙的方面也如此不同,以至於實際上沒有一個運算符可以安全地指向默認值。
相反,我們可以簡單地根據用例選擇合適的運算符:
-
如果我們需要在等待完成的同時按順序做事情,那么 concatMap 是正確的選擇
-
對於並行處理,mergeMap 是最好的選擇
-
如果我們需要取消邏輯,switchMap 是要走的路
-
為了在當前的 Observables 仍在進行時忽略新的 Observables,exhaustMap 就是這樣做的
總結
正如我們所見,RxJ 的高階映射運算符對於在響應式編程中執行一些非常常見的操作(例如網絡調用)至關重要。
為了真正理解這些映射操作符及其名稱,我們首先需要重點了解底層的Observable組合策略concat、merge、switch和exhaust。
我們還需要意識到有一個更高階的映射操作正在發生,其中值被轉換成分離的 Observables,並且這些 Observables 被映射運算符本身以隱藏的方式訂閱。
選擇正確的算子就是選擇正確的內部 Observable 組合策略。 選擇錯誤的運算符通常不會導致程序立即損壞,但隨着時間的推移可能會導致一些難以解決的問題。
更多Jerry的原創文章,盡在:"汪子熙":