前言
本文寫一下js中es5和es6針對異步函數,串行執行和並行執行的方案,以及串行和並行結合使用的例子。原文地址
es5方式
在es6出來之前,社區nodejs中針對回調地獄,已經有了promise方案。假如多個異步函數,執行循環怎么安排,如何才能更快的執行完所有的異步函數,再執行下一步呢?這里就出現了js的串行執行和並行執行的問題。
異步函數串行執行
var items = [ 1, 2, 3, 4, 5, 6 ];
var results = [];
function async(arg, callback) {
console.log('參數為 ' + arg +' , 1秒后返回結果');
setTimeout(function () { callback(arg * 2); }, 1000);
}
function final(value) {
console.log('完成: ', value);
}
function series(item) {
if(item) {
async( item, function(result) {
results.push(result);
return series(items.shift());// 遞歸執行完所有的數據
});
} else {
return final(results[results.length - 1]);
}
}
series(items.shift());
異步函數並行執行
上面函數是一個一個執行的,上一個執行結束再執行下一個,類似es6(es5之后統稱es6)中 async 和await,那有沒有類似promise.all這種,所有的並行執行的呢?
可以如下寫:
var items = [ 1, 2, 3, 4, 5, 6 ];
var results = [];
function async(arg, callback) {
console.log('參數為 ' + arg +' , 1秒后返回結果');
setTimeout(function () { callback(arg * 2); }, 1000);
}
function final(value) {
console.log('完成: ', value);
}
items.forEach(function(item) {// 循環完成
async(item, function(result){
results.push(result);
if(results.length === items.length) {// 判斷執行完畢的個數是否等於要執行函數的個數
final(results[results.length - 1]);
}
})
});
異步函數串行執行和並行執行結合
假如並行執行很多條異步(幾百條)數據,每個異步數據中有很多的(https)請求數據,勢必造成tcp 連接數不足,或者堆積了無數調用棧導致內存溢出。所以並行執行不易太多數據,因此,出現了並行和串行結合的方式。
代碼可以如下書寫:
var items = [ 1, 2, 3, 4, 5, 6 ];
var results = [];
var running = 0;
var limit = 2;
function async(arg, callback) {
console.log('參數為 ' + arg +' , 1秒后返回結果');
setTimeout(function () { callback(arg * 2); }, 1000);
}
function final(value) {
console.log('完成: ', value);
}
function launcher() {
while(running < limit && items.length > 0) {
var item = items.shift();
async(item, function(result) {
results.push(result);
running--;
if(items.length > 0) {
launcher();
} else if(running == 0) {
final(results);
}
});
running++;
}
}
launcher();
es6方式
es6天然自帶串行和並行的執行方式,例如串行可以用async和await(前文已經講解),並行可以用promise.all等等。那么針對串行和並行結合,限制promise all並發數量,社區也有一些方案,例如
tiny-async-pool、es6-promise-pool、p-limit
簡單封裝一個promise all並發數限制解決方案函數
function PromiseLimit(funcArray, limit = 5) { // 並發執行5條數據
let i = 0;
const result = [];
const executing = [];
const queue = function() {
if (i === funcArray.length) return Promise.all(executing);
const p = funcArray[i++]();
result.push(p);
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= limit) {
return Promise.race(executing).then(
() => queue(),
e => Promise.reject(e)
);
}
return Promise.resolve().then(() => queue());
};
return queue().then(() => Promise.all(result));
}
使用:
// 測試代碼
const result = [];
for (let index = 0; index < 10; index++) {
result.push(function() {
return new Promise((resolve, reject) => {
console.log("開始" + index, new Date().toLocaleString());
setTimeout(() => {
resolve(index);
console.log("結束" + index, new Date().toLocaleString());
}, parseInt(Math.random() * 10000));
});
});
}
PromiseLimit(result).then(data => {
console.log(data);
});
修改測試代碼,新增隨機失敗邏輯
// 修改測試代碼 隨機失敗或者成功
const result = [];
for (let index = 0; index < 10; index++) {
result.push(function() {
return new Promise((resolve, reject) => {
console.log("開始" + index, new Date().toLocaleString());
setTimeout(() => {
if (Math.random() > 0.5) {
resolve(index);
} else {
reject(index);
}
console.log("結束" + index, new Date().toLocaleString());
}, parseInt(Math.random() * 1000));
});
});
}
PromiseLimit(result).then(
data => {
console.log("成功", data);
},
data => {
console.log("失敗", data);
}
);
async 和await 結合promise all
async function PromiseAll(promises,batchSize=10) {
const result = [];
while(promises.length > 0) {
const data = await Promise.all(promises.splice(0,batchSize));
result.push(...data);
}
return result;
}
這么寫有2個問題
1、在調用Promise.all前就已經創建好了promises,實際上promise已經執行了
2、你這個實現必須等前面batchSize個promise resolve,才能跑下一批的batchSize個,也就是promise all全部成功才可以。
改進如下:
async function asyncPool(array,poolLimit,iteratorFn) {
const ret = [];
const executing = [];
for (const item of array) {
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
await Promise.race(executing);
}
}
}
return Promise.all(ret);
}
使用:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool( [1000, 5000, 3000, 2000], 2,timeout).then(results => {
...
});