1. 合並類操作符 (Join Creation Operators)
- combineLatest :合並最后一個數據
合並多個Observable創建一個Observable,其值是根據其每個輸入Observable的最新值計算得出的。
import { combineLatest, of ,timer} from 'rxjs';
import { delay, startWith , map} from 'rxjs/operators';
// combineLatest
// 合並兩個timer Observables
const firstTimer = timer(0,1000);
const secondTimer = timer(500,1000);
const combinedTimers = combineLatest(firstTimer,secondTimer);
combinedTimers.subscribe(value => console.log(`合並兩個timer${value}`))
// 合並多個數組
const observables = [1, 5, 10].map(
n => of(n).pipe(
delay(n * 1000), // emit 0 and then emit n after n seconds
startWith(0),
)
);
const combined = combineLatest(observables);
combined.subscribe(value => console.log(value));
// 計算體脂
const weight = of(70,72,76,79,75);
const height = of(1.76, 1.77, 1.78);
const bmi = combineLatest(weight,height).pipe(
map(([w,h]) => w / (h*h))
);
bmi.subscribe(x => console.log("BMI is "+ x));
- concat :⾸尾相連
把多個Observable中的數據內容依次合並
import {concat, interval, range} from 'rxjs';
import {take} from 'rxjs/operators';
// 1.合並一個timer observable 和 一個 1到10的數據
const timer = interval(1000).pipe(take(4));
const sequence = range(1,10);
const result = concat(timer,sequence);
result.subscribe(
x=>console.log(x)
)
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
// 2.合並三個 Observables
const timer1 = interval(1000).pipe(take(10));
const timer2 = interval(2000).pipe(take(6));
const timer3 = interval(500).pipe(take(10));
const result1 = concat(timer1,timer2,timer3);
result1.subscribe(x=>console.log(x));
// 先是每隔一秒輸出0,1,2,3,... 9, 然后每隔2秒輸出 0...5,然后每隔.5s 輸出 0....9
// 3.合並兩個相同的 Observables
const timer4 = interval(1000).pipe(take(2));
concat(timer4, timer4)
.subscribe(
value => console.log(value),
err => {},
() => console.log('...and it is done!')
)
// 每隔一秒分別輸出0,1,0,1 ...and it is done! 一共4秒
- forkJoin:forkJoin就是RxJS界的Promise.all
forkJoin等待所有輸⼊的Observable對象完結之后把最后⼀個數據合並。
import { forkJoin, of, timer } from 'rxjs';
// 字典,對象
const observable = forkJoin(
{
foo:of(1,2,3,4),
bar: Promise.resolve(8),
baz:timer(4000)
}
)
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// {foo: 4, bar: 8, baz: 0}
// This is how it ends!
// 數組
const observable1 = forkJoin([
of(1, 2, 3, 4),
Promise.resolve(8),
timer(4000),
]);
observable1.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// [4, 8, 0]
// This is how it ends!
- merge:先到先得快速通過
通過將多個Observable的值混合到一個Observable中,將其平展在一起。
import {merge, fromEvent, interval} from 'rxjs'; // 1 interval 和 click const clicks = fromEvent(document,'click'); const timer = interval(1000); const clicksOrTimer = merge(clicks, timer); clicksOrTimer.subscribe(x => console.log(x)); // 控制台每隔一秒輸出0 ...., 如果有click,輸出click // 2. 合並三個數據流,但是有兩個立馬執行。 // timer1和timer2 立刻運行,timer3等待timer1和timer2結束之后運行 const timer1 = interval(1000).pipe(take(10)); const timer2 = interval(2000).pipe(take(6)); const timer3 = interval(500).pipe(take(10)); const concurrent = 2; const merged = merge(timer1, timer2, timer3, concurrent); merged.subscribe(x => console.log(x)) - race: 勝者通吃
第⼀個吐出數據的Observable對象就是勝者,race產⽣的Observable就會完全采⽤勝者Observable對象的數據,其余的輸⼊Observable對象則會被退訂⽽拋棄。
import { race, interval } from 'rxjs';
import { mapTo } from 'rxjs/operators';
const obs1 = interval(1000).pipe(mapTo('fast one'));
const obs2 = interval(3000).pipe(mapTo('medium one'));
const obs3 = interval(5000).pipe(mapTo('slow one'));
race(obs3, obs1, obs2)
.subscribe(
winner => console.log(winner)
);
// fast one
- zip: 拉鏈式組合
組合多個Observable來創建一個Observable,其值是根據其每個輸入Observable的值依次計算得出的
import { zip, of } from 'rxjs';
import { map } from 'rxjs/operators';
let age$ = of(27, 25, 29);
let name$ = of('Foo', 'Bar', 'Beer');
let isDev$ = of(true, true, false);
zip(age$, name$, isDev$).pipe(
map(([age, name, isDev]) => ({ age, name, isDev })),
)
.subscribe(x => console.log(x));
// {age: 27, name: "Foo", isDev: true}
// bundle.js:15 {age: 25, name: "Bar", isDev: true}
// bundle.js:15 {age: 29, name: "Beer", isDev: false}
2. 高階操作符 (Higher Order Observable)
⾼階Observable的本質是⽤管理數據的⽅式來管理多個Observable對象.
1. ⾼階Observable的理解
普通Observable通常發出字符串或數值之類的普通值,但是很多時候需要處理可觀察的可觀察對象,所謂的高階Observable。
// 高階observable
import {interval} from 'rxjs';
const ho$ = interval(1000).take(2).map(x=>interval(1500).map(y=>`${x}:${y}`).take(2))
//⾼階Observable打開了⼀扇⼤門,⽤Observable來管理多個Observable對象。
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(ev => interval(1000).pipe(take(4))),
);
2. 操作⾼階Observable的合並類操作符
- concatAll
import { fromEvent, interval } from 'rxjs';
import { map, take, concatAll } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(ev => interval(1000).pipe(take(4))),
);
const firstOrder = higherOrder.pipe(concatAll());
firstOrder.subscribe(x => console.log(x));
// 每次點擊會輸出0~3
- mergeAll
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(map((ev) => interval(1000)));
const firstOrder = higherOrder.pipe(mergeAll());
firstOrder.subscribe(x => console.log(x));
- zipAll
- combineAll
3. 進化的⾼階Observable處理
- 1.switch:切換輸⼊Observab
import { fromEvent, interval } from 'rxjs';
import { switchAll, map, tap } from 'rxjs/operators';
const clicks = fromEvent(document, 'click').pipe(tap(() => console.log('click')));
const source = clicks.pipe(map((ev) => interval(1000)));
source.pipe(
switchAll()
).subscribe(x => console.log(x));
/* Output
click
1
2
3
4
...
click
1
2
3
...
click
...
*/
- 2.exhau
import { fromEvent, interval } from 'rxjs';
import { exhaust, map, take } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map((ev) => interval(1000).pipe(take(5))),
);
const result = higherOrder.pipe(exhaust());
result.subscribe(x => console.log(x));
/**
* click
* 0
* 1
* 2
* 3
* click
* ...
* /
*
無論點擊多少次,執行一個
小結
學習了RxJS中合並多個Observable對象的⽅法,了解了高階Observable。
