歡迎指導與討論 :)
前言
本文是筆者翻譯 RxJS 5.X 官網各類operation操作系列的的第三篇 —— Combination組合與Multicasting廣播。如有錯漏,希望大家指出提醒O(∩_∩)O。更詳細的資料盡在rxjs官網 【http://reactivex.io/rxjs/manual/overview.htm】與帶有demo例子的網站【http://xgrommx.github.io/rx-book/content/observable】。
本文有關Combination操作的內容有:combineAll、combineLastest、concat、cancatAll、exhaust、forkJoin、merger、mergeAll、race、startWith、switch、withLastestFrom、zip、zipAll
有關Multicasting操作的內容有:cache、multicast、publish、publishBehavior、publishLast、publishReplay、share
一、combineAll
將高階Observable轉化為一階。當外層Observable結束時,對每個內層Observable使用combineLastest,並最終以數組項的形式返回每個內層Observable的最新值。
// 點擊三次后,外層Observable結束 // 然后對三個內層Observable使用combineLastest Rx.Observable.fromEvent( document, 'click') .map( ev => Rx.Observable.of( 1, 2, 3 )) .take( 3 ) .combineAll( ) .subscribe( x => console.log( x )); // 輸出 [ 3, 3, 1 ], [ 3, 3, 2 ], [ 3, 3, 3]
二、combineLastest
每當其中一個Observable發射值時,都會以數組的形式發射兩個Observable的最新值。
Rx.Observable.interval( 1000 ) .combineLatest( Rx.Observable.timer( 1000 , 2000 )) .subscribe( x => console.log( x )); // 輸出 [ 0, 0 ], [ 1, 0 ], [ 2, 1 ], [ 3, 2 ]...
三、concat
鏈式拼接兩個Observable的結果
Rx.Observable.of( 1, 2, 3 ) .concat( 'a', 'b', 'c' ) .subscribe( x => console.log( x )); // 輸出:1,2,3,a,b,c
四、cancatAll
將高階Observable轉化為一階。並將結果以鏈式拼接的形式進行發射。
Rx.Observable.fromEvent( document, 'click') .mapTo( Rx.Observable.interval( 1000 ).take( 3 )) .concatAll( ) .subscribe( x => console.log( x )); // 點擊三次,則輸出:0 1 2, 0 1 2, 0 1 2
五、exhaust
將高階Observable轉化為一階。並將結果以鏈式拼接的形式進行發射。但是,當前一個內層Observable的值還沒有發射完畢時,不會接受下一個內層Observable,並不會把它的值拼接到外層Observable中。
Rx.Observable.fromEvent( document, 'click') .mapTo( Rx.Observable.interval( 1000 ).take( 3 )) .exhaust( ) .subscribe( x => console.log( x )); // 在值被發射時,點擊不會產生新的Observable並進行值的拼接
六、forkJoin
將多個Observable進行並行計算,並數組的形式返回它們各自的最新值
Rx.Observable.forkJoin( Rx.Observable.of(42), Rx.Observable.range(0, 10), Rx.Observable.of(1,2,3) ) .subscribe( x => console.log( x )) // 輸出: [ 42, 10, 3 ]
七、merger
將兩個Observable進行合並,每當其中一個Observable發射值時,都會被外層Observer所收到。
Rx.Observable.fromEvent( document, 'click' ) .merge( Rx.Observable.interval( 1000 )) .subscribe( x => console.log( x )); // 不點擊的情況下,每秒輸出i, i為從零到n // 點擊一下,馬上輸出 $mouseEvent
八、mergeAll
將多個Observable進行合並,每當其中一個Observable發射值時,都會被外層Observer所收到。
Rx.Observable.fromEvent( document, 'click' ) .mapTo( Rx.Observable.interval( 1000 )) .mergeAll( ) .subscribe( x => console.log( x )); // 每次點擊,新增一個計時器,並把它合並到同一個流中
九、race —— 暫無
十、startWith
在源Observable開頭插入一個指定的值
Rx.Observable.interval( 1000 ) .startWith( 123 ) .subscribe( x => console.log( x )); // 輸出:123, 1, 2, 3,....
十一、switch
將高階Observable轉化為一階Observable。當生成新的內層Observable時,外層流會丟棄上一個內層Observable,並發射新的內層Observable的一系列的值,並重復上述過程。既有,重新開始的意思。
Rx.Observable.fromEvent( document, 'click' ) .mapTo( Rx.Observable.interval( 1000 )) .switch( ) .subscribe( x => console.log( x )); // 每次點擊會重新輸出:0,1,2,3,4,5....
十一、withLastestFrom
每當源Observable發射新的值時,會以數組的形式,把源Observable和另一個Observable的最新值進行組合並發射。
Rx.Observable.fromEvent( document, 'click' ) .withLatestFrom( Rx.Observable.interval( 1000 )) .subscribe( x => console.log( x )); // 每次點擊輸出: [ MouseEvent, x ], x是計時器的最新值
十二、zip
組合多個Observable,並生成一個新的Observable,其值能夠通過每個Observable的值,和自定義函數進行定義。
let age$ = Rx.Observable.of<number>(27, 25, 29); let name$ = Rx.Observable.of<string>('Foo', 'Bar', 'Beer'); let isDev$ = Rx.Observable.of<boolean>(true, true, false); Rx.Observable .zip(age$, name$, isDev$, (age: number, name: string, isDev: boolean) => ({ age, name, isDev })) .subscribe(x => console.log(x)); // 輸出 // {age: 27. name: 'Foo', isDev: true } // {age: 25. name: 'Bar', isDev: true } // {age: 29. name: 'Bear', isDev: false}
十三、zipAll —— 暫無
十四、cache —— 暫無
十五、multicast
返回一個ConnectableObservable。每一個訂閱了同一個Observable的observer,實際上是擁有不同的、獨立的Observable的執行( 原文:each subscribed Observer owns an independent execution of the Observable ),而Subject是多播的。
var source = Rx.Observable.create((o)=>{ o.next(1);o.next(2); }); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 原理是`subject.subscribe({...})`:返回的multicasted是一個connectableObservable
multicasted.subscribe({next: (v) => console.log('observerA: ' + v)}); multicasted.subscribe({next: (v) => console.log('observerB: ' + v)}); // 原理是 `source.subscribe(subject)`:
multicasted.connect(); // observerA: 1 observerB: 1 // observerA: 2 observerB: 2
十六、publish
返回一個ConnectableObservable。可進行廣播
var a$ = Rx.Observable.interval( 1000 ) .publish( ) a$.subscribe({next: (v) => console.log( v )}); a$.subscribe({next: (v) => console.log( v )}); a$.connect(); // 同時輸出 1 1, 2 2, 3 3...
十七、publishBehavior —— 暫無
十八、publishLast
返回一個ConnectableObservable。且只廣播該Observable的最后一個值
var a$ = Rx.Observable.of( 1, 3, 4, 5) .publishLast( ) a$.subscribe({next: (v) => console.log('observerA: ' + v)}); a$.subscribe({next: (v) => console.log('observerB: ' + v)}); a$.connect( ) // 輸出 // observerA: 5 // observerB: 5
十九、publishReplay
返回一個ConnectableObservable。且當第二個及以后才訂閱這個ConnectableObservable時,只會受到最新的n個值,這個n由我們以參數的形式提供
var a$ = Rx.Observable.of(1,2,3,4,5) .publishReplay( 3 ) .refCount( ) a$.subscribe({next: (v) => console.log('observerA: ' + v)}); a$.subscribe({next: (v) => console.log('observerB: ' + v)}); a$.subscribe({next: (v) => console.log('observerC: ' + v)}); // 輸出 // observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 // observerB: 3 observerB: 4 observerB: 5 // observerC: 3 observerC: 4 observerC: 5
二十、share
返回一個可被共享的Observable。它是 .publish( ).refCount( )的另一種寫法
var a$ = Rx.Observable.interval( 1000 ) .share( ) a$.subscribe({next: (v) => console.log( v )}); a$.subscribe({next: (v) => console.log( v )}); // 同時輸出 1 1, 2 2, 3 3...