async-pool的JS 實現並發控制


前言

最近看了些js 並發如何實現,也查閱了一下資料 ,查考了一下demo

日常開發中會遇到並發控制的場景,比如控制請求並發數。那么在 JavaScript 中如何實現並發控制呢?

 

並發控制。

下面有8個待辦任務要執行,而我們希望限制同時執行的任務個數,即最多只有 2 個任務能同時執行。

正在執行任務列表 中的任何 1 個任務完成后,程序會自動從 待辦任務列表 中獲取新的待辦任務並把該任務添加到 正在執行任務列表 ,

介紹 下 async-pool 這個庫來介紹一下異步任務並發控制的具體實現。

async-poolgithub.com/rxaviers/as…

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。

也是查閱了些資料 去研究了下 asyncPool 的使用

async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現,在分析其具體實現之前,我們來看一下它如何使用。

1.1 asyncPool 的使用

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
 

在以上代碼中,我們使用 async-pool 這個庫提供的 asyncPool 函數來實現異步任務的並發控制。 asyncPool 函數的簽名如下所示:

function asyncPool(poolLimit, array, iteratorFn){ ... }
 

該函數接收 3 個參數:

  • poolLimit(數字類型):表示限制的並發數;
  • array(數組類型):表示任務數組;
  • iteratorFn(函數類型):表示迭代函數,用於實現對每個任務項進行處理,該函數會返回一個 Promise 對象或異步函數。

對於以上示例來說,在使用了 asyncPool 函數之后,對應的執行過程如下所示:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
 

通過觀察以上的注釋信息,我們可以大致地了解 asyncPool 函數內部的控制流程。下面我們先來分析 asyncPool 函數的 ES7 實現。

1.2 asyncPool ES7 實現

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存儲所有的異步任務
  const executing = []; // 存儲正在執行的異步任務
  for (const item of array) {
    // 調用iteratorFn函數創建異步任務
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的異步任務

    // 當poolLimit值小於或等於總任務個數時,進行並發控制
    if (poolLimit <= array.length) {
      // 當任務完成后,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在執行的異步任務
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待較快的任務執行完成
      }
    }
  }
  return Promise.all(ret);
}
 

在以上代碼中,充分利用了 Promise.allPromise.race 函數特點,再結合 ES7 中提供的 async await 特性,

最終實現了並發控制的功能。

利用 await Promise.race(executing); 這行語句,我們會等待 正在執行任務列表 中較快的任務執行完成之后,才會繼續執行下一次循環。

asyncPool ES7 實現相對比較簡單,接下來我們來看一下不使用 async await 特性要如何實現同樣的功能。

1.3 asyncPool ES6 實現

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // 存儲所有的異步任務
  const executing = []; // 存儲正在執行的異步任務
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // 獲取新的任務項
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // 當poolLimit值小於或等於總任務個數時,進行並發控制
    if (poolLimit <= array.length) {
      // 當任務完成后,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing); 
      }
    }
 
    // 正在執行任務列表 中較快的任務執行完成之后,才會從array數組中獲取新的待辦任務
    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}
 

在 ES6 的實現版本中,通過內部封裝的 enqueue 函數來實現核心的控制邏輯。

Promise.race(executing) 返回的 Promise 對象變成已完成狀態時,才會調用 enqueue 函數,從 array 數組中獲取新的待辦任務。

 
 
上面就是並發的開發一下資料,可以在以后的業務中運用啦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM