如何實現一個簡單的並發控制?


並發控制的概念相信大家都非常熟悉,比如瀏覽器請求的並發控制等。今天,我們結合 async-pool 這個開源工具來看看如何實現一個簡單的並發控制。

async-pool 的代碼分為 es6 和 es7 兩個版本,都非常簡單,我們主要基於 es6 版本進行說明。

在去除參數校驗等邏輯以后,核心代碼如下,非常短小精悍:

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = [];
  const executing = [];
  const enqueue = function() {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++];
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing);
      }
    }

    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

asyncPool 支持三個參數,第一個是並發數量,第二個是一組請求輸入,第三個是返回 promise 的迭代函數。我們舉一個例子來進行說明。

假設我們現在有 500 個請求需要發送,並發數量控制是 50。那么我們可以這樣使用 asyncPool

asyncPool(50, [/* 500 個請求的參數數據 */], () => {/* 發起請求的函數 */})

我們現在來詳細說明 asyncPool 的工作原理。

首先,asyncPool 中初始化了兩個數組,ret 保存返回結果,其順序要與輸入順序一致,executing 用於記錄當前正在執行的請求。

asyncPool 中創建了一個 enqueue 函數,負責具體的並發控制邏輯。

enqueue 函數中,通過變量 i 來逐個獲取請求輸入參數,調用迭代函數發起請求,然后將返回的 promise 保存在 ret 中。

const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);

之后就是並發數量控制的核心邏輯:

let r = Promise.resolve();

if (poolLimit <= array.length) {
    const e = p.then(() => executing.splice(executing.indexOf(e), 1));
    executing.push(e);
    if (executing.length >= poolLimit) {
        r = Promise.race(executing);
    }
}

return r.then(() => enqueue());

如果並發數量限制大於要發起的請求數量,則無需通過 executing 數組來記錄正在執行的請求,直接循環發起請求即可。

如果並發數量限制小於要發起的請求數量,則首先通過之前調用迭代函數返回的 promise 生成一個新的 promise,放入 executing 中。在這個新的 promise 完成時,將其從 executing 中刪除。

如果 executing 數組長度大於並發數量控制,則使用 Promise.race(executing) 獲取最先返回的 promsie,並通過它進行下一次迭代。

通過變量 r 我們可以看到,在整個循環過程中,enqueue 函數會形成一個 promise 鏈,在最后一個 promise 返回之后,asyncPool 通過 Promise.all 將所有的結果返回。

return enqueue().then(() => Promise.all(ret));

至此,async-pool 的核心邏輯我們就分析完了。上面的分析過程是基於 es6 版本的代碼,es7 版本更加簡潔,如下,看官們可以自行分析:

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

我們知道,不管是 Promise.race 還是 Promise.all,只要有一個 promise 達到 Fufilled 或者 Rejected 狀態,整個就會返回。這在接口請求的的場景中是不合適的。我們應該如何改造呢?

其實也非常簡單,只要在迭代函數的調用處做一些特殊處理即可。

iteratorFn(item, array).then(resp => resp).catch(error => error);

常見面試知識點、技術解決方案、教程,都可以掃碼關注公眾號“眾里千尋”獲取,或者來這里 https://everfind.github.io

眾里千尋


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM