調度器
什么是調度器?調度器是當開始訂閱時,控制通知推送的。它由三個部分組成。
- 調度是數據結構。它知道怎樣在優先級或其他標准去存儲和排隊運行的任務
- 調度器是一個執行上下文。它表示任務在何時何地執行(例如,立即或是在回調機制中如 setTimeout 或 process.nextTick,又或是動畫框架)
- 調度器有一個(虛擬)計時器。它提供一個 “時間” 的概念,通過在調度器的方法
now()
。在特定的調度程序上調度,它只遵循計時器表示的時間。
調度器能讓你在執行上下文定義 Observable 推送的通知給觀察者
下面是一個例子,我們用一個簡單的 Observable 來同步推送值 1,2,3,並且為了使用這些值,使用操作符 observeOn
到特定的 async
調度程序。
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.log('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
執行的輸出:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
注意是如何通知 got value...
是在 just after subscribe
之后被推送的,它跟我們之前的默認的行為很相差很遠的。這是因為 observeOn(asyncScheduler)
在 new Observable
和最終的觀察者之間引入了一個代理觀察者。我們來把之前的代碼重命名一些標識再來看下:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
var observable = new Observable((proxyObserver) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);
var finalObserver = {
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
proxyObserver
在 observeOn(asyncScheduler)
中創建的,這個代理觀察者里的 next(value)
函數大概像下面這樣:
const proxyObserver = {
next(val) {
asyncScheduler.schedule(
(x) => finalObserver.next(x),
0, //延遲參數
val //這個值是從上面的 x 傳過來的
);
},
// ...
}
the async
調度程序操作能還使用 setTimeout
或 setInterval
,即使如果給定了 delay
的值是 0。通常,在 Javascript 中,setTimeout(fn, 0)
知道如何在下一個事件循環迭代最早運行函數 fn
。
調度器的方法 schedule()
有一個參數 delay
,它指的是相對於調度器自己內部時鍾的一個時間數字量。一個調度器的時間不需要有任何相關的實際時間。這個是一個暫時的操作像 delay
操作符一樣而不是值實際的時間,但是時間是由調度器的時鍾決定的。在測試中這特別有用,它這個虛擬的時間調度器也許用在偽裝的真實時間,當在真正同步運行調度任務的時候。
調度器類型
async
調度器是 RsJS 眾多調度器中內置的一個調度器。他們每一個都能被創建和返回,通過使用 Scheduler
對象的靜態屬性。
SCHEDULER | PURPOSE |
---|---|
null |
不傳遞任何調度器,通知以同步和遞歸的方式傳遞。這個將用於常量時間操作符或尾遞歸操作符。 |
queueScheduler |
對當前的事件框架隊列調度(蹦床調度器)。迭代操作用這個 |
asapScheduler |
在微任務隊列調度,它與 promises 的隊列使用相同。基本上在當前 job 之后,但是在下一個 job 之前。異步約束使用這個 |
asyncScheduler |
用 setInterval 調度工作。在以時間為基礎的操作用這個 |
animationFrameScheduler |
調度任務將會發生在下個瀏覽器內容重繪之前。能使用在平滑的創建瀏覽器動畫 |
使用調度器
你也許早就在你的代碼中使用 RsJS 的調度器,沒有顯式的說明使用的調度器類型。這是因為所有的處理並發的 Observable 操作符都有可選的調度器。如果你不提供調度器,RxJS 通過使用最小並發原則將會選擇一個默認的調度器。這就是說最少並發的調度器需要安全的選擇操作符。例如,返回一個有限和少量的 Observable 的操作符,RxJS 不使用調度器,例如 null
或 undefined
。對於返回一個可能很大的或者無限的消息,那么就會使用 queue
調度器。對於使用時間的的操作符,那么就會用 async
。
因為 RxJS 使用最少並發調度器,你可以選擇一個不同的調度器,如果你想以性能為目的來引入並發。為了指定一個特定的調度器,你可以送那些操作符方法傳遞調度器,例如 from([10, 20, 30], asyncShceduler)
靜態創建操作符通常將一個調度器作為參數傳遞。例如,from([10, 20, 30])
允許你當推送每一個從 array
轉化的通知時指定一個調度器。它通常是操作符上的最后一個參數。下面的靜態創建操作符都會傳遞一個調度器參數:
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throw
timer
上下文將會調用 subscribe() 來用 subscribeOn
調度。subscribe
默認會在 Observable 調用,它將會同步和立即的方式調用。然而,你也許要延遲或在給定的調度器調度實際的訂閱,使用操作符的 subscribeOn(scheduler) 的實例,里面的 scheduler
就是你要提供的參數。
使用 observeOn
來調度上下文將推送的通知。就像我們看到的一樣,實例操作符 observeOn(scheduler)
在源 Observable 和 目標觀察者之間引入 Observer 中間者,這個中介者調度器通過你給定的 shceduler
對目標觀察者調用。
實例操作符傳遞一個調度器作為參數。
時間相關操作符像 bufferTime
,debounceTime
,delay
,sampleTime
,throttleTime
,timeInterval
,timeout
,timeoutWith
,windowTime
最后一個參數全都帶一個調度器參數,以及其他操作符默認都會在 asyncScheduler
。
其他實例傳調度器作為參數的操作符:cache
,combineLatest
,concat
,merge
,expand
,publishReplay
,startWith
。
注意到 cache
和 publishReplay
這兩個都接受一個調取器,因為他們都使用 ReplaySubject。ReplaySubject 構造函數傳遞一個可選的調度器作為最后一個參數,因為 ReplaySubject 能處理時間,它只有在調度器的上下文中才會有意義。ReplaySubject 默認使用 queue
調度器提供一個時鍾。
PS:RxJS 系列文章同步至 https://github.com/MarsonShine/JavascriptStudy/tree/master/src/rxjs/docs