調度器
什么是調度器?調度器是當開始訂閱時,控制通知推送的。它由三個部分組成。
- 調度是數據結構。它知道怎樣在優先級或其他標准去存儲和排隊運行的任務
- 調度器是一個執行上下文。它表示任務在何時何地執行(例如,立即或是在回調機制中如 setTimeout 或 process.nextTick,又或是動畫框架)
- 調度器有一個(虛擬)計時器。它提供一個 “時間” 的概念,通過在調度器的方法
now()。在特定的調度程序上調度,它只遵循計時器表示的時間。
調度器能讓你在執行上下文定義 Observable 推送的通知給觀察者
下面是一個例子,我們用一個簡單的 Observable 來同步推送值 1,2,3,並且為了使用這些值,使用操作符 observeOn 到特定的 async 調度程序。
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.log('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
執行的輸出:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
注意是如何通知 got value... 是在 just after subscribe 之后被推送的,它跟我們之前的默認的行為很相差很遠的。這是因為 observeOn(asyncScheduler) 在 new Observable 和最終的觀察者之間引入了一個代理觀察者。我們來把之前的代碼重命名一些標識再來看下:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
var observable = new Observable((proxyObserver) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);
var finalObserver = {
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
proxyObserver 在 observeOn(asyncScheduler) 中創建的,這個代理觀察者里的 next(value) 函數大概像下面這樣:
const proxyObserver = {
next(val) {
asyncScheduler.schedule(
(x) => finalObserver.next(x),
0, //延遲參數
val //這個值是從上面的 x 傳過來的
);
},
// ...
}
the async 調度程序操作能還使用 setTimeout 或 setInterval,即使如果給定了 delay 的值是 0。通常,在 Javascript 中,setTimeout(fn, 0) 知道如何在下一個事件循環迭代最早運行函數 fn 。
調度器的方法 schedule() 有一個參數 delay,它指的是相對於調度器自己內部時鍾的一個時間數字量。一個調度器的時間不需要有任何相關的實際時間。這個是一個暫時的操作像 delay 操作符一樣而不是值實際的時間,但是時間是由調度器的時鍾決定的。在測試中這特別有用,它這個虛擬的時間調度器也許用在偽裝的真實時間,當在真正同步運行調度任務的時候。
調度器類型
async 調度器是 RsJS 眾多調度器中內置的一個調度器。他們每一個都能被創建和返回,通過使用 Scheduler 對象的靜態屬性。
| SCHEDULER | PURPOSE |
|---|---|
null |
不傳遞任何調度器,通知以同步和遞歸的方式傳遞。這個將用於常量時間操作符或尾遞歸操作符。 |
queueScheduler |
對當前的事件框架隊列調度(蹦床調度器)。迭代操作用這個 |
asapScheduler |
在微任務隊列調度,它與 promises 的隊列使用相同。基本上在當前 job 之后,但是在下一個 job 之前。異步約束使用這個 |
asyncScheduler |
用 setInterval 調度工作。在以時間為基礎的操作用這個 |
animationFrameScheduler |
調度任務將會發生在下個瀏覽器內容重繪之前。能使用在平滑的創建瀏覽器動畫 |
使用調度器
你也許早就在你的代碼中使用 RsJS 的調度器,沒有顯式的說明使用的調度器類型。這是因為所有的處理並發的 Observable 操作符都有可選的調度器。如果你不提供調度器,RxJS 通過使用最小並發原則將會選擇一個默認的調度器。這就是說最少並發的調度器需要安全的選擇操作符。例如,返回一個有限和少量的 Observable 的操作符,RxJS 不使用調度器,例如 null 或 undefined。對於返回一個可能很大的或者無限的消息,那么就會使用 queue 調度器。對於使用時間的的操作符,那么就會用 async。
因為 RxJS 使用最少並發調度器,你可以選擇一個不同的調度器,如果你想以性能為目的來引入並發。為了指定一個特定的調度器,你可以送那些操作符方法傳遞調度器,例如 from([10, 20, 30], asyncShceduler)
靜態創建操作符通常將一個調度器作為參數傳遞。例如,from([10, 20, 30]) 允許你當推送每一個從 array 轉化的通知時指定一個調度器。它通常是操作符上的最后一個參數。下面的靜態創建操作符都會傳遞一個調度器參數:
bindCallbackbindNodeCallbackcombineLatestconcatemptyfromfromPromiseintervalmergeofrangethrowtimer
上下文將會調用 subscribe() 來用 subscribeOn 調度。subscribe 默認會在 Observable 調用,它將會同步和立即的方式調用。然而,你也許要延遲或在給定的調度器調度實際的訂閱,使用操作符的 subscribeOn(scheduler) 的實例,里面的 scheduler 就是你要提供的參數。
使用 observeOn 來調度上下文將推送的通知。就像我們看到的一樣,實例操作符 observeOn(scheduler) 在源 Observable 和 目標觀察者之間引入 Observer 中間者,這個中介者調度器通過你給定的 shceduler 對目標觀察者調用。
實例操作符傳遞一個調度器作為參數。
時間相關操作符像 bufferTime,debounceTime,delay,sampleTime,throttleTime,timeInterval,timeout,timeoutWith,windowTime 最后一個參數全都帶一個調度器參數,以及其他操作符默認都會在 asyncScheduler 。
其他實例傳調度器作為參數的操作符:cache,combineLatest,concat,merge,expand,publishReplay,startWith。
注意到 cache 和 publishReplay 這兩個都接受一個調取器,因為他們都使用 ReplaySubject。ReplaySubject 構造函數傳遞一個可選的調度器作為最后一個參數,因為 ReplaySubject 能處理時間,它只有在調度器的上下文中才會有意義。ReplaySubject 默認使用 queue 調度器提供一個時鍾。
PS:RxJS 系列文章同步至 https://github.com/MarsonShine/JavascriptStudy/tree/master/src/rxjs/docs
