在
map
中返回Promises,然后等待結果本文譯自How to use async functions with Array.map in Javascript - Tamás Sallai 。
在前面的文章中,我們介紹了 async / await如何幫助執行異步命令 ,但在異步處理集合時卻無濟於事。在本文中,我們將研究該map
函數,該函數是最常用的函數,它將數據從一種形式轉換為另一種形式(這里可以理解為 map
具有返回值)。
1. Array.map
該map
是最簡單和最常見的采集功能。它通過迭代函數運行每個元素,並返回包含結果的數組。
向每個元素添加一的同步版本:
const arr = [1, 2, 3];
const syncRes = arr.map((i) => {
return i + 1;
});
console.log(syncRes);
// 2,3,4
異步版本需要做兩件事。首先,它需要將每個項目映射到具有新值的 Promise
,這是async
在函數執行之前添加的內容。
其次,它需要等待所有Promises
,然后將結果收集到Array中。幸運的是,Promise.all
內置調用正是我們執行步驟2所需的。
這使得一個異步的一般模式map
是Promise.all(arr.map(async (...) => ...))
。
異步實現與同步實現相同:
const arr = [1, 2, 3];
const asyncRes = await Promise.all(arr.map(async (i) => {
await sleep(10);
return i + 1;
}));
console.log(asyncRes);
// 2,3,4
2. 並發
上面的實現為數組的每個元素並行運行迭代函數。通常這很好,但是在某些情況下,它可能會消耗過多的資源。當異步函數訪問 API
或消耗過多的RAM以至於無法一次運行太多RAM時,可能會發生這種情況。
盡管異步map
易於編寫,但要增加並發控件。在接下來的幾個示例中,我們將研究不同的解決方案。
2.1 批量處理
最簡單的方法是對元素進行分組並逐個處理。這使您可以控制一次可以運行的最大並行任務數。但是由於一組必須在下一組開始之前完成,因此每組中最慢的元素成為限制因素。
為了進行分組,下面的示例使用Underscore.js
的groupBy
實現。許多庫提供了一種實現,並且它們大多數都是可互換的。Lodash
是個例外,因為其 groupBy
不傳遞 item
的索引。
如果您不熟悉groupBy
,它將通過迭代函數運行每個元素,並返回一個對象,其鍵為結果,值為產生該值的元素的列表。
為了使群體最多n
的元素,一個迭代器 Math.floor(i / n)
,其中 i
是元素的索引。例如,一組大小為3的元素將映射以下元素:
0 => 0
1 => 0
2 => 0
3 => 1
4 => 1
5 => 1
6 => 2
...
Javascript實現:
const arr = [30, 10, 20, 20, 15, 20, 10];
console.log(
_.groupBy(arr, (_v, i) => Math.floor(i / 3))
);
// {
// 0: [30, 10, 20],
// 1: [20, 15, 20],
// 2: [10]
// }
最后一組可能比其他組小,但是保證所有組都不會超過最大組大小。
要映射一組,通常的Promise.all(group.map(...))
構造是很好。
要按順序映射組,我們需要一個reduce,它將先前的結果(memo
)與當前組的結果連接起來:
return Object.values(groups)
.reduce(async (memo, group) => [
...(await memo),
...(await Promise.all(group.map(iteratee)))
], []);
此實現基於以下事實:await memo
等待上一個結果的完成才進行下一個任務。
實現批處理的完整實現:
const arr = [30, 10, 20, 20, 15, 20, 10];
const mapInGroups = (arr, iteratee, groupSize) => {
const groups = _.groupBy(arr, (_v, i) => Math.floor(i / groupSize));
return Object.values(groups)
.reduce(async (memo, group) => [
...(await memo),
...(await Promise.all(group.map(iteratee)))
], []);
};
const res = await mapInGroups(arr, async (v) => {
console.log(`S ${v}`);
await sleep(v);
console.log(`F ${v}`);
return v + 1;
}, 3);
// -- first batch --
// S 30
// S 10
// S 20
// F 10
// F 20
// F 30
// -- second batch --
// S 20
// S 15
// S 20
// F 15
// F 20
// F 20
// -- third batch --
// S 10
// F 10
console.log(res);
// 31,11,21,21,16,21,11
2.2 並行處理
並發控制的另一種類型是並行執行大多數n
任務,並在完成一項任務時啟動一個新任務。
我無法為此提供一個簡單的實現,但是幸運的是,Bluebird提供了一個開箱即用的庫。這很簡單,只需導入庫並使用Promise.map
支持該concurrency
選項的功能即可。
在下面的示例中,並發限制為2
,這意味着立即啟動2個任務,然后每完成一個任務,就開始一個新任務,直到沒有剩余:
const arr = [30, 10, 20, 20, 15, 20, 10];
// Bluebird promise
const res = await Promise.map(arr, async (v) => {
console.log(`S ${v}`)
await sleep(v);
console.log(`F ${v}`);
return v + 1;
}, {concurrency: 2});
// S 30
// S 10
// F 10
// S 10
// F 30
// S 20
// F 10
// S 15
// F 20
// S 20
// F 15
// S 20
// F 20
// F 20
console.log(res);
// 31,11,21,21,16,21,11
2.3 順序處理
有時,並發太多,因此應該一個接一個地處理元素。
一個簡單的實現是使用並發性為 1 的 Bluebird
的 Promise
。但是在這種情況下,它不保證包括一個庫,因為reduce
這樣做很簡單:
const arr = [1, 2, 3];
const res = await arr.reduce(async (memo, v) => {
const results = await memo;
console.log(`S ${v}`)
await sleep(10);
console.log(`F ${v}`);
return [...results, v + 1];
}, []);
// S 1
// F 1
// S 2
// F 2
// S 3
// F 3
console.log(res);
// 2,3,4
確保在執行任何其他操作之前 await memo
,因為如果沒有 await
,它仍然會並發運行!
3. 結論
該map
功能很容易轉換為異步,因為Promise.all
內置功能繁重。但是控制並發需要一些計划。
推薦閱讀
如果對你有所幫助,可以點贊、收藏。