RxJS——主題(Subject)


主題(Subjects)

什么是主題?RxJS 主題就是一個特性類型的 Observable 對象,它允許值多路廣播給觀察者(Observers)。當一個簡單的 Observable 是單播的(每個訂閱的觀察者它們自己都依賴 Observable 的執行)時候,主題(Subjects)就是多播的。

Subjects 就像是一個 Observable,但是它能多播到多個觀察者(Observers)。Subjects 就像是事件發射器:它們維護眾多偵聽者的注冊。

每一個 Subject 都是一個 Observable。給定一個 Subject,你就能訂閱它,提供一個 Observer,開始正常接收值。從 Observer 它的角度講,它不知道 Observable 的執行是否來自普通的單播 Observable 或是 Subject 。

在 Subject 內部,subscribe 不會調用新的執行來發送值。它只是簡單的在觀察者列表中注冊一個觀察者,跟在其他庫和語言中的 addListener 的做法是很相似的。

每個 Subject 也是一個 Observer。它通過 next(v)error(e)complete() 是一個對象。為了給 Subject 提供一個新值,只需要調用 next(theValue),那么它將會多播給注冊偵聽到 Subject 的觀察者。

下面是一個例子,我們有附加了兩個觀察者對象,並且我們發送一些值給 Subject:

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);

//Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

因為 Subject 是一個觀察者,也就是說你也許會提供一個 Subject 作為參數給 subscribe 到任何 Observable,就像下面這個例子:

import { Subject, from } from 'rxjs';
 
