使用 Rx 中預定義的 Subject


看到一幅有趣的關於 Rx 學習的圖,想知道學習 Rx 的學習曲線?不,是峭壁!

我們可以直接通過 Rx 的 Observer 來創建 Observable 對象

但是,使用這種方式往往比較復雜,在特定的場景下,我們可以直接使用 Rx 提供的特定 Subject 來實現 Observable。這些特定的 Subject 是主題和訂閱者的混合體,我們可以直接使用這樣的一個對象來實現信息的發布和數據流的訂閱。

1. Subject

通用的 Subject,既可以被訂閱,從名字也可以看到它本身就是一個主題,所以可以直接用來發布信息。如果需要實現一個普通的主題,它就是最理想的選擇。

使用方式:

發布信息的方法:

onNext( value )

發布一個新的值到數據流中。

onCompleted()

數據流終止。

onError( error )

發布異常。

使用示例:

var subject = new Rx.Subject();

var subscription = subject.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

subject.next(42);

// => Next: 42

subject.next(56);

// => Next: 56

subject.completed();

// => Completed

 

See Also: 

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/subject.md

2. AsyncSubject

緩存直到 completed() 的最后一個值。所有的訂閱者都會收到同樣的最后一個值。

注意只能有一個值,在 completed() 之后,將不能再發布新的值。而所有的訂閱者也只能得到最后一個值。

使用方式:

必須使用 completed() 完成流,訂閱者將會在 completed() 之后才能得到最后一個值。

使用示例:

var subject = new Rx.AsyncSubject();

var i = 0;
var handle = setInterval(function () {
    subject.onNext(i);
    if (++i > 3) {
        subject.onCompleted();
        clearInterval(handle);
    }
}, 500);

var subscription = subject.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 3
// => Completed

圖例

See also: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md

3. BehaviorSubject

如果是希望訂閱者獲得當前的最后一個值,但是后面可能還會提供新的值,可以考慮這個。

緩存已經發布的最后數據,新的訂閱者可以接收到最后一個已經發布的值,和以后發布的新的值。

它可以直接設置一個初始值。如果不需要初始值,可以考慮使用 ReplaySubject.

使用方式:

BehaviorSubject(initialValue)

在構造函數中提供初始的值。

getValue()

獲取當前的值,或者拋出異常,在調用了 completed() 之后,最后的值被保留。在 error() 之后,總是拋出一個特定的異常。

使用示例

/* Initialize with initial value of 42 */
var subject = new Rx.BehaviorSubject(42);

var subscription = subject.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 42

subject.next(56);
// => Next: 56

subject.completed();
// => Completed

圖例

 

See also: 

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md

4. ReplaySubject

可以用來緩存流中最后 n 個數據,在新的觀察者登記之后,這些緩存的數據直接發布給新的觀察者。

使用說明:

在構造 ReplaySubject 對象的時候,配置緩存的數據元素數量以及時間窗口。

ReplaySubject([bufferSize], [windowSize], [scheduler])

使用特定的緩存大小,時間窗口和調度器來創建 ReplaySubject 對象實例.

Arguments

  1. [bufferSize = Number.MAX_VALUE] (Number): Maximum element count of the replay buffer.
  2. [windowSize = NUMBER.MAX_VALUE] (Number): Maximum time length of the replay buffer.
  3. [scheduler = Rx.Scheduler.currentThread] (Scheduler): Scheduler the observers are invoked on.

使用示例

var subject = new Rx.ReplaySubject(2 /* buffer size */);

subject.next('a');
subject.next('b');
subject.next('c');

var subscription = subject.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: b
// => Next: c

subject.next('d');
// => Next: d

 

圖例

 

See Also: 

http://reactivex.io/rxjs/manual/overview.html#subject

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM