前言
第一次接觸rxjs也是因為angular2應用,內置了rxjs的依賴,了解之后發現它的強大,是一個可以代替promise的框架,但是只處理promise的東西有點拿尚方寶劍砍蚊子的意思。
如果我們的應用是完全rxjs的應用,會顯得代碼比較清晰,代碼寫的爽。
angular團隊和微軟合作,采用的typescript和rxjs,互相宣傳。。
rxjs
rxjs是一個比較簡單的庫,它只有Observable,Observer,subscription,subject,Operators,Scheduler6個對象概念。比較類似於觀察者模式,如果再了解一些函數式編程和node的stream就更好了。
observable 可觀察對象
observable是一個可觀察對象,也類似觀察者模式中的可觀察對象,后面的Subscription就相當於觀察者模式中的訂閱者。
給一個例子:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
創建了一個Obervable對象,這里用到了create操作符。
create操作符:創建一個新的 Observable ,當觀察者( Observer )訂閱該 Observable 時,它會執行指定的函數。
observer 觀察者
如上例子中的observer,給一個典型的observer例子:
var observer={
next:x=>console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
}
有點類似promise的返回,每來一個“流”就會執行一個next,出錯會執行一個observer的error,完成后或者調用complete便不再監聽observable,執行complete函數。這些函數的集合也就是observer。
要使用觀察者,需要訂閱可觀察對象:
observable.subscribe(observer)
Subscription訂閱
訂閱是一個表示一次性資源的對象,通常是一個可觀察對象的執行。
它有一個重要的方法:unsubscribe,顧名思義。。。
比如observable的例子:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
var observer={
next:x=>console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
};
observable.subscribe(observer);
//返回
Observer got a next value: 1
Observer got a next value: 2
Observer got a next value: 3
Observer got a next value: 4 //after 1s return
Observer got a complete notification
如果在最后調用subscription.unsubscribe();
那么4就不會執行,complete也不會執行,就會取消掉這個觀察。
Subject
Subject是允許值被多播到多個觀察者的一種特殊的Observable。然而純粹的可觀察對象是單播的(每一個訂閱的觀察者擁有單獨的可觀察對象的執行)。
subject是Observable對象,並且自帶next,error,complete函數,所以我們不用在定義observer:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
//返回
observerA: 1
observerB: 1
observerA: 2
observerB: 2
由於subject自帶next等等的函數,所以它也是個observer,也可以這樣用:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
Operators操作符
rx因為operators強大,我們可以流式的處理主要因為有operators在。
操作符是可觀察對象上定義的方法,例如.map(...),.filter(...),.merge(...),等等。他們類似fp,返回新的observable而subscription對象也會繼承。
比如
Rx.Observable.interval(500).filter(x => x%2==1).subscribe( res => console.log(res) );
// 一秒輸出一個數,返回單數。
這里的filter就是操作符,我們通過操作符來完成一系列的神奇操作。
Scheduler調度者
什么是調度者?調度者控制着何時啟動一個訂閱和何時通知被發送。
名稱 | 類型 | 屬性 | 描述 |
---|---|---|---|
queue | Scheduler | 在當前事件幀中調度隊列(trampoline 調度器)。迭代操作符使用此調度器。 | |
asap | Scheduler | 微任務隊列上的調度, 使用盡可能快的轉化機制, 或者是 Node.js 的 process.nextTick(),或者是 Web Worker 的消息通道,或者 setTimeout , 或者其他。異步轉化使用此調度器. | |
async | Scheduler | 使用 setInterval 調度工作。基於時間的操作符使用此調度器。 | |
animationFrame | Scheduler | 使用 requestAnimationFrame 調度工作。與平台的重繪同步使用此調度器。 |
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.observeOn(Rx.Scheduler.async);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//返回
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
這是因為observeOn(Rx.Scheduler.async)在Observable.create和最終的Observer之間引入了一個代理Observer。
var proxyObserver = {
next: (val) => {
Rx.Scheduler.async.schedule(
(x) => finalObserver.next(x),
0 /* delay */,
val /* will be the x for the function above */
);
},
// ...
}
使用rxjs
搜索功能
<input id="text"></input>
<script>
var text = document.querySelector('#text');
text.addEventListener('keyup', (e) =>{
var searchText = e.target.value;
// 發送輸入內容到后台
$.ajax({
url: `xx.com/${searchText}`,
success: data => {
// 拿到后台返回數據,並展示搜索結果
render(data);
}
});
});
</script>
之前實現一個搜索效果,其實需要這樣的代碼,應用到函數節流還需要寫為
clearTimeout(timer);
// 定時器,在 250 毫秒后觸發
timer = setTimeout(() => {
console.log('發起請求..');
},250)
還要考慮一種情況,如果我們搜索了a,然后馬上改為了b,會返回a的結果,這樣我們就需要判斷一下:
clearTimeout(timer)
timer = setTimeout(() => {
// 聲明一個當前所搜的狀態變量
currentSearch = '書';
var searchText = e.target.value;
$.ajax({
url: `xx.com/${searchText}`,
success: data => {
// 判斷后台返回的標志與我們存的當前搜索變量是否一致
if (data.search === currentSearch) {
// 渲染展示
render(data);
} else {
// ..
}
}
});
這種代碼其實就很雜亂了。
如果用rxjs,我們的代碼能簡單並且清楚很多:
var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
.debounceTime(250)
.pluck('target', 'value')
.switchMap(url => Http.get(url))
.subscribe(data => render(data));
rxjs幾個操作符
forkJoin
rxjs版的promise.all
const getPostOne$ = Rx.Observable.timer(1000).mapTo({id: 1});
const getPostTwo$ = Rx.Observable.timer(2000).mapTo({id: 2});
Rx.Observable.forkJoin(getPostOne$, getPostTwo$).subscribe(res => console.log(res))
//返回
[ { id: 1 }, { id: 2 } ]
pairwise
可以保存上一個值
Rx.Observable
.fromEvent(document, 'scroll')
.map(e => window.pageYOffset)
.pairwise()
.subscribe(pair => console.log(pair)); // pair[1] - pair[0]
switchMap
合並兩個流的值,並只發出最新的值
const clicks$ = Rx.Observable.fromEvent(document, 'click');
const innerObservable$ = Rx.Observable.interval(1000);
clicks$.switchMap(event => innerObservable$)
.subscribe(val => console.log(val));
每次點擊觸發才發送interval值,並且點擊之后interval重新發送,取消掉之前的值。如果是mergeMap,則不取消之前的值。
toPromise
返回promise
let source = Rx.Observable
.of(42)
.toPromise();
source.then((value) => console.log('Value: %s', value));
// => Value: 42
fromPromise
將 Promise 轉化為 Observable。
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
有了和promise相互轉化的api,就很方便的用rx,ng2中內置rx,用着不爽就任意改成promise來寫。
takeUntil
public takeUntil(notifier: Observable): Observable
發出源 Observable 發出的值,直到notifier:Observable 發出值。
rx.Observable.interval(1000).takeUntil(rx.Observable.fromEvent(document,'click'))
觸發interval,然后每次點擊停止觸發。
所以它還有一個用法就是建立一個stop流,來避免手動調用unsubscribe。
const data$ = this.getData();
const cancelBtn = this.element.querySelector('.cancel-button');
const rangeSelector = this.element.querySelector('.rangeSelector');
const cancel$ = Observable.fromEvent(cancelBtn, 'click');
const range$ = Observable.fromEvent(rangeSelector, 'change').map(e => e.target.value);
const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500))
this.subscription = data$.takeUntil(stop$).subscribe(data => this.updateData(data));
rxjs在ng2
先提BehaviorSubject
BehaviorSubject繼承自Observable類,它儲存着要發射給消費者的最新的值。
無論何時一個新的觀察者訂閱它,都會立即接受到這個來自BehaviorSubject的"當前值"。
比如
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
//返回
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
每次next就傳一個值,在observer里面寫函數處理。
例子
我們有一個material table的例子來看。
代碼看文最后
我們做的是一個table中的filter功能,類似find item by name。
一般的思路就是獲取這個input的值,函數節流,在我們的table數據中filter這個name,然后給原來綁定的data賦值。
對於rx的寫法就很清楚了。
Observable.fromEvent(this.filter.nativeElement, 'keyup')
.debounceTime(150)
.distinctUntilChanged()
.subscribe(() => {
if (!this.dataSource) { return; }
this.dataSource.filter = this.filter.nativeElement.value;
});
我們獲取輸入的值,節流,去重,賦值給this.dataSource,this.dataSource其實是ExampleDataSource的實例。
ExampleDatabase類是生成數據的類,可以忽略,ExampleDataSource是我們做處理的一個類,material暴露了一個connect方法,返回的observable直接綁定table的data。
主要的處理在ExampleDataSource里:
export class ExampleDataSource extends DataSource<any> {
_filterChange = new BehaviorSubject('');
get filter(): string { return this._filterChange.value; }
set filter(filter: string) { this._filterChange.next(filter); }
constructor(private _exampleDatabase: ExampleDatabase) {
super();
}
/** Connect function called by the table to retrieve one stream containing the data to render. */
connect(): Observable<UserData[]> {
const displayDataChanges = [
this._exampleDatabase.dataChange,
this._filterChange,
];
return Observable.merge(...displayDataChanges).map(() => {
return this._exampleDatabase.data.slice().filter((item: UserData) => {
let searchStr = (item.name + item.color).toLowerCase();
return searchStr.indexOf(this.filter.toLowerCase()) != -1;
});
});
}
我們設置了filter這個屬性的get和set,每次我們按下按鍵,給this.dataSource.filter賦值的時候,實際上,我們調用了BehaviorSubject的next方法,
發了一個事件。我們還需要merge一下_exampleDatabase.dataChange事件,為了當table數據改變的時候,我們能做出相應的處理。
然后就用map操作符,filter一下我們的data數據。給table數據綁定material已經幫我們做了。
附文:
import {Component, ElementRef, ViewChild} from '@angular/core';
import {DataSource} from '@angular/cdk';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/operator/startWith';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/observable/fromEvent';
@Component({
selector: 'table-filtering-example',
styleUrls: ['table-filtering-example.css'],
templateUrl: 'table-filtering-example.html',
})
export class TableFilteringExample {
displayedColumns = ['userId', 'userName', 'progress', 'color'];
exampleDatabase = new ExampleDatabase();
dataSource: ExampleDataSource | null;
@ViewChild('filter') filter: ElementRef;
ngOnInit() {
this.dataSource = new ExampleDataSource(this.exampleDatabase);
Observable.fromEvent(this.filter.nativeElement, 'keyup')
.debounceTime(150)
.distinctUntilChanged()
.subscribe(() => {
if (!this.dataSource) { return; }
this.dataSource.filter = this.filter.nativeElement.value;
});
}
}
/** Constants used to fill up our data base. */
const COLORS = ['maroon', 'red', 'orange', 'yellow', 'olive', 'green', 'purple',
'fuchsia', 'lime', 'teal', 'aqua', 'blue', 'navy', 'black', 'gray'];
const NAMES = ['Maia', 'Asher', 'Olivia', 'Atticus', 'Amelia', 'Jack',
'Charlotte', 'Theodore', 'Isla', 'Oliver', 'Isabella', 'Jasper',
'Cora', 'Levi', 'Violet', 'Arthur', 'Mia', 'Thomas', 'Elizabeth'];
export interface UserData {
id: string;
name: string;
progress: string;
color: string;
}
/** An example database that the data source uses to retrieve data for the table. */
export class ExampleDatabase {
/** Stream that emits whenever the data has been modified. */
dataChange: BehaviorSubject<UserData[]> = new BehaviorSubject<UserData[]>([]);
get data(): UserData[] { return this.dataChange.value; }
constructor() {
// Fill up the database with 100 users.
for (let i = 0; i < 100; i++) { this.addUser(); }
}
/** Adds a new user to the database. */
addUser() {
const copiedData = this.data.slice();
copiedData.push(this.createNewUser());
this.dataChange.next(copiedData);
}
/** Builds and returns a new User. */
private createNewUser() {
const name =
NAMES[Math.round(Math.random() * (NAMES.length - 1))] + ' ' +
NAMES[Math.round(Math.random() * (NAMES.length - 1))].charAt(0) + '.';
return {
id: (this.data.length + 1).toString(),
name: name,
progress: Math.round(Math.random() * 100).toString(),
color: COLORS[Math.round(Math.random() * (COLORS.length - 1))]
};
}
}
/**
* Data source to provide what data should be rendered in the table. Note that the data source
* can retrieve its data in any way. In this case, the data source is provided a reference
* to a common data base, ExampleDatabase. It is not the data source's responsibility to manage
* the underlying data. Instead, it only needs to take the data and send the table exactly what
* should be rendered.
*/
export class ExampleDataSource extends DataSource<any> {
_filterChange = new BehaviorSubject('');
get filter(): string { return this._filterChange.value; }
set filter(filter: string) { this._filterChange.next(filter); }
constructor(private _exampleDatabase: ExampleDatabase) {
super();
}
/** Connect function called by the table to retrieve one stream containing the data to render. */
connect(): Observable<UserData[]> {
const displayDataChanges = [
this._exampleDatabase.dataChange,
this._filterChange,
];
return Observable.merge(...displayDataChanges).map(() => {
return this._exampleDatabase.data.slice().filter((item: UserData) => {
let searchStr = (item.name + item.color).toLowerCase();
return searchStr.indexOf(this.filter.toLowerCase()) != -1;
});
});
}
disconnect() {}
}