const subject = new Subject<number>();

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`)
})
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
const observable = from([1, 2, 3]);
 
observable.subscribe(subject); // 你可以訂閱已經提供的 observable 對象

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

通過上面的方法,本質上我們就僅僅只是通過 Subject 把單播的可觀察的執行轉成了多播的。這個例子演示了主題如何讓多個觀察者共享 Observable 的執行的唯一方法。

這里還有一些特殊的 Subject 類型:BehaviorSubjectReplaySubjectAsyncSubject

多播 Observables

一個 “多播Observable” 通過一個 Subject 傳遞通知,它可能會有很多訂閱者,而一個普通的 “單播 Observable” 只會發送通知到單個觀察者。

一個多播 Observable 在后台(hood) 用一個 Subject 讓多個觀察者都能看到相同的 Observable 執行。

在后台,multicast 又是如何工作的呢:觀察者訂閱一個基礎的 Subject,並且這個 Subject 訂閱了源 Observable。下面的例子跟上面的例子很相似,它使用了 observable.subscribe(subject)

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

//這里在后台就是 `subject.subscribe({...})`
multicasted.subscribe({
    next: (v) => console.log(`observableA: ${v}`);
});
multicasted.subscribe({
    next: (v) => console.log(`observableB: ${v}`);
});

//這個帶后台就是 `source.subscribe(subject)`
multicasted.connect();

multicast 返回一個看起來想平常使用的 Observable,但是工作卻像 Subject,當它訂閱的時候。multicast 返回的實際是 ConnectableObservable,它只是一個使用 connect() 方法的 Observable。

當那些共享的 Observable 的執行開始執行的時候 connect() 方法明確執行是非常重要的。因為 connect() 會在后台執行 source.subscribe(subject)connect() 返回一個 Subscription,它使你能夠取消訂閱,從而取消那些共享的 Observable 的執行。

引用計數(Reference counting)

手動調用 connect() 處理訂閱(Subscription)是很麻煩的。通常,我們想要當第一個觀察者(Observer)到達的時候自動連接,以及當最后一個觀察者取消訂閱的時候自動取消公共的執行。

考慮下面例子,它的訂閱按此列表概述的發生:

  1. 第一個觀察者訂閱多播 Observable
  2. 多播 Observable 連接
  3. next 發送值 0 給第一個觀察者
  4. 第二個觀察者訂閱多播 Observable
  5. next 發送值 1 給第一個觀察者
  6. next 發送值 1 給第一個觀察者
  7. 第一個觀察者從多播 Observable 取消訂閱
  8. next 發送值 2 給第二個觀察者
  9. 第二個觀察者從多播 Observable 取消訂閱
  10. 連接的多播 Observable 取消訂閱

為了達成上述過程,顯示調用 connect(),我們編寫如下代碼:

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
    next: (v) => console.log(`observableA: ${v}`)
});
//這里應該調用 `connect()`,因為第一個訂閱者訂閱了 `multicasted`,它正在對消費的值感興趣
subscriptionConnect = multicasted.connect();

setTimeout(() => {
    subscription2 = multicasted.subscribe({
        next: (v) => console.log(`observableB: ${v}`)
    });
},600);

setTimeout(() => {
    subscription1.unsubsribe();
},1200);

//這里我們應該取消訂閱公共的 Observable 的執行
setTimeout(() => {
    subscription2.unsubscribe();
    subscriptionConnect.unsubscribe();	//這個是針對公共的 Observable 的執行
}, 2000);

如果我們希望避免顯示調用 connect(),我們可以使用 ConnectableObservable.refCount() (引用計數)方法,它返回一個 Observable,而且它還是可以追蹤它有的所有訂閱者。當訂閱者們的這個數字從 0 增到 1,它就會自動調用 connect(),開始公共的執行。只有當計數從 1 到 0 時才會整個取消訂閱,停止所有的執行。

refCount 使多播 Observable 當第一個訂閱者到達的時候自動開始執行,並且最后一個離開的時候停止執行。

下面是例子:

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
 
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

//這里自動調用 `connect()`,因為第一個 ‘refCounted’ 的訂閱者
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
    next: (v) => console.log(`observableA: ${v}`)
});

sutTimeout(() => {
    console.log('observerB subscribed');
    subscription2 = refCounted.subscribe({
        next: (v) => console.log(`observableB ${v}`)
    });
}, 600);

setTimeout(() => {
    console.log('observerA unsubscribed');
    subscription1.unsubscribe();
}, 1200);

//這里公共的 Observable 的執行將會停止,因為 'refCounted' 在這之后沒有訂閱者了
setTimeout(() => {
    console.log('observerB unsubscribed');
    subscription2.unsubscribe();
}, 2000);

//Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCounted() 方法只存在於 ConnectableObservable 對象中,並且它返回的是一個 Observable 而不是 ConnectableObservable

行為主題(BehaviorSubject)

有一個 Subjects 的變體就是 BehaviorSubject,它有一個 “當前值” 的概念。它會存儲最近的發送給消費者的一個值,無論這個新的觀察者是否訂閱,它都將會立即從 BehaviorSubject 接收這個 “當前值”。

BehaviorSubject 對於表示 “過程值(values over time)” 是很有用的。例如一個表示生日的事件流是一個 Subject,那么這個人的年齡的流將是一個 BehaviorSubject

看下面的例子,BehaviorSubject 初始化為 0,它在第一個觀察者接收這個值的時候開始訂閱。第二個觀察者接收值 2,即使它在這個值 2 發送之后被訂閱的。

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0);

subject.subject({
    next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: v => console.log(`observerB: ${v}`)
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

重播主題(ReplaySubject)

應答主題很像 BehaviorSubject,它能發送舊的值給新的訂閱者,但是它也能記錄部分 Observable 的執行。

ReplaySubject 從 Observable 的執行中記錄多個值並且重新把這些值發送給新的訂閱者

當創建一個 ReplaySubject 時,你可以指定這些如何重播:

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 緩沖 3 個值給新的訂閱者

subject.subscribe({
    next: v => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: v => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

你也可以緩存大小里指定一個窗口時間,來確定記錄那些值的年齡。在下面的代碼中,我們使用 100 大小的緩沖,但是窗口時間參數是 500 毫秒。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* 窗口時間 */);

subject.subscribe({
    next: v => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
    subject.subscribe({
        next: v => console.log(`observerB: ${v}`)
    })
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

異步主題(AsyncSubject)

AsyncSubject 是一個變體,它只會發送 Observable 的執行的最后一個值給觀察者們,並且只當執行完成的時候。

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`)
})

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

AsyncSubject 跟 last() 操作符相似,它等待 complete 通知以便於發送一個值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM