1. 合并类操作符 (Join Creation Operators)
- combineLatest :合并最后一个数据
合并多个Observable创建一个Observable,其值是根据其每个输入Observable的最新值计算得出的。
import { combineLatest, of ,timer} from 'rxjs';
import { delay, startWith , map} from 'rxjs/operators';
// combineLatest
// 合并两个timer Observables
const firstTimer = timer(0,1000);
const secondTimer = timer(500,1000);
const combinedTimers = combineLatest(firstTimer,secondTimer);
combinedTimers.subscribe(value => console.log(`合并两个timer${value}`))
// 合并多个数组
const observables = [1, 5, 10].map(
n => of(n).pipe(
delay(n * 1000), // emit 0 and then emit n after n seconds
startWith(0),
)
);
const combined = combineLatest(observables);
combined.subscribe(value => console.log(value));
// 计算体脂
const weight = of(70,72,76,79,75);
const height = of(1.76, 1.77, 1.78);
const bmi = combineLatest(weight,height).pipe(
map(([w,h]) => w / (h*h))
);
bmi.subscribe(x => console.log("BMI is "+ x));
- concat :⾸尾相连
把多个Observable中的数据内容依次合并
import {concat, interval, range} from 'rxjs';
import {take} from 'rxjs/operators';
// 1.合并一个timer observable 和 一个 1到10的数据
const timer = interval(1000).pipe(take(4));
const sequence = range(1,10);
const result = concat(timer,sequence);
result.subscribe(
x=>console.log(x)
)
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
// 2.合并三个 Observables
const timer1 = interval(1000).pipe(take(10));
const timer2 = interval(2000).pipe(take(6));
const timer3 = interval(500).pipe(take(10));
const result1 = concat(timer1,timer2,timer3);
result1.subscribe(x=>console.log(x));
// 先是每隔一秒输出0,1,2,3,... 9, 然后每隔2秒输出 0...5,然后每隔.5s 输出 0....9
// 3.合并两个相同的 Observables
const timer4 = interval(1000).pipe(take(2));
concat(timer4, timer4)
.subscribe(
value => console.log(value),
err => {},
() => console.log('...and it is done!')
)
// 每隔一秒分别输出0,1,0,1 ...and it is done! 一共4秒
- forkJoin:forkJoin就是RxJS界的Promise.all
forkJoin等待所有输⼊的Observable对象完结之后把最后⼀个数据合并。
import { forkJoin, of, timer } from 'rxjs';
// 字典,对象
const observable = forkJoin(
{
foo:of(1,2,3,4),
bar: Promise.resolve(8),
baz:timer(4000)
}
)
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// {foo: 4, bar: 8, baz: 0}
// This is how it ends!
// 数组
const observable1 = forkJoin([
of(1, 2, 3, 4),
Promise.resolve(8),
timer(4000),
]);
observable1.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// [4, 8, 0]
// This is how it ends!
- merge:先到先得快速通过
通过将多个Observable的值混合到一个Observable中,将其平展在一起。
import {merge, fromEvent, interval} from 'rxjs'; // 1 interval 和 click const clicks = fromEvent(document,'click'); const timer = interval(1000); const clicksOrTimer = merge(clicks, timer); clicksOrTimer.subscribe(x => console.log(x)); // 控制台每隔一秒输出0 ...., 如果有click,输出click // 2. 合并三个数据流,但是有两个立马执行。 // timer1和timer2 立刻运行,timer3等待timer1和timer2结束之后运行 const timer1 = interval(1000).pipe(take(10)); const timer2 = interval(2000).pipe(take(6)); const timer3 = interval(500).pipe(take(10)); const concurrent = 2; const merged = merge(timer1, timer2, timer3, concurrent); merged.subscribe(x => console.log(x))
- race: 胜者通吃
第⼀个吐出数据的Observable对象就是胜者,race产⽣的Observable就会完全采⽤胜者Observable对象的数据,其余的输⼊Observable对象则会被退订⽽抛弃。
import { race, interval } from 'rxjs';
import { mapTo } from 'rxjs/operators';
const obs1 = interval(1000).pipe(mapTo('fast one'));
const obs2 = interval(3000).pipe(mapTo('medium one'));
const obs3 = interval(5000).pipe(mapTo('slow one'));
race(obs3, obs1, obs2)
.subscribe(
winner => console.log(winner)
);
// fast one
- zip: 拉链式组合
组合多个Observable来创建一个Observable,其值是根据其每个输入Observable的值依次计算得出的
import { zip, of } from 'rxjs';
import { map } from 'rxjs/operators';
let age$ = of(27, 25, 29);
let name$ = of('Foo', 'Bar', 'Beer');
let isDev$ = of(true, true, false);
zip(age$, name$, isDev$).pipe(
map(([age, name, isDev]) => ({ age, name, isDev })),
)
.subscribe(x => console.log(x));
// {age: 27, name: "Foo", isDev: true}
// bundle.js:15 {age: 25, name: "Bar", isDev: true}
// bundle.js:15 {age: 29, name: "Beer", isDev: false}
2. 高阶操作符 (Higher Order Observable)
⾼阶Observable的本质是⽤管理数据的⽅式来管理多个Observable对象.
1. ⾼阶Observable的理解
普通Observable通常发出字符串或数值之类的普通值,但是很多时候需要处理可观察的可观察对象,所谓的高阶Observable。
// 高阶observable
import {interval} from 'rxjs';
const ho$ = interval(1000).take(2).map(x=>interval(1500).map(y=>`${x}:${y}`).take(2))
//⾼阶Observable打开了⼀扇⼤门,⽤Observable来管理多个Observable对象。
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(ev => interval(1000).pipe(take(4))),
);
2. 操作⾼阶Observable的合并类操作符
- concatAll
import { fromEvent, interval } from 'rxjs';
import { map, take, concatAll } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(ev => interval(1000).pipe(take(4))),
);
const firstOrder = higherOrder.pipe(concatAll());
firstOrder.subscribe(x => console.log(x));
// 每次点击会输出0~3
- mergeAll
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(map((ev) => interval(1000)));
const firstOrder = higherOrder.pipe(mergeAll());
firstOrder.subscribe(x => console.log(x));
- zipAll
- combineAll
3. 进化的⾼阶Observable处理
- 1.switch:切换输⼊Observab
import { fromEvent, interval } from 'rxjs';
import { switchAll, map, tap } from 'rxjs/operators';
const clicks = fromEvent(document, 'click').pipe(tap(() => console.log('click')));
const source = clicks.pipe(map((ev) => interval(1000)));
source.pipe(
switchAll()
).subscribe(x => console.log(x));
/* Output
click
1
2
3
4
...
click
1
2
3
...
click
...
*/
- 2.exhau
import { fromEvent, interval } from 'rxjs';
import { exhaust, map, take } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map((ev) => interval(1000).pipe(take(5))),
);
const result = higherOrder.pipe(exhaust());
result.subscribe(x => console.log(x));
/**
* click
* 0
* 1
* 2
* 3
* click
* ...
* /
*
无论点击多少次,执行一个
小结
学习了RxJS中合并多个Observable对象的⽅法,了解了高阶Observable。