rxjs合並數據流操作符


一、concat首尾相連

工作方式:

當第一個Observable對象complete之后,concat就會去subscribe第二個Observable對象獲取數據,把同樣的數據傳給下游。

直到最后一個Observable完結之后,concat產生的Observable也就完結了。

import { of,concat } from 'rxjs';
...  
  const source1$ = of(1,2,3)
  const source2$ = of(4,5,6)

  const source$ = concat(source1$,source2$)

  source$.subscribe(
    console.log,
    null,
    ()=>console.log('complete')
  )
...

輸出結果

 

 二、merge:先到先得快速通過

工作方式:

第一個Observable對象不完結,並不影響下游的observable對象,隨先得到,就先輸出。

當所有的Observable對象完結了,merge才會完結自己的Observable對象。

import { timer, merge } from 'rxjs';
import 'rxjs/add/operator/map'

...
  // 從第0毫秒開始,每隔1000毫秒產生一個數據,依次是:0A、1A、2A....
  const source1$ =  timer(0,1000).map( x=> x + 'A')

 // 從第500毫秒開始,每隔1000毫秒產生一個數據,依次是:0B、1B、2B... const source2$
= timer(500,1000).map( x=> x+'B') const source$ = merge(source1$,source2$) source$.subscribe( console.log, null, ()=>console.log('complete') ) ...

輸出結果前幾個數值如下:

 

 同步限流

merge有一個可選參數,放在最后。表示指定可以同時合並的observable對象個數。

如果前面兩個observable對象沒有完結,第三個Observable對象永遠不會進入source$。

const source1$ =  timer(0,1000).map( x=> x + 'A')
  const source2$ = timer(500,1000).map( x=> x+'B')
  const source3$ = timer(1000,1000).map( x=> x+'C')

  const source$ = merge(source1$,source2$,source3$,2)

merge的應用場景

fromEvent可以從網頁中獲取事件,但是fromEVent一次只能從一個DOM元素獲取一種類型的事件。

如果某個元素的click事件,也同時需要touchend事件,因為移動設備上touchend事件出現得比click更早。

這兩個事件的處理是一模一樣的。

此時需要借助merge的力量。

const source1$ =  fromEvent(document.querySelector('#text'),'click')
const source2$ = fromEvent(document.querySelector('#text'),'touchend')

const source$ = merge(source1$,source2$)

結論:

merge只對產生異步數據的Observable才有意義。

 三、zip:拉鏈式組合

工作方式:

一對一合並,如果source1$吐出數據更快,也會等着source2$數據吐出后一對一配對輸出。

即使source1$還有多余的數據,如果沒有配對數據,將會執行complete完結。

因此會造成數據擠壓問題。

zip也可以多個數據流合並。

  import { of, zip } from 'rxjs';
const source1$ =  of(1,2,3,4,5)
const source2$ = of('a','b','c')
const source$ = zip(source1$,source2$)

輸出結果:

 四、combineLatest合並最后一個數據

實例1:

import { timer, combineLatest } from 'rxjs';
// 從第500毫秒開始,每隔1000毫秒產生數據
const source1$ =  timer(500,1000)
// 從第1000毫秒開始,每隔1000毫秒產生數據 const source2$
= timer(1000,1000) const combineLatest$ = combineLatest(source1$,source2$)

輸出結果:(前幾個結果)

 

彈珠圖:

 

 

實例2:

    const source1$ =  timer(500,1000)
    const source2$ = of('a')

    const combineLatest$ = combineLatest(source1$,source2$)

執行結果:(前幾個結果)

 

 實例3:

    const source1$ =  of('a','b','c')
    const source2$ = of(1,2,3)

    const combineLatest$ = combineLatest(source1$,source2$)

執行結果:

 

 解析:

這是由combineLatest的⼯作⽅式決定的。
combineLatest會順序訂閱所有上游的Observable對象,
只有所有上游Observable對象都已經吐出數據了,
才會給下游傳遞所有上游“最新數據”組合的數據。在上⾯的例⼦中,
 
combineLatest的⼯作步驟如下:
  1)combineLatest訂閱source1$,不過,因為source1$是由of產⽣的同步數據流,在被訂閱時就會吐出所有數據,最后⼀個吐出的數據是字符串c。
  2)combineLatest訂閱source2$。
  3)source2$開始吐出數據,當吐出1時,和source1$的最后⼀個數據c組合傳給下游。
  4)source2$吐出2時,依然和source1$的最后⼀個數據c組合傳給下游。
  5)source2$吐出3時,還是和source1$的最后⼀個數據c組合傳給下游。

 實例4:

    const source1$ =  of('a','b','c')
    const source2$ = of(1,2,3)
    const source3$ = of('x','y')

    const combineLatest$ = combineLatest(source1$,source2$,source3$)

 

 執行結果:

 

 實例5:定制下游數據

zip和combineLatest一樣默認輸出的數據時數組形式,因此zip也和combineLatest一樣默認輸出的數據時數組形式,

同樣也可以利用最后一個函數參數來定制輸出數據的形式。

    const source1$ =  timer(500,1000)
    const source2$ = timer(1000,1000)
    const project = (a,b) => `${a} and ${b}`

    const combineLatest$ = combineLatest(source1$,source2$,project)

執行結果:

 五、withLatestFrom

工作方式:

withLatestFrom產生數據,只是更新兩者的最新數據,

如果兩者其中一個不是最新數據,也不會產生新的數據。

實例1:

import { timer } from 'rxjs';
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/withLatestFrom'

    const source1$ =  timer(0,2000).map( x=> 100 * x )
    const source2$ = timer(500,1000)
    const project = (a,b) => a + b

    const combineLatest$ = source1$.withLatestFrom(source2$,project)

執行結果:(前幾個)

 

 彈珠圖如下所示:

 六、解決glitch問題

實例1:

import { timer } from 'rxjs';
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/withLatestFrom'

    const original$ =  timer(0,1000)
    const source1$ = original$.map( x=> x+'a')
    const source2$ = original$.map( x=> x+'b')

    const combineLatest$ = source1$.withLatestFrom(source2$)

執行結果:(前幾個)

 

 實例2:

import { combineLatest, fromEvent } from 'rxjs';
import 'rxjs/add/operator/map'

    const event$ = fromEvent(document.body,'click');
    const x$ = event$.map(e => e.x);
    const y$ = event$.map(e => e.y);
    const project = (x,y) => `x:${x},y:${y}`;
    const result$ = combineLatest(x$,y$,project);

    result$.subscribe(
      (location) => {
        console.log('#render', location)
        document.querySelector('#text').innerHTML = location;
      }
    )

當點擊三次會出現以下結果:

 

 但是如果用withLatestFrom就可以解決以上問題:

減少不必要的網頁渲染。

const result$ = x$.withLatestFrom(x$,y$,project);

當點擊三次出現以下結果:

 七、race:勝者通吃

誰先吐出第一個數據為勝者,合並產生的數據就是勝者吐出的數據。

import { race, timer } from 'rxjs';
import 'rxjs/add/operator/map'
 
   const source1$ = timer(0,2000).map(e => e + 'a');
    const source2$ = timer(500,1000).map(e => e + 'b');
    const result$ = race(source1$,source2$)

    result$.subscribe(
      console.log,
      null,
      ()=>console.log('complete')
    )

執行結果:

 

 

彈珠圖

 

八、startWith:先吐出指定的若干個數據

實例1:

import { timer } from 'rxjs';
import 'rxjs/add/operator/startWith'

    const orginal$ = timer(0,1000)
    const result$ =  orginal$.startWith('start')

執行結果:

 

 實例2: 

同樣的效果,可以用concat來實現

    const orginal$ = timer(0,1000)
    const result$ = concat(of('start'),orginal$)

差異:

startWith的一點不足時所有參數都是同步吐出的,如果需要異步吐出參數,那還是只能利用concat。

 九、forkJoin:合並所有參數Observable對象的最后一個數據

實例:

import { interval, forkJoin } from 'rxjs';
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/take'    

    const source1$ = interval(1000).map( x=> x+'a' ).take(1);
    const source2$ = interval(1000).map( x => x +'b').take(3);
    const concated$ = forkJoin(source1$,source2$)

執行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM