背景
我們在需要保證代碼在多個異步處理之后執行,我們通常會使用
Promise.all(promises: []).then(fun: function);
Promise.all可以保證,promises數組中所有promise對象都達到resolve狀態,才執行then回調
那么會出現的情況是,你在瞬間發出幾十萬http請求(tcp連接數不足可能造成等待),或者堆積了無數調用棧導致內存溢出.
這個時候需要我們對HTTP的連接數做限制。
內容
//promise並發限制
class PromisePool {
constructor(max, fn) {
this.max = max; //最大並發量
this.fn = fn; //自定義的請求函數
this.pool = []; //並發池
this.urls = []; //剩余的請求地址
}
start(urls) {
this.urls = urls; //先循環把並發池塞滿
while (this.pool.length < this.max) {
let url = this.urls.shift();
this.setTask(url);
}
//利用Promise.race方法來獲得並發池中某任務完成的信號
let race = Promise.race(this.pool);
return this.run(race);
}
run(race) {
race
.then(res => {
//每當並發池跑完一個任務,就再塞入一個任務
let url = this.urls.shift();
this.setTask(url);
return this.run(Promise.race(this.pool));
})
}
setTask(url) {
if (!url) return
let task = this.fn(url);
this.pool.push(task); //將該任務推入pool並發池中
console.log(`\x1B[43m ${url} 開始,當前並發數:${this.pool.length}`)
task.then(res => {
//請求結束后將該Promise任務從並發池中移除
this.pool.splice(this.pool.indexOf(task), 1);
console.log(`\x1B[43m ${url} 結束,當前並發數:${this.pool.length}`);
})
}
}
//test
const URLS = [
'bytedance.com',
'tencent.com',
'alibaba.com',
'microsoft.com',
'apple.com',
'hulu.com',
'amazon.com'
]
//自定義請求函數
var requestFn = url => {
return new Promise(resolve => {
setTimeout(() => {
resolve(`任務${url}完成`)
}, 1000)
}).then(res => {
console.log('外部邏輯', res);
})
}
const pool = new PromisePool(5, requestFn); //並發數為5
pool.start(URLS)
從上面可以看出,思路如下:定義一個 PromisePool 對象,初始化一個 pool 作為並發池,然后先循環把並發池塞滿,不斷地調用 setTask 然后通過自己自定義的任務函數(任務函數可以是網絡請求封裝的 promise 對象,或者是其他的),而且每個任務是一個Promise對象包裝的,執行完就 pop 出連接池, 任務push 進並發池 pool 中。
//利用Promise.race方法來獲得並發池中某任務完成的信號
let race = Promise.race(this.pool);
return this.run(race);
run(race) {
race
.then(res => {
//每當並發池跑完一個任務,就再塞入一個任務
let url = this.urls.shift();
this.setTask(url);
return this.run(Promise.race(this.pool));
})
}
這個地方就是不斷通過遞歸的方式,每當並發池跑完一個任務,就再塞入一個任務
其他
npm中有很多實現這個功能的第三方包,比如async-pool、es6-promise-pool、p-limit,也可以直接拿來用