
壹 ❀ 引
之前在整理手寫Promise
相關資料時,在文章推薦區碰巧看到了一道手寫Promise
並發控制調度器的筆試題(大廠可能愛考),結果今天同事又正好問了我一個關於Promise
調度處理的場景問題,這就讓我瞬間想起了前面看的題,出於興趣我也抽時間嘗試實現了下,外加上幾道相關的題統一做個整理,本文開始!!
貳 ❀ 題一
我們假定有一個請求request
與一個限制最多同時3
個請求的調度器,要求實現一次最多處理limit
個數的請求:
// 假設請求API為
function request(params) {
return new Promise((resolve, reject) => {
setTimeout(() => resolve(params), 1000);
});
}
// 最多處理3個請求的調度器
function Scheduler(limit=3){
// ...
};
const createPromise = Scheduler();
createPromise(1).then((res) => console.log(res));
createPromise(2).then((res) => console.log(res));
createPromise(3).then((res) => console.log(res));
createPromise(4).then((res) => console.log(res));
createPromise(5).then((res) => console.log(res));
// 預期,等1秒后輸出1 2 3 ,再等一秒輸出4 5
比如上述代碼limit
是3,因此最終輸出效果為,等待1s后輸出1 2 3
,之后又過一秒輸出4 5
。
我們先大致分析下思路,首先Scheduler
返回了一個方法createPromise
,我們能通過createPromise
直接創建Promise
,那么在Scheduler
內部一定得返回一個方法可用於生成Promise
。
其次,createPromise
被調用多次,但請求是分批次處理的,那我們可以在Scheduler
內創建一個數組用於存儲createPromise
的參數。另外,因為有limit
限制最多處理3
個請求,我們還需要一個變量用於記錄當前已經在處理的請求數。其它先別管,先搭建模板:
// 最多處理3個請求的調度器
function Scheduler(limit = 3) {
const pending = [];
let count = 0;
// 返回一個創建請求的方法
return function (param) {
//內部返回一個promise
return new Promise((resolve, reject) => {});
};
}
到這里,起碼createPromise(1)
能得到一個Promise
,並順利調用.then()
方法了。在手寫Promise
一文中,我們知道new Promise(fn)
其實是會將fn
傳入到Promise
同步調用,因此我們可以在fn
內部進行調用請求相關參數的存儲操作。
另外,已知有請求接口request
,很明顯還沒有地方做這個請求處理,而且有趣的是題目要求一次最多處理3個請求
,那么我們要時時感知請求數量的變化,比如現在只有2個請求了,可以再加入一個請求,用遞歸肯定是合理的,考慮到方便遞歸,我們將request
的相關操作以及limit
判斷封裝在一個方法run
中,再次補全代碼:
function Scheduler(limit = 3) {
const pending = [];
let count = 0;
// 處理request以及limit判斷的地方
const run = () => {
// 數組為空嗎?超出limit限制了嗎?
if (!pending.length || count >= limit) return;
// 依次取出之前存儲的參數
const param = pending.shift();
count++;
request(param).finally(() => {
count--;
// 遞歸,繼續判斷能不能執行下一個request
run();
});
};
// 返回一個創建請求的方法
return function (param) {
//內部返回一個promise
return new Promise((resolve, reject) => {
// 存儲數據
pending.push(param);
// 開始請求
run();
});
};
}
以上代碼其實已經滿足了請求的調度限制,但我們希望createPromise(1)
創建的Promise
能感知狀態變化,所以再次修改,將創建Promise
處的resolve reject
也作為參數存儲起來,如下:
// 這里只貼需要修改的代碼
request(param)
.then((res) => resolve(res))
.catch((err) => reject(err))
.finally(() => {
count--;
// 遞歸,繼續判斷能不能執行下一個request
run();
});
//內部返回一個promise
return new Promise((resolve, reject) => {
// 存儲數據
pending.push([param, resolve, reject]);
});
而.then((res) => resolve(res))
其實可以簡寫成.then(resolve)
,所以最終代碼為:
function Scheduler(limit = 3) {
const pending = [];
let count = 0;
// 處理request以及limit判斷的地方
const run = () => {
// 數組為空嗎?超出limit限制了嗎?
if (!pending.length || count >= limit) return;
// 依次取出之前存儲的參數
const [param, resolve, reject] = pending.shift();
count++;
request(param)
.then(resolve)
.catch(reject)
.finally(() => {
count--;
// 遞歸,繼續判斷能不能執行下一個request
run();
});
};
// 返回一個創建請求的方法
return function (param) {
//內部返回一個promise
return new Promise((resolve, reject) => {
// 存儲數據
pending.push([param, resolve, reject]);
// 開始請求
run();
});
};
}
運行代碼,可以看到已符合預期:

