介紹RxJS前,先介紹Observable
可觀察對象(Observable)
可觀察對象支持在應用中的發布者和訂閱者之間傳遞消息。
可觀察對象可以發送多個任意類型的值 —— 字面量、消息、事件。
基本用法和詞匯
作為發布者,你創建一個 Observable 的實例,其中定義了一個訂閱者(subscriber)函數。 當有消費者調用 subscribe() 方法時,這個函數就會執行。 訂閱者函數用於定義“如何獲取或生成那些要發布的值或消息”。
要執行所創建的可觀察對象,並開始從中接收通知,你就要調用它的 subscribe() 方法,並傳入一個觀察者(observer)。 這是一個 JavaScript 對象,它定義了你收到的這些消息的處理器(handler)。 subscribe() 調用會返回一個 Subscription 對象,該對象具有一個 unsubscribe() 方法。 當調用該方法時,你就會停止接收通知。
const locations = new Observable((observer) => {
// Get the next and error callbacks. These will be passed in when
// the consumer subscribes.
const {next, error} = observer;
let watchId;
// Simple geolocation API check provides values to publish
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition(next, error);
} else {
error('Geolocation not available');
}
// When the consumer unsubscribes, clean up data ready for next subscription.
return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});
// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
next(position) { console.log('Current Position: ', position); },
error(msg) { console.log('Error Getting Location: ', msg); }
});
// Stop listening for location after 10 seconds
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
定義觀察者
用於接收可觀察對象通知的處理器要實現 Observer 接口。這個對象定義了一些回調函數來處理可觀察對象可能會發來的三種通知
通知類型 | 說明 |
---|---|
next |
必要。用來處理每個送達值。在開始執行后可能執行零次或多次。 |
error |
可選。用來處理錯誤通知。錯誤會中斷這個可觀察對象實例的執行過程。 |
complete |
可選。用來處理執行完畢(complete)通知。當執行完畢后,這些值就會繼續傳給下一個處理器。 |
訂閱
只有當有人訂閱 Observable 的實例時,它才會開始發布值。
const myObservable = Observable.of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
subscribe() 方法還可以接收定義在同一行中的回調函數,無論 next、error 還是 complete 處理器,下面的代碼和剛才的等價:
myObservable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
創建可觀察對象
使用 Observable 構造函數可以創建任何類型的可觀察流。
下面是一個例子:
function fromEvent(target, eventName) {
return new Observable((observer) => {
// 事件處理函數,每次執行eventName,觀察者observer就next一條數據
const handler = (e) => observer.next(e);
// 添加事件綁定
target.addEventListener(eventName, handler);
return () => {
// 退訂
target.removeEventListener(eventName, handler);
};
});
}
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});
多播
多播用來讓可觀察對象在一次執行中同時廣播給多個訂閱者。借助支持多播的可觀察對象,你不必注冊多個監聽器,而是復用第一個(next)監聽器,並且把值發送給各個訂閱者。
多播的核心是,將observers放到一個數組,然后遍歷
function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
// Keep track of each observer (one for every active subscription)
const observers = [];
// Still a single timeoutId because there will only ever be one
// set of values being generated, multicasted to each subscriber
let timeoutId;
// Return the subscriber function (runs when subscribe()
// function is invoked)
return (observer) => {
observers.push(observer);
// When this is the first subscription, start the sequence
if (observers.length === 1) {
timeoutId = doSequence({
next(val) {
// Iterate through observers and notify all subscriptions
observers.forEach(obs => obs.next(val));
},
complete() {
// Notify all complete callbacks
observers.forEach(obs => obs.complete());
}
}, seq, 0);
}
return {
unsubscribe() {
// Remove from the observers array so it's no longer notified
observers.splice(observers.indexOf(observer), 1);
// If there's no more listeners, do cleanup
if (observers.length === 0) {
clearTimeout(timeoutId);
}
}
};
};
}
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, idx++);
}
}, 1000);
}
// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber);
// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
multicastSequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 1500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
RxJS 庫
RxJS(響應式擴展的 JavaScript 版)是一個使用可觀察對象進行響應式編程的庫,它讓組合異步代碼和基於回調的代碼變得更簡單,RxJS 提供了一種對 Observable 類型的實現.。
這些工具函數可用於:
- 把現有的異步代碼轉換成可觀察對象
- 迭代流中的各個值
- 把這些值映射成其它類型
- 對流進行過濾
- 組合多個流
創建可觀察對象的函數
RxJS 提供了一些用來創建可觀察對象的函數。這些函數可以簡化根據某些東西創建可觀察對象的過程,比如事件、定時器、promises等等。比如:
從promise創建一個Observable:
import { fromPromise } from 'rxjs';
// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
next(response) { console.log(response); },
error(err) { console.error('Error: ' + err); },
complete() { console.log('Completed'); }
});
從一個事件創建一個observable:
import { fromEvent } from 'rxjs';
const el = document.getElementById('my-element');
// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');
// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
// Log coords of mouse movements
console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
// When the mouse is over the upper-left of the screen,
// unsubscribe to stop listening for mouse movements
if (evt.clientX < 40 && evt.clientY < 40) {
subscription.unsubscribe();
}
});
從ajax創建一個observable:
import { ajax } from 'rxjs/ajax';
// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
操作符
操作符是基於可觀察對象構建的一些對集合進行復雜操作的函數.,常見的有 map()、filter()、concat() 和 flatMap()
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9
常用操作符
類別 | 操作 |
---|---|
創建 | from , fromPromise , fromEvent , of |
組合 | combineLatest , concat , merge , startWith , withLatestFrom , zip |
過濾 | debounceTime , distinctUntilChanged , filter , take , takeUntil |
轉換 | bufferTime , concatMap , map , mergeMap , scan , switchMap |
工具 | tap |
多播 | share |
錯誤處理
除了可以在訂閱時提供 error() 處理器外,RxJS 還提供了 catchError 操作符,它允許你在管道中處理已知錯誤。
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
重試失敗的可觀察對象
可以使用retry重試失敗的操作
import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
retry(3), // Retry up to 3 times before failing
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
可觀察對象的命名約定
約定俗成的,可觀察對象的名字以“$”符號結尾。
同樣的,如果你希望用某個屬性來存儲來自可觀察對象的最近一個值,它的命名慣例是與可觀察對象同名,但不帶“$”后綴。
import { Component } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-stopwatch',
templateUrl: './stopwatch.component.html'
})
export class StopwatchComponent {
// 最近一次值
stopwatchValue: number;
// 可觀察對象
stopwatchValue$: Observable<number>;
start() {
this.stopwatchValue$.subscribe(num =>
this.stopwatchValue = num
);
}
}
Angular中的observables
Angular 中大量使用了可觀察對象,作為處理各種常用異步操作的接口。
比如:
EventEmitter
類派生自Observable
。- HTTP 模塊使用可觀察對象來處理 AJAX 請求和響應
- 路由器和表單模塊使用可觀察對象來監聽對用戶輸入事件的響應
事件發送器 EventEmitter
Angular 提供了一個 EventEmitter 類,它用來從組件的 @Output() 屬性中發布一些值。EventEmitter 擴展了 Observable,並添加了一個 emit() 方法,這樣它就可以發送任意值了。當你調用 emit() 時,就會把所發送的值傳給訂閱上來的觀察者的 next() 方法
@Component({
selector: 'zippy',
template: `
<div class="zippy">
<div (click)="toggle()">Toggle</div>
<div [hidden]="!visible">
<ng-content></ng-content>
</div>
</div>`})
export class ZippyComponent {
visible = true;
@Output() open = new EventEmitter<any>();
@Output() close = new EventEmitter<any>();
toggle() {
this.visible = !this.visible;
if (this.visible) {
this.open.emit(null);
} else {
this.close.emit(null);
}
}
}
HTTP
Angular 的 HttpClient 從 HTTP 方法調用中返回了可觀察對象。例如,http.get(‘/api’) 就會返回可觀察對象。
為什么NG使用observable而不是Promise?
- 可觀察對象不會修改服務器的響應(和在承諾上串聯起來的
.then()
調用一樣)。反之,你可以使用一系列操作符來按需轉換這些值 - HTTP 請求是可以通過
unsubscribe()
方法來取消的 - 請求可以進行配置,以獲取進度事件的變化
- 失敗的請求很容易重試
Async 管道
AsyncPipe 會訂閱一個可觀察對象或承諾,並返回其發出的最后一個值。當發出新值時,該管道就會把這個組件標記為需要進行變更檢查的(因此可能導致刷新界面)
@Component({
selector: 'async-observable-pipe',
template: `<div><code>observable|async</code>:
Time: {{ time | async }}</div>`
})
export class AsyncObservablePipeComponent {
time = new Observable(observer =>
setInterval(() => observer.next(new Date().toString()), 1000)
);
}
路由器 (router)
Router.events 以可觀察對象的形式提供了其事件。 你可以使用 RxJS 中的 filter() 操作符來找到感興趣的事件,並且訂閱它們,以便根據瀏覽過程中產生的事件序列作出決定。
import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {
navStart: Observable<NavigationStart>;
constructor(private router: Router) {
// 通過filter過濾,只關注自己感興趣的
this.navStart = router.events.pipe(
filter(evt => evt instanceof NavigationStart)
) as Observable<NavigationStart>;
}
ngOnInit() {
this.navStart.subscribe(evt => console.log('Navigation Started!'));
}
}
響應式表單 (reactive forms)
FormControl 的 valueChanges 屬性和 statusChanges 屬性包含了會發出變更事件的可觀察對象
import { FormGroup } from '@angular/forms';
@Component({
selector: 'my-component',
template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
nameChangeLog: string[] = [];
heroForm: FormGroup;
ngOnInit() {
this.logNameChange();
}
logNameChange() {
const nameControl = this.heroForm.get('name');
nameControl.valueChanges.forEach(
(value: string) => this.nameChangeLog.push(value)
);
}
}
可觀察對象In Action
搜索建議(suggestions)
可觀察對象可以簡化輸入提示建議的實現方式。典型的輸入提示要完成一系列獨立的任務:
- 從輸入中監聽數據。
- 移除輸入值前后的空白字符,並確認它達到了最小長度。
- 防抖(這樣才能防止連續按鍵時每次按鍵都發起 API 請求,而應該等到按鍵出現停頓時才發起)
- 如果輸入值沒有變化,則不要發起請求(比如按某個字符,然后快速按退格)。
- 如果已發出的 AJAX 請求的結果會因為后續的修改而變得無效,那就取消它。
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box');
const typeahead = fromEvent(searchBox, 'input').pipe(
map((e: KeyboardEvent) => e.target.value),
filter(text => text.length > 2), // 過濾
debounceTime(10),// 延時
distinctUntilChanged(),//發生變化后再執行
switchMap(() => ajax('/api/endpoint'))
);
typeahead.subscribe(data => {
// Handle the data from the API
});
指數化backoff
指數化退避是一種失敗后重試 API 的技巧,它會在每次連續的失敗之后讓重試時間逐漸變長,超過最大重試次數之后就會徹底放棄。 如果使用承諾和其它跟蹤 AJAX 調用的方法會非常復雜,而使用可觀察對象,這非常簡單:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) {
return pipe(
retryWhen(attempts => range(1, maxTries)
.pipe(
zip(attempts, (i) => i),
map(i => i * i),
mergeMap(i => timer(i * ms))
)
)
);
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
Observables VS. promises
可觀察對象經常拿來和承諾進行對比。有一些關鍵的不同點:
- 可觀察對象是聲明式的,在被訂閱之前,它不會開始執行,promise是在創建時就立即執行的
- 可觀察對象能提供多個值,promise只提供一個,這讓可觀察對象可用於隨着時間的推移獲取多個值
- 可觀察對象會區分串聯處理和訂閱語句,promise只有
.then()
語句 - 可觀察對象的
subscribe()
會負責處理錯誤,promise會把錯誤推送給它的子promise
作者:Jadepeng
出處:jqpeng的技術記事本--http://www.cnblogs.com/xiaoqi
您的支持是對博主最大的鼓勵,感謝您的認真閱讀。
本文版權歸作者所有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。