RxJS 合並數據流 操作符學習


1. 合並類操作符 (Join Creation Operators)

  • combineLatest :合並最后一個數據

合並多個Observable創建一個Observable,其值是根據其每個輸入Observable的最新值計算得出的。

import { combineLatest, of ,timer} from 'rxjs';
import { delay, startWith , map} from 'rxjs/operators';

// combineLatest

// 合並兩個timer Observables
const firstTimer = timer(0,1000);
const secondTimer = timer(500,1000);

const combinedTimers = combineLatest(firstTimer,secondTimer);
combinedTimers.subscribe(value => console.log(`合並兩個timer${value}`))

// 合並多個數組

const observables = [1, 5, 10].map(
    n => of(n).pipe(
      delay(n * 1000),   // emit 0 and then emit n after n seconds
      startWith(0),
    )
  );
  const combined = combineLatest(observables);
  combined.subscribe(value => console.log(value));


// 計算體脂
const weight = of(70,72,76,79,75);
const height = of(1.76, 1.77, 1.78);
const bmi = combineLatest(weight,height).pipe(
    map(([w,h]) => w / (h*h))
);

bmi.subscribe(x => console.log("BMI is "+ x));
  • concat :⾸尾相連

把多個Observable中的數據內容依次合並

    import {concat, interval, range} from 'rxjs';
    import {take} from 'rxjs/operators';

    // 1.合並一個timer observable 和 一個 1到10的數據
    const timer = interval(1000).pipe(take(4));
    const sequence = range(1,10);
    const result = concat(timer,sequence);
    result.subscribe(
        x=>console.log(x)
    )
    // 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

    // 2.合並三個 Observables

    const timer1 = interval(1000).pipe(take(10));
    const timer2 = interval(2000).pipe(take(6));
    const timer3 = interval(500).pipe(take(10));

    const result1 = concat(timer1,timer2,timer3);
    result1.subscribe(x=>console.log(x));

    // 先是每隔一秒輸出0,1,2,3,... 9, 然后每隔2秒輸出 0...5,然后每隔.5s 輸出 0....9

    // 3.合並兩個相同的 Observables
    const timer4 = interval(1000).pipe(take(2));

    concat(timer4, timer4)
    .subscribe(
        value => console.log(value),
        err => {},
        () => console.log('...and it is done!')
    )

    // 每隔一秒分別輸出0,1,0,1 ...and it is done! 一共4秒
  • forkJoin:forkJoin就是RxJS界的Promise.all

forkJoin等待所有輸⼊的Observable對象完結之后把最后⼀個數據合並。

    import { forkJoin, of, timer } from 'rxjs';

    //  字典,對象
    const observable = forkJoin(
        {
            foo:of(1,2,3,4),
            bar: Promise.resolve(8),
            baz:timer(4000)
        }
    )
    observable.subscribe({
        next: value => console.log(value),
        complete: () => console.log('This is how it ends!'),
    });

    // {foo: 4, bar: 8, baz: 0}
    // This is how it ends!

    //  數組
    const observable1 = forkJoin([
    of(1, 2, 3, 4),
    Promise.resolve(8),
    timer(4000),
    ]);
    observable1.subscribe({
    next: value => console.log(value),
    complete: () => console.log('This is how it ends!'),
    });
    //   [4, 8, 0]
    //   This is how it ends!
  • merge:先到先得快速通過

    通過將多個Observable的值混合到一個Observable中,將其平展在一起。

    import {merge, fromEvent, interval} from 'rxjs';
    
    //  1 interval 和 click 
    const clicks = fromEvent(document,'click');
    const timer = interval(1000);
    
    const clicksOrTimer = merge(clicks, timer);
    clicksOrTimer.subscribe(x => console.log(x));
    
    //  控制台每隔一秒輸出0 ...., 如果有click,輸出click
    
    //  2. 合並三個數據流,但是有兩個立馬執行。
    // timer1和timer2 立刻運行,timer3等待timer1和timer2結束之后運行
    const timer1 = interval(1000).pipe(take(10));
    const timer2 = interval(2000).pipe(take(6));
    const timer3 = interval(500).pipe(take(10));
    const concurrent = 2; 
    const merged = merge(timer1, timer2, timer3, concurrent);
    merged.subscribe(x => console.log(x))
    
  • race: 勝者通吃

第⼀個吐出數據的Observable對象就是勝者,race產⽣的Observable就會完全采⽤勝者Observable對象的數據,其余的輸⼊Observable對象則會被退訂⽽拋棄。

  import { race, interval } from 'rxjs';
  import { mapTo } from 'rxjs/operators';
  
  const obs1 = interval(1000).pipe(mapTo('fast one'));
  const obs2 = interval(3000).pipe(mapTo('medium one'));
  const obs3 = interval(5000).pipe(mapTo('slow one'));
  
  race(obs3, obs1, obs2)
  .subscribe(
  winner => console.log(winner)
  );

  // fast one
  • zip: 拉鏈式組合

組合多個Observable來創建一個Observable,其值是根據其每個輸入Observable的值依次計算得出的

  import { zip, of } from 'rxjs';
  import { map } from 'rxjs/operators';
  
  let age$ = of(27, 25, 29);
  let name$ = of('Foo', 'Bar', 'Beer');
  let isDev$ = of(true, true, false);
  
  zip(age$, name$, isDev$).pipe(
  map(([age, name, isDev]) => ({ age, name, isDev })),
  )
  .subscribe(x => console.log(x));
  // {age: 27, name: "Foo", isDev: true}
  // bundle.js:15 {age: 25, name: "Bar", isDev: true}
  // bundle.js:15 {age: 29, name: "Beer", isDev: false}

2. 高階操作符 (Higher Order Observable)

⾼階Observable的本質是⽤管理數據的⽅式來管理多個Observable對象.

1. ⾼階Observable的理解

普通Observable通常發出字符串或數值之類的普通值,但是很多時候需要處理可觀察的可觀察對象,所謂的高階Observable。

    //  高階observable
    import {interval} from 'rxjs';

    const ho$ = interval(1000).take(2).map(x=>interval(1500).map(y=>`${x}:${y}`).take(2))

    //⾼階Observable打開了⼀扇⼤門,⽤Observable來管理多個Observable對象。

 
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
    map(ev => interval(1000).pipe(take(4))),
    );

2. 操作⾼階Observable的合並類操作符

  • concatAll
import { fromEvent, interval } from 'rxjs';
import { map, take, concatAll } from 'rxjs/operators';
 
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(ev => interval(1000).pipe(take(4))),
);
const firstOrder = higherOrder.pipe(concatAll());
firstOrder.subscribe(x => console.log(x));
// 每次點擊會輸出0~3
  • mergeAll
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(map((ev) => interval(1000)));
const firstOrder = higherOrder.pipe(mergeAll());
firstOrder.subscribe(x => console.log(x));
  • zipAll
  • combineAll

3. 進化的⾼階Observable處理

  • 1.switch:切換輸⼊Observab
import { fromEvent, interval } from 'rxjs';
import { switchAll, map, tap } from 'rxjs/operators';
 
const clicks = fromEvent(document, 'click').pipe(tap(() => console.log('click')));
const source = clicks.pipe(map((ev) => interval(1000)));
 
source.pipe(
  switchAll()
).subscribe(x => console.log(x));
 
 /* Output
 click
 1
 2
 3
 4
 ...
 click
 1
 2
 3
 ...
 click
 ...
 */
  • 2.exhau
import { fromEvent, interval } from 'rxjs';
import { exhaust, map, take } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map((ev) => interval(1000).pipe(take(5))),
);
const result = higherOrder.pipe(exhaust());
result.subscribe(x => console.log(x));

/**
 * click
 * 0
 * 1
 * 2
 * 3
 * click
 * ...
 * / 
 *
 無論點擊多少次,執行一個

小結

學習了RxJS中合並多個Observable對象的⽅法,了解了高階Observable。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM