RxJS 合并数据流 操作符学习


1. 合并类操作符 (Join Creation Operators)

  • combineLatest :合并最后一个数据

合并多个Observable创建一个Observable,其值是根据其每个输入Observable的最新值计算得出的。

import { combineLatest, of ,timer} from 'rxjs';
import { delay, startWith , map} from 'rxjs/operators';

// combineLatest

// 合并两个timer Observables
const firstTimer = timer(0,1000);
const secondTimer = timer(500,1000);

const combinedTimers = combineLatest(firstTimer,secondTimer);
combinedTimers.subscribe(value => console.log(`合并两个timer${value}`))

// 合并多个数组

const observables = [1, 5, 10].map(
    n => of(n).pipe(
      delay(n * 1000),   // emit 0 and then emit n after n seconds
      startWith(0),
    )
  );
  const combined = combineLatest(observables);
  combined.subscribe(value => console.log(value));


// 计算体脂
const weight = of(70,72,76,79,75);
const height = of(1.76, 1.77, 1.78);
const bmi = combineLatest(weight,height).pipe(
    map(([w,h]) => w / (h*h))
);

bmi.subscribe(x => console.log("BMI is "+ x));
  • concat :⾸尾相连

把多个Observable中的数据内容依次合并

    import {concat, interval, range} from 'rxjs';
    import {take} from 'rxjs/operators';

    // 1.合并一个timer observable 和 一个 1到10的数据
    const timer = interval(1000).pipe(take(4));
    const sequence = range(1,10);
    const result = concat(timer,sequence);
    result.subscribe(
        x=>console.log(x)
    )
    // 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

    // 2.合并三个 Observables

    const timer1 = interval(1000).pipe(take(10));
    const timer2 = interval(2000).pipe(take(6));
    const timer3 = interval(500).pipe(take(10));

    const result1 = concat(timer1,timer2,timer3);
    result1.subscribe(x=>console.log(x));

    // 先是每隔一秒输出0,1,2,3,... 9, 然后每隔2秒输出 0...5,然后每隔.5s 输出 0....9

    // 3.合并两个相同的 Observables
    const timer4 = interval(1000).pipe(take(2));

    concat(timer4, timer4)
    .subscribe(
        value => console.log(value),
        err => {},
        () => console.log('...and it is done!')
    )

    // 每隔一秒分别输出0,1,0,1 ...and it is done! 一共4秒
  • forkJoin:forkJoin就是RxJS界的Promise.all

forkJoin等待所有输⼊的Observable对象完结之后把最后⼀个数据合并。

    import { forkJoin, of, timer } from 'rxjs';

    //  字典,对象
    const observable = forkJoin(
        {
            foo:of(1,2,3,4),
            bar: Promise.resolve(8),
            baz:timer(4000)
        }
    )
    observable.subscribe({
        next: value => console.log(value),
        complete: () => console.log('This is how it ends!'),
    });

    // {foo: 4, bar: 8, baz: 0}
    // This is how it ends!

    //  数组
    const observable1 = forkJoin([
    of(1, 2, 3, 4),
    Promise.resolve(8),
    timer(4000),
    ]);
    observable1.subscribe({
    next: value => console.log(value),
    complete: () => console.log('This is how it ends!'),
    });
    //   [4, 8, 0]
    //   This is how it ends!
  • merge:先到先得快速通过

    通过将多个Observable的值混合到一个Observable中,将其平展在一起。

    import {merge, fromEvent, interval} from 'rxjs';
    
    //  1 interval 和 click 
    const clicks = fromEvent(document,'click');
    const timer = interval(1000);
    
    const clicksOrTimer = merge(clicks, timer);
    clicksOrTimer.subscribe(x => console.log(x));
    
    //  控制台每隔一秒输出0 ...., 如果有click,输出click
    
    //  2. 合并三个数据流,但是有两个立马执行。
    // timer1和timer2 立刻运行,timer3等待timer1和timer2结束之后运行
    const timer1 = interval(1000).pipe(take(10));
    const timer2 = interval(2000).pipe(take(6));
    const timer3 = interval(500).pipe(take(10));
    const concurrent = 2; 
    const merged = merge(timer1, timer2, timer3, concurrent);
    merged.subscribe(x => console.log(x))
    
  • race: 胜者通吃

第⼀个吐出数据的Observable对象就是胜者,race产⽣的Observable就会完全采⽤胜者Observable对象的数据,其余的输⼊Observable对象则会被退订⽽抛弃。

  import { race, interval } from 'rxjs';
  import { mapTo } from 'rxjs/operators';
  
  const obs1 = interval(1000).pipe(mapTo('fast one'));
  const obs2 = interval(3000).pipe(mapTo('medium one'));
  const obs3 = interval(5000).pipe(mapTo('slow one'));
  
  race(obs3, obs1, obs2)
  .subscribe(
  winner => console.log(winner)
  );

  // fast one
  • zip: 拉链式组合

组合多个Observable来创建一个Observable,其值是根据其每个输入Observable的值依次计算得出的

  import { zip, of } from 'rxjs';
  import { map } from 'rxjs/operators';
  
  let age$ = of(27, 25, 29);
  let name$ = of('Foo', 'Bar', 'Beer');
  let isDev$ = of(true, true, false);
  
  zip(age$, name$, isDev$).pipe(
  map(([age, name, isDev]) => ({ age, name, isDev })),
  )
  .subscribe(x => console.log(x));
  // {age: 27, name: "Foo", isDev: true}
  // bundle.js:15 {age: 25, name: "Bar", isDev: true}
  // bundle.js:15 {age: 29, name: "Beer", isDev: false}

2. 高阶操作符 (Higher Order Observable)

⾼阶Observable的本质是⽤管理数据的⽅式来管理多个Observable对象.

1. ⾼阶Observable的理解

普通Observable通常发出字符串或数值之类的普通值,但是很多时候需要处理可观察的可观察对象,所谓的高阶Observable。

    //  高阶observable
    import {interval} from 'rxjs';

    const ho$ = interval(1000).take(2).map(x=>interval(1500).map(y=>`${x}:${y}`).take(2))

    //⾼阶Observable打开了⼀扇⼤门,⽤Observable来管理多个Observable对象。

 
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
    map(ev => interval(1000).pipe(take(4))),
    );

2. 操作⾼阶Observable的合并类操作符

  • concatAll
import { fromEvent, interval } from 'rxjs';
import { map, take, concatAll } from 'rxjs/operators';
 
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(ev => interval(1000).pipe(take(4))),
);
const firstOrder = higherOrder.pipe(concatAll());
firstOrder.subscribe(x => console.log(x));
// 每次点击会输出0~3
  • mergeAll
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(map((ev) => interval(1000)));
const firstOrder = higherOrder.pipe(mergeAll());
firstOrder.subscribe(x => console.log(x));
  • zipAll
  • combineAll

3. 进化的⾼阶Observable处理

  • 1.switch:切换输⼊Observab
import { fromEvent, interval } from 'rxjs';
import { switchAll, map, tap } from 'rxjs/operators';
 
const clicks = fromEvent(document, 'click').pipe(tap(() => console.log('click')));
const source = clicks.pipe(map((ev) => interval(1000)));
 
source.pipe(
  switchAll()
).subscribe(x => console.log(x));
 
 /* Output
 click
 1
 2
 3
 4
 ...
 click
 1
 2
 3
 ...
 click
 ...
 */
  • 2.exhau
import { fromEvent, interval } from 'rxjs';
import { exhaust, map, take } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map((ev) => interval(1000).pipe(take(5))),
);
const result = higherOrder.pipe(exhaust());
result.subscribe(x => console.log(x));

/**
 * click
 * 0
 * 1
 * 2
 * 3
 * click
 * ...
 * / 
 *
 无论点击多少次,执行一个

小结

学习了RxJS中合并多个Observable对象的⽅法,了解了高阶Observable。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM