內容導航
RxJS是什么
RxJS 是一個庫,它通過使用 observable 序列來編寫異步和基於事件的程序。它提供了一個核心類型 Observable,附屬類型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 啟發的操作符 (map、filter、reduce、every, 等等),這些數組操作符可以把異步事件作為集合來處理。
可以把 RxJS 當做是用來處理事件的 Lodash 。
ReactiveX 結合了 觀察者模式、迭代器模式 和 使用集合的函數式編程,以滿足以一種理想方式來管理事件序列所需要的一切。
RxJS的主要成員
- Observable (可觀察對象): 表示一個概念,這個概念是一個可調用的未來值或事件的集合。
- Observer (觀察者): 一個回調函數的集合,它知道如何去監聽由 Observable 提供的值。
- Subscription (訂閱): 表示 Observable 的執行,主要用於取消 Observable 的執行。
- Operators (操作符): 采用函數式編程風格的純函數 (pure function),使用像
map
、filter
、concat
、flatMap
等這樣的操作符來處理集合。 - Subject (主體): 相當於 EventEmitter,並且是將值或事件多路推送給多個 Observer 的唯一方式。
- Schedulers (調度器): 用來控制並發並且是中央集權的調度員,允許我們在發生計算時進行協調,例如
setTimeout
或requestAnimationFrame
或其他。
Observable (可觀察對象)
RxJS 是基於觀察者模式和迭代器模式以函數式編程思維來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 作為被觀察者,是一個值或事件的流集合;而 Observer 則作為觀察者,根據 Observables 進行處理。Observables 是多個值的惰性推送集合。
- of():用於創建簡單的Observable,該Observable只發出給定的參數,在發送完這些參數后發出完成通知.
- from():從一個數組、類數組對象、promise、迭代器對象或者類Observable對象創建一個Observable.
- fromEvent(),:把event轉換成Observable.
- range():在指定起始值返回指定數量數字.
- interval():基於給定時間間隔發出數字序列。返回一個發出無限自增的序列整數,可以選擇固定的時間間隔進行發送。
- timer():創建一個Observable,該Observable在初始延時之后開始發送並且在每個時間周期后發出自增的數字
創建 Observable
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const Observable1 = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
} catch (err) {
subscriber.error(err); //傳遞一個錯誤對象,如果捕捉到異常的話。
}
});
const Observable2 = from([
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
]);
const Observable3 = of("Dave","Nick");//把所有參數組合到數組,逐個提供給消費者
const Observable4 = range(1,10);
const Observable5 = interval(3000);//從零開始每3000毫秒自增並提供給消費者
const Observable6 = timer(3000,1000);//等待3000毫秒后,從零開始每1000毫秒自增並提供給消費者
訂閱 Observables
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源。
當調用了 observable.subscribe
,觀察者會被附加到新創建的 Observable 執行中。這個調用還返回一個對象,即 Subscription
(訂閱):
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable1 = range(1,10);
observable1.subscribe(
num => {
console.log(num);
},
err => console.log(err),
() => console.log("Streaming is over.")
);
執行 Observables
Observable 執行可以傳遞三種類型的值:
- "Next" 通知: 發送一個值,比如數字、字符串、對象,等等。
- "Error" 通知: 發送一個 JavaScript 錯誤 或 異常。
- "Complete" 通知: 不再發送任何值。
"Next" 通知是最重要,也是最常見的類型:它們表示傳遞給觀察者的實際數據。"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,並且只會執行其中的一個。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const observable = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 因為違反規約,所以不會發送
} catch (err) {
subscriber.error(err); //傳遞一個錯誤對象,如果捕捉到異常的話。
}
});
清理 Observable 執行
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源
當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要調用
unsubscribe()
方法就可以取消執行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable = new Observable(subscriber => {
let intervalID = setInterval(() => {
subscriber.next('hi');
}, 1000);
// 提供取消和清理 interval 資源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
let subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Observer (觀察者)
觀察者是由 Observable 發送的值的消費者。觀察者只是一組回調函數的集合,每個回調函數對應一種 Observable 發送的通知類型:next
、error
和 complete
。下面的示例是一個典型的觀察者對象:
觀察者只是有三個回調函數的對象,每個回調函數對應一種 Observable 發送的通知類型。
observable.subscribe(
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
);
Subscription (訂閱)
Subscription 是表示可清理資源的對象,通常是 Observable 的執行。Subscription 有一個重要的方法,即 unsubscribe
,它不需要任何參數,只是用來清理由 Subscription 占用的資源。在上一個版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理對象)。
Subscription 基本上只有一個
unsubscribe()
函數,這個函數用來釋放資源或去取消 Observable 執行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
var observable1 = interval(1000);
var subscription1 = observable1.subscribe(x => console.log(x));
// 稍后:
// 這會取消正在進行中的 Observable 執行
// Observable 執行是通過使用觀察者調用 subscribe 方法啟動的
subscription1.unsubscribe();
var observable2 = interval(400);
var observable3 = interval(300);
var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
subscription2.add(childSubscription);
setTimeout(() => {
// subscription 和 childSubscription 都會取消訂閱
subscription2.unsubscribe();
}, 1000);
Subject (主體)
RxJS Subject 是一種特殊類型的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行)。
Subject 像是 Observable,但是可以多播給多個觀察者。Subject 還像是 EventEmitters,維護着多個監聽器的注冊表。
還有一些特殊類型的 Subject:
BehaviorSubject
、ReplaySubject
和AsyncSubject
。
每個 Subject 都是 Observable 。 - 對於 Subject,你可以提供一個觀察者並使用 subscribe
方法,就可以開始正常接收值。從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject 。
在 Subject 的內部,subscribe
不會調用發送值的新執行。它只是將給定的觀察者注冊到觀察者列表中,類似於其他庫或語言中的 addListener
的工作方式。
每個 Subject 都是觀察者。 - Subject 是一個有如下方法的對象: next(v)
、error(e)
和 complete()
。要給 Subject 提供新值,只要調用 next(theValue)
,它會將值多播給已注冊監聽該 Subject 的觀察者們。
import { Subject,from } from 'rxjs';
//我們為 Subject 添加了兩個觀察者,然后給 Subject 提供一些值
var subject1 = new Subject();
subject1.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject1.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject1.next(1);
subject1.next(2);
//因為 Subject 是觀察者,這也就在意味着你可以把 Subject 作為參數傳給任何 Observable 的 subscribe 方法
var subject2 =new Subject();
subject2.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject2.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = from([1, 2, 3]);
observable.subscribe(subject2); // 你可以提供一個 Subject 進行訂閱
多播的 Observables
“多播 Observable” 通過 Subject 來發送通知,這個 Subject 可能有多個訂閱者,然而普通的 “單播 Observable” 只發送通知給單個觀察者。
多播 Observable 在底層是通過使用 Subject 使得多個觀察者可以看見同一個 Observable 執行。
在底層,這就是 multicast
操作符的工作原理:觀察者訂閱一個基礎的 Subject,然后 Subject 訂閱源 Observable 。
import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators';
const source = timer(1000, 2500).pipe(take(5));
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerD: ' + v)
});
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
source.subscribe(subject);
BehaviorSubject
Subject 的其中一個變體就是 BehaviorSubject
,它有一個“當前值”的概念。它保存了發送給消費者的最新值。並且當有新的觀察者訂閱時,會立即從 BehaviorSubject
那接收到“當前值”。
BehaviorSubjects 適合用來表示“隨時間推移的值”。舉例來說,生日的流是一個 Subject,但年齡的流應該是一個 BehaviorSubject 。
import { BehaviorSubject } from 'rxjs';
//BehaviorSubject 使用值0進行初始化,當第一個觀察者訂閱時會得到0。第二個觀察者訂閱時會得到值2,盡管它是在值2發送之后訂閱的。
const subject = new BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
ReplaySubject
ReplaySubject
類似於 BehaviorSubject
,它可以發送舊值給新的訂閱者,但它還可以記錄 Observable 執行的一部分。
ReplaySubject
記錄 Observable 執行中的多個值並將其回放給新的訂閱者。除了緩沖數量,你還可以指定 window time (以毫秒為單位)來確定多久之前的值可以記錄。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 為新的訂閱者緩沖最后3個值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
//我們緩存數量100,但 window time 參數只設置了120毫秒
const subject = new ReplaySubject(100, 120 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
AsyncSubject
AsyncSubject 是另一個 Subject 變體,只有當 Observable 執行完成時(執行 complete()
),它才會將執行的最后一個值發送給觀察者。
AsyncSubject 和
last()
操作符類似,因為它也是等待complete
通知,以發送一個單個值。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
Scheduler (調度器)
調度器控制着何時啟動 subscription 和何時發送通知。它由三部分組成:
- 調度器是一種數據結構。 它知道如何根據優先級或其他標准來存儲任務和將任務進行排序。
- 調度器是執行上下文。 它表示在何時何地執行任務(舉例來說,立即的,或另一種回調函數機制(比如 setTimeout 或 process.nextTick),或動畫幀)。
- 調度器有一個(虛擬的)時鍾。 調度器功能通過它的 getter 方法
now()
提供了“時間”的概念。在具體調度器上安排的任務將嚴格遵循該時鍾所表示的時間。
調度器可以讓你規定 Observable 在什么樣的執行上下文中發送通知給它的觀察者。
import { asyncScheduler, Observable } from 'rxjs';
//我們使用普通的 Observable ,它同步地發出值`1`、`2`、`3`,並使用操作符 `observeOn` 來指定 `async` 調度器發送這些值。
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
})
.pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//你會發現"just after subscribe"在"got value..."之前就出現了
//just before subscribe
//just after subscribe
//got value 1
//got value 2
//got value 3
//done
調度器類型
async
調度器是 RxJS 提供的內置調度器中的一個。可以通過使用 Scheduler
對象的靜態屬性創建並返回其中的每種類型的調度器。
調度器 | 目的 |
---|---|
null |
不傳遞任何調度器的話,會以同步遞歸的方式發送通知。用於定時操作或尾遞歸操作。 |
queueScheduler |
當前事件幀中的隊列調度(蹦床調度器)。用於迭代操作。 |
asapScheduler |
微任務的隊列調度,它使用可用的最快速的傳輸機制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用於異步轉換。 |
asyncScheduler |
使用 setInterval 的調度。用於基於時間的操作符。 |
animationFrameScheduler |
計划將在下一次瀏覽器內容重新繪制之前發生的任務。 可用於創建流暢的瀏覽器動畫。 |
Pipeable(操作符)
操作符就是函數,管道操作符本質上是一個純函數,它將一個Observable作為輸入並生成另一個Observable作為輸出。訂閱輸出Observable也將訂閱輸入Observable。 操作符有兩種:
管道操作符是一個將Observable作為其輸入並返回另一個Observable的函數。這是一個純粹的操作:以前的Observable保持不變。
-
管道操作符是可以使用語法
observableInstance.pipe(operator())
傳遞給Observable的類型。 這些包括filter()
和mergeMap()
。 調用時,它們不會更改現有的Observable實例。 相反,它們返回一個新的Observable,其訂閱邏輯基於第一個Observable。 -
創建運算符是另一種運算符,可以稱為獨立函數來創建新的Observable。例如:
of(1,2,3)
創建一個observable ,該對象將依次發射1、2和3。創建運算符將在后面的部分中詳細討論。
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)
常用的操作符
finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>
:
返回原始Observable,但在Observable完成或發生錯誤終止時將調用指定的函數。
創建操作符
ajax
bindCallback
bindNodeCallback
defer
empty
from
fromEvent
fromEventPattern
generate
interval
of
range
throwError
timer
iif
連接創建操作符
These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.
轉換操作符
buffer
bufferCount
bufferTime
bufferToggle
bufferWhen
concatMap
concatMapTo
exhaust
exhaustMap
expand
groupBy
map
mapTo
mergeMap
mergeMapTo
mergeScan
pairwise
partition
pluck
scan
switchMap
switchMapTo
window
windowCount
windowTime
windowToggle
windowWhen
過濾操作符
audit
auditTime
debounce
debounceTime
distinct
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
elementAt
filter
first
ignoreElements
last
sample
sampleTime
single
skip
skipLast
skipUntil
skipWhile
take
takeLast
takeUntil
takeWhile
throttle
throttleTime
組合操作符
Also see the Join Creation Operators section above.
多播操作符
錯誤處理操作符
工具操作符
tap
delay
delayWhen
dematerialize
materialize
observeOn
subscribeOn
timeInterval
timestamp
timeout
timeoutWith
toArray