主題(Subjects)
什么是主題?RxJS 主題就是一個特性類型的 Observable 對象,它允許值多路廣播給觀察者(Observers)。當一個簡單的 Observable 是單播的(每個訂閱的觀察者它們自己都依賴 Observable 的執行)時候,主題(Subjects)就是多播的。
Subjects 就像是一個 Observable,但是它能多播到多個觀察者(Observers)。Subjects 就像是事件發射器:它們維護眾多偵聽者的注冊。
每一個 Subject 都是一個 Observable。給定一個 Subject,你就能訂閱它,提供一個 Observer,開始正常接收值。從 Observer 它的角度講,它不知道 Observable 的執行是否來自普通的單播 Observable 或是 Subject 。
在 Subject 內部,subscribe
不會調用新的執行來發送值。它只是簡單的在觀察者列表中注冊一個觀察者,跟在其他庫和語言中的 addListener
的做法是很相似的。
每個 Subject 也是一個 Observer。它通過 next(v)
,error(e)
,complete()
是一個對象。為了給 Subject 提供一個新值,只需要調用 next(theValue)
,那么它將會多播給注冊偵聽到 Subject 的觀察者。
下面是一個例子,我們有附加了兩個觀察者對象,並且我們發送一些值給 Subject:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
//Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
因為 Subject 是一個觀察者,也就是說你也許會提供一個 Subject 作為參數給 subscribe
到任何 Observable,就像下面這個例子:
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
})
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // 你可以訂閱已經提供的 observable 對象
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
通過上面的方法,本質上我們就僅僅只是通過 Subject 把單播的可觀察的執行轉成了多播的。這個例子演示了主題如何讓多個觀察者共享 Observable 的執行的唯一方法。
這里還有一些特殊的 Subject 類型:BehaviorSubject
,ReplaySubject
,AsyncSubject
。
多播 Observables
一個 “多播Observable” 通過一個 Subject 傳遞通知,它可能會有很多訂閱者,而一個普通的 “單播 Observable” 只會發送通知到單個觀察者。
一個多播 Observable 在后台(hood) 用一個 Subject 讓多個觀察者都能看到相同的 Observable 執行。
在后台,multicast
又是如何工作的呢:觀察者訂閱一個基礎的 Subject,並且這個 Subject 訂閱了源 Observable。下面的例子跟上面的例子很相似,它使用了 observable.subscribe(subject)
:
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
//這里在后台就是 `subject.subscribe({...})`
multicasted.subscribe({
next: (v) => console.log(`observableA: ${v}`);
});
multicasted.subscribe({
next: (v) => console.log(`observableB: ${v}`);
});
//這個帶后台就是 `source.subscribe(subject)`
multicasted.connect();
multicast
返回一個看起來想平常使用的 Observable,但是工作卻像 Subject,當它訂閱的時候。multicast
返回的實際是 ConnectableObservable
,它只是一個使用 connect()
方法的 Observable。
當那些共享的 Observable 的執行開始執行的時候 connect()
方法明確執行是非常重要的。因為 connect()
會在后台執行 source.subscribe(subject)
,connect()
返回一個 Subscription,它使你能夠取消訂閱,從而取消那些共享的 Observable 的執行。
引用計數(Reference counting)
手動調用 connect()
處理訂閱(Subscription)是很麻煩的。通常,我們想要當第一個觀察者(Observer)到達的時候自動連接,以及當最后一個觀察者取消訂閱的時候自動取消公共的執行。
考慮下面例子,它的訂閱按此列表概述的發生:
- 第一個觀察者訂閱多播 Observable
- 多播 Observable 連接
next
發送值 0 給第一個觀察者- 第二個觀察者訂閱多播 Observable
next
發送值 1 給第一個觀察者next
發送值 1 給第一個觀察者- 第一個觀察者從多播 Observable 取消訂閱
next
發送值 2 給第二個觀察者- 第二個觀察者從多播 Observable 取消訂閱
- 連接的多播 Observable 取消訂閱
為了達成上述過程,顯示調用 connect()
,我們編寫如下代碼:
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observableA: ${v}`)
});
//這里應該調用 `connect()`,因為第一個訂閱者訂閱了 `multicasted`,它正在對消費的值感興趣
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observableB: ${v}`)
});
},600);
setTimeout(() => {
subscription1.unsubsribe();
},1200);
//這里我們應該取消訂閱公共的 Observable 的執行
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); //這個是針對公共的 Observable 的執行
}, 2000);
如果我們希望避免顯示調用 connect()
,我們可以使用 ConnectableObservable.refCount()
(引用計數)方法,它返回一個 Observable,而且它還是可以追蹤它有的所有訂閱者。當訂閱者們的這個數字從 0 增到 1,它就會自動調用 connect()
,開始公共的執行。只有當計數從 1 到 0 時才會整個取消訂閱,停止所有的執行。
refCount 使多播 Observable 當第一個訂閱者到達的時候自動開始執行,並且最后一個離開的時候停止執行。
下面是例子:
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
//這里自動調用 `connect()`,因為第一個 ‘refCounted’ 的訂閱者
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observableA: ${v}`)
});
sutTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observableB ${v}`)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
//這里公共的 Observable 的執行將會停止,因為 'refCounted' 在這之后沒有訂閱者了
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
//Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
refCounted()
方法只存在於 ConnectableObservable
對象中,並且它返回的是一個 Observable 而不是 ConnectableObservable
。
行為主題(BehaviorSubject)
有一個 Subjects 的變體就是 BehaviorSubject
,它有一個 “當前值” 的概念。它會存儲最近的發送給消費者的一個值,無論這個新的觀察者是否訂閱,它都將會立即從 BehaviorSubject
接收這個 “當前值”。
BehaviorSubject 對於表示 “過程值(values over time)” 是很有用的。例如一個表示生日的事件流是一個 Subject,那么這個人的年齡的流將是一個 BehaviorSubject
看下面的例子,BehaviorSubject 初始化為 0,它在第一個觀察者接收這個值的時候開始訂閱。第二個觀察者接收值 2,即使它在這個值 2 發送之后被訂閱的。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0);
subject.subject({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: v => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
重播主題(ReplaySubject)
應答主題很像 BehaviorSubject
,它能發送舊的值給新的訂閱者,但是它也能記錄部分 Observable 的執行。
ReplaySubject 從 Observable 的執行中記錄多個值並且重新把這些值發送給新的訂閱者
當創建一個 ReplaySubject
時,你可以指定這些如何重播:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 緩沖 3 個值給新的訂閱者
subject.subscribe({
next: v => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: v => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
你也可以緩存大小里指定一個窗口時間,來確定記錄那些值的年齡。在下面的代碼中,我們使用 100 大小的緩沖,但是窗口時間參數是 500 毫秒。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* 窗口時間 */);
subject.subscribe({
next: v => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: v => console.log(`observerB: ${v}`)
})
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
異步主題(AsyncSubject)
AsyncSubject 是一個變體,它只會發送 Observable 的執行的最后一個值給觀察者們,並且只當執行完成的時候。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
})
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
AsyncSubject 跟 last()
操作符相似,它等待 complete
通知以便於發送一個值。