假設我們將limit
修改為2,效果如下

叄 ❀ 題二
這一道題的要求與上者類似,實現一個限制並發的異步調度器Scheduler
,保證同時運行的任務最多2個,完善如下代碼,使程序能正常輸出:
class Scheduler {
add(promiseCreator) { ... }
// ...
}
const timeout = time => new Promise(resolve => {
setTimeout(resolve, time);
})
const scheduler = new Scheduler();
const addTask = (time,order) => {
scheduler.add(() => timeout(time).then(()=>console.log(order)))
}
addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// output: 2 3 1 4
我們先分析題意,首先timeout
方法調用會得到一個Promise
,我們會通過調用addTask
來模擬請求行為,而方法內部核心調用的其實是scheduler.add(Promise)
。看樣子我們還是能跟上題一樣定一個存儲Promise
的數組pending
,以及當前請求的數量count
。
讓我們再看addTask
執行的輸出順序,因為addTask
調用是同步行為(微任務要走也得先把四次addTask
跑完),所以我們可以理解為當第一個Promise
准備開始執行時,pending
中已經存儲過4個Promise
了,我們將四個Promise
按它們的值來命名為P1、P2、P3、P4
,現在開始模擬這個過程:
-
由於限制器為2,所以一開始
P1 P2
就進入准備狀態了,由於定時器的緣故,500ms更快P2
先執行,因此先輸出2。 -
因為釋放了
P2
,於是P3
被通知也可以准備運行了,它的時間是300ms,P1
看着是1000ms但其實已經過了500ms,但依舊比不過300ms,所以緊接着輸出3,注意,此時再過200msP1
就可以執行了。 -
限制再次被放開,
P4
也開始執行,前面說了此時的P1
僅需200ms就能執行,而P4
需要400ms,因此先輸出1,最后輸出4。
那么問題來了,我們需要去手動控制或者說判斷這些Promise
之間的時間差,然后決定誰應該先執行嗎?很明顯不需要,我們要做的就是保證永遠有2個Promise
在等待執行,至於時間的差異,定時器會公平的到點修改Promise
狀態,並不需要人為干預。
那么大概思路了解了,開始補全代碼,思路與上一道題完全一致:
class Scheduler {
constructor() {
// 記錄promise的數組
this.pending = [];
// 限制器
this.limit = 2;
// 記錄當前已被啟動的promise
this.count = 0;
}
add(promiseCreator) {
// 單純存儲promise
this.pending.push(promiseCreator);
// 啟動執行,至於能不能走run內部會控制
this.run();
}
run() {
// 假設pending為空,或者調用大於限制直接返回
if (!this.pending.length || this.count >= this.limit) {
return;
}
this.count++;
this.pending
.shift()()
.finally(() => {
this.count--;
// 輪詢
this.run();
});
}
}
運行代碼,已經能正確輸出2 3 1 4
,而且思路與題目一完全一致。不知道你發現沒,第二題其實與第一題一模一樣,所謂定時器時間不同只是一個幌子,咱們只用控制好執行的數量,至於順序定時器自己會幫我們安排好。
肆 ❀ 題三
題三同樣是限制器問題,不過這次參數是直接傳入了一個數組,要求同樣是是並發最多同時處理3個請求,但要求如果全部成功則返回結果的數組,且結果順序與參數順序保持一致,如果失敗則直接返回失敗,是不是有點Promise.all()
的意思了:
// 假設請求API為
function request(params) {
return new Promise((resolve, reject) => {
setTimeout(() => resolve(params), 1000);
});
}
// 最多處理3個請求的調度器
function Scheduler(list=[], limit=3){
// ...
};
Scheduler([1,2,3,4,5]).then(console.log); // 1 2 3 4 5
我們簡單分析下需求,首先Scheduler
能調用.then
,那么內部一定得返回一個Promise
;其次,前兩題是調一次咱們存一個參數,然后排隊按照限制最大數有序執行,第三題參數直接是一個數組,那咱們完全可以遍歷,以此模擬前兩題依次執行的行為,然后在內部一樣的做個執行限制豈不美哉?
但有兩點需要注意,一是成功狀態下,我們怎么保證執行結果的順序與參數一致,第二是跳出遞歸的條件是什么?大家可以思考下自己嘗試寫一寫,這里我直接上代碼:
// 最多處理3個請求的調度器
function Scheduler(list = [], limit = 3) {
let count = 0;
// 用於統計成功的次數
let resLength = 0;
// 淺拷貝一份,原數據的length我們還有用
const pending = [...list];
const resList = [];
// 一定得返回一個promise
return new Promise((resolve, reject) => {
const run = () => {
if (!pending.length || count >= limit) return;
count++;
const index = list.length - pending.length;
const params = pending.shift();
request(params)
.then((res) => {
// 使用輸出來驗證限制器生效沒
console.log('用於驗證限制器:', res);
count--;
resLength++;
// 按index來保存結果
resList[index] = res;
// 全部成功了嗎?沒有就繼續請求,否則resolve(resList)跳出遞歸;
resLength === list.length ? resolve(resList) : run();
})
.catch(reject) // 有一個失敗就直接失敗
};
// 遍歷,模擬前兩次依次調用的動作,然后在run內部控制如何執行
list.forEach(() => run());
})
}
Scheduler([1, 2, 3, 4, 5]).then(console.log); // 1 2 3 4 5
注釋寫的很詳細了,這里就不再解釋了。可以思考下,為什么index
不直接在forEach
時傳遞過去呢?接下來說說實現過程中我踩過的坑。
其實一開始我是這么個結構:
// 最多處理3個請求的調度器
function Scheduler(list = [], limit = 3) {
const run = (value,index,resolve,reject) => {
// ...
};
return new Promise(function (resolve, reject) {
// 遍歷,模擬前兩次依次調用的動作,然后在run內部控制如何執行
list.forEach((value, index) => run(value,index,resolve,reject));
})
}
我們知道循環調用run
的過程是同步的,調用當時我們還能知道index
相關參數,但因為限制的存在,你的這次調用可能並不能立刻執行,而是要依賴后續的遞歸,而遞歸時咱們又要從哪獲取這四個參數呢?所以想了想還不如直接用一個new Promise
將整個執行包裹,也符合這是一個大Promise
的設定。
第二個坑是,為什么我要加一個resLength
來統計成功的次數,其實一開始我沒加這個參數,而跳出遞歸的判斷是這么寫的:
// 全部成功了嗎?沒有就繼續請求,否則resolve(resList)跳出遞歸;
resList.length === list.length ? resolve(resList) : run();
然后在不同時間resolve
的例子測試情況下,出現了[empty, empty, empty, empty, 5]
這樣的結果,思考下為什么會出現?
其實很簡單,假設前四個請求都要5S,而第五個請求只要1S,那么自然最后一個請求最先結束,因為resList[4] = res;
,這就直接生成了前四個元素是empty
的數組,而empty
是占位的,且此時數組的length
已經是5了,滿足條件直接resolve()
導致了最終的錯誤。
伍 ❀ 總
回到文章開頭同事提到的需求,大概意思是有一個批量注冊的需求,考慮到批量的數量問題,需要加一個請求限制器,比如一次10個10個處理,同時還需要知道所有注冊結束后,哪些成功哪些失敗了,而Promise.all()
很明顯不能幫我們既拿到成功又拿到失敗的數據,而基於題三,我們只用定一個對象,兩個數組分別保存成功失敗的數據,比如:
// 同樣,不管成功還是失敗,我們讓resLength都自增,滿足條件后再統一resolve
// 你可以在最終的結果中判斷rejected長度是否大於0,如果大於就表示有失敗的,然后就能提示失敗了幾個,非常完美
const res = {
resolved:[],
rejected:[]
};
有興趣的同學可以模擬實現下這個需求。然后我拋出一個問題,題三雖然執行結果順序與參數順序一致,本質上因為這個執行過程就是有序的,因為大家都是等待1S,那么假設我的請求時間不相等,
比如我們修改request
部分代碼為:
const time = [1, 3, 5, 2, 6];
// 假設請求API為
function request(params) {
return new Promise((resolve, reject) => {
setTimeout(() => resolve(params), time[Math.floor(Math.random() * 5)] * 1000);
});
}
以上代碼還能保證結果的順序與參數一致嗎?為什么會一致?大家可以思考下這個過程,說到這個點其實就與Promise.all
的效果完全一致了,考慮到篇幅問題,我們下一篇文章來模擬實現Promise.all
與Promise.race
,那么到這里本文結束。