內容導航
RxJS是什么
RxJS 是一個庫,它通過使用 observable 序列來編寫異步和基於事件的程序。它提供了一個核心類型 Observable,附屬類型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 啟發的操作符 (map、filter、reduce、every, 等等),這些數組操作符可以把異步事件作為集合來處理。
可以把 RxJS 當做是用來處理事件的 Lodash 。
ReactiveX 結合了 觀察者模式、迭代器模式 和 使用集合的函數式編程,以滿足以一種理想方式來管理事件序列所需要的一切。
RxJS的主要成員
- Observable (可觀察對象): 表示一個概念,這個概念是一個可調用的未來值或事件的集合。
- Observer (觀察者): 一個回調函數的集合,它知道如何去監聽由 Observable 提供的值。
- Subscription (訂閱): 表示 Observable 的執行,主要用於取消 Observable 的執行。
- Operators (操作符): 采用函數式編程風格的純函數 (pure function),使用像
map、filter、concat、flatMap等這樣的操作符來處理集合。 - Subject (主體): 相當於 EventEmitter,並且是將值或事件多路推送給多個 Observer 的唯一方式。
- Schedulers (調度器): 用來控制並發並且是中央集權的調度員,允許我們在發生計算時進行協調,例如
setTimeout或requestAnimationFrame或其他。
Observable (可觀察對象)
RxJS 是基於觀察者模式和迭代器模式以函數式編程思維來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 作為被觀察者,是一個值或事件的流集合;而 Observer 則作為觀察者,根據 Observables 進行處理。Observables 是多個值的惰性推送集合。
- of():用於創建簡單的Observable,該Observable只發出給定的參數,在發送完這些參數后發出完成通知.
- from():從一個數組、類數組對象、promise、迭代器對象或者類Observable對象創建一個Observable.
- fromEvent(),:把event轉換成Observable.
- range():在指定起始值返回指定數量數字.
- interval():基於給定時間間隔發出數字序列。返回一個發出無限自增的序列整數,可以選擇固定的時間間隔進行發送。
- timer():創建一個Observable,該Observable在初始延時之后開始發送並且在每個時間周期后發出自增的數字
創建 Observable
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const Observable1 = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
} catch (err) {
subscriber.error(err); //傳遞一個錯誤對象,如果捕捉到異常的話。
}
});
const Observable2 = from([
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
]);
const Observable3 = of("Dave","Nick");//把所有參數組合到數組,逐個提供給消費者
const Observable4 = range(1,10);
const Observable5 = interval(3000);//從零開始每3000毫秒自增並提供給消費者
const Observable6 = timer(3000,1000);//等待3000毫秒后,從零開始每1000毫秒自增並提供給消費者
訂閱 Observables
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源。
當調用了 observable.subscribe ,觀察者會被附加到新創建的 Observable 執行中。這個調用還返回一個對象,即 Subscription (訂閱):
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable1 = range(1,10);
observable1.subscribe(
num => {
console.log(num);
},
err => console.log(err),
() => console.log("Streaming is over.")
);
執行 Observables
Observable 執行可以傳遞三種類型的值:
- "Next" 通知: 發送一個值,比如數字、字符串、對象,等等。
- "Error" 通知: 發送一個 JavaScript 錯誤 或 異常。
- "Complete" 通知: 不再發送任何值。
"Next" 通知是最重要,也是最常見的類型:它們表示傳遞給觀察者的實際數據。"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,並且只會執行其中的一個。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const observable = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 因為違反規約,所以不會發送
} catch (err) {
subscriber.error(err); //傳遞一個錯誤對象,如果捕捉到異常的話。
}
});
清理 Observable 執行
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或內存資源
當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要調用
unsubscribe()方法就可以取消執行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable = new Observable(subscriber => {
let intervalID = setInterval(() => {
subscriber.next('hi');
}, 1000);
// 提供取消和清理 interval 資源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
let subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Observer (觀察者)
觀察者是由 Observable 發送的值的消費者。觀察者只是一組回調函數的集合,每個回調函數對應一種 Observable 發送的通知類型:next、error 和 complete 。下面的示例是一個典型的觀察者對象:
觀察者只是有三個回調函數的對象,每個回調函數對應一種 Observable 發送的通知類型。
observable.subscribe(
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
);
Subscription (訂閱)
Subscription 是表示可清理資源的對象,通常是 Observable 的執行。Subscription 有一個重要的方法,即 unsubscribe,它不需要任何參數,只是用來清理由 Subscription 占用的資源。在上一個版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理對象)。
Subscription 基本上只有一個
unsubscribe()函數,這個函數用來釋放資源或去取消 Observable 執行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
var observable1 = interval(1000);
var subscription1 = observable1.subscribe(x => console.log(x));
// 稍后:
// 這會取消正在進行中的 Observable 執行
// Observable 執行是通過使用觀察者調用 subscribe 方法啟動的
subscription1.unsubscribe();
var observable2 = interval(400);
var observable3 = interval(300);
var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
subscription2.add(childSubscription);
setTimeout(() => {
// subscription 和 childSubscription 都會取消訂閱
subscription2.unsubscribe();
}, 1000);
Subject (主體)
RxJS Subject 是一種特殊類型的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行)。
Subject 像是 Observable,但是可以多播給多個觀察者。Subject 還像是 EventEmitters,維護着多個監聽器的注冊表。
還有一些特殊類型的 Subject:
BehaviorSubject、ReplaySubject和AsyncSubject。
每個 Subject 都是 Observable 。 - 對於 Subject,你可以提供一個觀察者並使用 subscribe 方法,就可以開始正常接收值。從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject 。
在 Subject 的內部,subscribe 不會調用發送值的新執行。它只是將給定的觀察者注冊到觀察者列表中,類似於其他庫或語言中的 addListener 的工作方式。
每個 Subject 都是觀察者。 - Subject 是一個有如下方法的對象: next(v)、error(e) 和 complete() 。要給 Subject 提供新值,只要調用 next(theValue),它會將值多播給已注冊監聽該 Subject 的觀察者們。
import { Subject,from } from 'rxjs';
//我們為 Subject 添加了兩個觀察者,然后給 Subject 提供一些值
var subject1 = new Subject();
subject1.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject1.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject1.next(1);
subject1.next(2);
//因為 Subject 是觀察者,這也就在意味着你可以把 Subject 作為參數傳給任何 Observable 的 subscribe 方法
var subject2 =new Subject();
subject2.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject2.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = from([1, 2, 3]);
observable.subscribe(subject2); // 你可以提供一個 Subject 進行訂閱
多播的 Observables
“多播 Observable” 通過 Subject 來發送通知,這個 Subject 可能有多個訂閱者,然而普通的 “單播 Observable” 只發送通知給單個觀察者。
多播 Observable 在底層是通過使用 Subject 使得多個觀察者可以看見同一個 Observable 執行。
在底層,這就是 multicast 操作符的工作原理:觀察者訂閱一個基礎的 Subject,然后 Subject 訂閱源 Observable 。
import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators';
const source = timer(1000, 2500).pipe(take(5));
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerD: ' + v)
});
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
source.subscribe(subject);
BehaviorSubject
Subject 的其中一個變體就是 BehaviorSubject,它有一個“當前值”的概念。它保存了發送給消費者的最新值。並且當有新的觀察者訂閱時,會立即從 BehaviorSubject 那接收到“當前值”。
BehaviorSubjects 適合用來表示“隨時間推移的值”。舉例來說,生日的流是一個 Subject,但年齡的流應該是一個 BehaviorSubject 。
import { BehaviorSubject } from 'rxjs';
//BehaviorSubject 使用值0進行初始化,當第一個觀察者訂閱時會得到0。第二個觀察者訂閱時會得到值2,盡管它是在值2發送之后訂閱的。
const subject = new BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
ReplaySubject
ReplaySubject 類似於 BehaviorSubject,它可以發送舊值給新的訂閱者,但它還可以記錄 Observable 執行的一部分。
ReplaySubject記錄 Observable 執行中的多個值並將其回放給新的訂閱者。除了緩沖數量,你還可以指定 window time (以毫秒為單位)來確定多久之前的值可以記錄。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 為新的訂閱者緩沖最后3個值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
//我們緩存數量100,但 window time 參數只設置了120毫秒
const subject = new ReplaySubject(100, 120 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
AsyncSubject
AsyncSubject 是另一個 Subject 變體,只有當 Observable 執行完成時(執行 complete()),它才會將執行的最后一個值發送給觀察者。
AsyncSubject 和
last()操作符類似,因為它也是等待complete通知,以發送一個單個值。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
Scheduler (調度器)
調度器控制着何時啟動 subscription 和何時發送通知。它由三部分組成:
- 調度器是一種數據結構。 它知道如何根據優先級或其他標准來存儲任務和將任務進行排序。
- 調度器是執行上下文。 它表示在何時何地執行任務(舉例來說,立即的,或另一種回調函數機制(比如 setTimeout 或 process.nextTick),或動畫幀)。
- 調度器有一個(虛擬的)時鍾。 調度器功能通過它的 getter 方法
now()提供了“時間”的概念。在具體調度器上安排的任務將嚴格遵循該時鍾所表示的時間。
調度器可以讓你規定 Observable 在什么樣的執行上下文中發送通知給它的觀察者。
import { asyncScheduler, Observable } from 'rxjs';
//我們使用普通的 Observable ,它同步地發出值`1`、`2`、`3`,並使用操作符 `observeOn` 來指定 `async` 調度器發送這些值。
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
})
.pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//你會發現"just after subscribe"在"got value..."之前就出現了
//just before subscribe
//just after subscribe
//got value 1
//got value 2
//got value 3
//done
調度器類型
async 調度器是 RxJS 提供的內置調度器中的一個。可以通過使用 Scheduler 對象的靜態屬性創建並返回其中的每種類型的調度器。
| 調度器 | 目的 |
|---|---|
null |
不傳遞任何調度器的話,會以同步遞歸的方式發送通知。用於定時操作或尾遞歸操作。 |
queueScheduler |
當前事件幀中的隊列調度(蹦床調度器)。用於迭代操作。 |
asapScheduler |
微任務的隊列調度,它使用可用的最快速的傳輸機制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用於異步轉換。 |
asyncScheduler |
使用 setInterval 的調度。用於基於時間的操作符。 |
animationFrameScheduler |
計划將在下一次瀏覽器內容重新繪制之前發生的任務。 可用於創建流暢的瀏覽器動畫。 |
Pipeable(操作符)
操作符就是函數,管道操作符本質上是一個純函數,它將一個Observable作為輸入並生成另一個Observable作為輸出。訂閱輸出Observable也將訂閱輸入Observable。 操作符有兩種:
管道操作符是一個將Observable作為其輸入並返回另一個Observable的函數。這是一個純粹的操作:以前的Observable保持不變。
-
管道操作符是可以使用語法
observableInstance.pipe(operator())傳遞給Observable的類型。 這些包括filter()和mergeMap()。 調用時,它們不會更改現有的Observable實例。 相反,它們返回一個新的Observable,其訂閱邏輯基於第一個Observable。 -
創建運算符是另一種運算符,可以稱為獨立函數來創建新的Observable。例如:
of(1,2,3)創建一個observable ,該對象將依次發射1、2和3。創建運算符將在后面的部分中詳細討論。
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)
常用的操作符
finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:
返回原始Observable,但在Observable完成或發生錯誤終止時將調用指定的函數。
創建操作符
ajaxbindCallbackbindNodeCallbackdeferemptyfromfromEventfromEventPatterngenerateintervalofrangethrowErrortimeriif
連接創建操作符
These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.
轉換操作符
bufferbufferCountbufferTimebufferTogglebufferWhenconcatMapconcatMapToexhaustexhaustMapexpandgroupBymapmapTomergeMapmergeMapTomergeScanpairwisepartitionpluckscanswitchMapswitchMapTowindowwindowCountwindowTimewindowTogglewindowWhen
過濾操作符
auditauditTimedebouncedebounceTimedistinctdistinctKeydistinctUntilChangeddistinctUntilKeyChangedelementAtfilterfirstignoreElementslastsamplesampleTimesingleskipskipLastskipUntilskipWhiletaketakeLasttakeUntiltakeWhilethrottlethrottleTime
組合操作符
Also see the Join Creation Operators section above.
多播操作符
錯誤處理操作符
工具操作符
tapdelaydelayWhendematerializematerializeobserveOnsubscribeOntimeIntervaltimestamptimeouttimeoutWithtoArray
