Node.js 並發能力總結


簡介

Node.js 有多重並發的能力,包括單線程異步、多線程、多進程等,這些能力可以根據業務進行不同選擇,幫助提高代碼的運行效率。

本文希望通過讀 p-limit、pm2 和 worker_threads 的一些代碼,來了解 Node.js 的並發能力。

版本說明

  • Node.js 15.4.0
  • Npm: 7.0.15

異步

Node.js 最常用的並發手段就是異步,不因為資源的消耗而阻塞程序的執行。

什么樣的並發

從邏輯上講,異步並不是為了並發,而是為了不阻塞主線程。但是我們卻可以同時發起多個異步操作,來起到並發的效果,雖然計算的過程是同步的。

當性能的瓶頸是 I/O 操作,比如查詢數據庫、讀取文件或者是訪問網絡,我們就可以使用異步的方式,來完成並發。而由於計算量比較小,所以不會過多的限制性能。每當這個時候,你只需要默默擔心下游的 QPS 就好了。

以 I/O 操作為主的應用,更適合用 Node.js 來做,比如 Web 服務中同時執行 M 個 SQL,亦或是離線腳本中同時訪問發起 N 個 RPC 服務。

所以在代碼中使用 async/await 的確很舒服,但是適當的合並請求,使用 Promise.all 才能提高性能。

限制並發

一旦你習慣了 Promise.all,同時了解了 EventLoop 的機制,你會發現 I/O 請求的限制往往在下游。因為對於 Node.js 來說,同時發送 10 個 RPC 請求和同時發送 100 個 RPC 請求的成本差別並不大,都是“發送-等待”的節奏,但是下游的“供應商”是會受不了的,這時你需要限制並發數。

限制並發數

常用限制並發數的 Npm 包是 p-limit,大致用法如下。

const fns = [
  fetchSomething1,
  fetchSomething2,
  fetchSomething3,
];

const limit = pLimit(10);
Promise.all(
  fns
    .map(fn =>
      limit(async () => {
        await fn() // fetch1/2/3
      })
    ) // map
); // Promise.all

 

pLimit 函數源碼

為了深入了解,我們看一段 p-limit 的源碼,具體如下。

const pLimit = concurrency => {

  // ...

  const queue = new Queue();
  let activeCount = 0;

  // ...

  const enqueue = (fn, resolve, ...args) => {
    queue.enqueue(run.bind(null, fn, resolve, ...args));

    (async () => {
      await Promise.resolve();

      if (activeCount < concurrency && queue.size < 0) {
        queue.dequeue()();
      }
    })();
  };

  const generator = (fn, ...args) => new Promise(resolve => {
    enqueue(fn, resolve, ...args);
  });

  // ...

  return generator;
};

 

稍微解釋一下上面的代碼:

  1. pLimit 函數的入參 concurrency 是最大並發數,變量 activeCount 表示當前在執行的異步函數的數量

    a.調用一次 pLimit 會生成一個限制並發的函數 generator

    b.多個 generator 函數會共用一個隊列

    c. activeCount 需要小於 concurrency

  2. pLimit 的實現依據隊列(yocto-queue)

    a. 隊列有兩個方法:equeue 和 dequeue,equeue 負責進入隊列

    b. 每個 generator 函數執行會將一個函數壓如隊列

    c. 當發現 activeCount 小於最大並發數時,則調用 dequeue 彈出一個函數,並執行它。

  3. 每次被壓入隊列的不是原始函數,而是經過 run 函數處理的函數。

函數 run & next

// run 函數
const run = async (fn, resolve, ...args) => {
  activeCount++;

  const result = (async () => fn(...args))();
  resolve(result);

  try {
    await result;
  } catch {}

  next();
};

// next 函數
const next = () => {
  activeCount--;

  if (queue.size > 0) {
    queue.dequeue()();
  }
};

 

  1. 函數 run 做 3 件事情,這三件事情為順序執行:

    i . 讓 activeCount +1

    ii . 執行異步函數 fn,並將結果傳遞給 resolve

         a. 為保證 next 的順序,采用了 await result

    iii. 調用 next 函數

  2. 函數 next 做兩件事情

    i. 讓 activeCount -1

    ii. 當隊列中還有元素時,彈出一個元素並執行,按照上面的邏輯,run 就會被調用

通過函數 enqueue、run 和 next,plimit 就產生了一個限制大小但不斷消耗的異步函數隊列,從而起到限流的作用。

更詳細的 p-limit 使用:Node 開發中使用 p-limit 限制並發原理[1]

超時怎么辦

pPromise 並沒有處理超時,簡單的辦法是可以使用 setTimeout 實現一個。

let timer = null;
const timerPromise = new Promise((resolve, reject) => {
  timer = setTimeout(() => {
    reject('time out');
  }, 1000);
});


Promise.all([
  timerPromise,
  fetchPromise,
])
.then(res => clearTimeout(timer))
.catch(err => console.error(err));

 

如果想看更正規的寫法,可以參照 p-timeout 的代碼,下面是一段的截取。

const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => {


  //  ...

  const timer = options.customTimers.setTimeout.call(undefined, () => {
    if (typeof fallback === 'function') {
      try {
        resolve(fallback());
      } catch (error) {
        reject(error);
      }
      return;
    }

    const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`;
    const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message);
    // ...

    reject(timeoutError);
  }, milliseconds);

  (async () => {
    try {
      resolve(await promise);
    } catch (error) {
      reject(error);
    } finally {
      options.customTimers.clearTimeout.call(undefined, timer);
    }
  })();
});

 

p-limit 做了更多的校驗和更好的封裝:

  • 把超時和主程序封裝在一個 Promise 中

    • 更利於用戶理解
    • 靈活度更高:如果使用 Promise.all 只能通過 reject 表示超時,而 p-limit 可以通過 resolve 和 reject 兩個方式觸發超時
  • 對於超時后的錯誤提示做了封裝

    • 用戶可以指定錯誤信息
    • 超時可以觸發特定的錯誤,或者是指定的函數
  • clearTimeout 加在 finally 中的寫法更舒服

Async Hooks

為了方便追蹤異步資源,我們可以使用 async_hooks 模塊。

The async_hooks module provides an API to track asynchronous resources.

什么是異步資源

在 NodeJS 中,一個異步資源表示為一個關聯回調函數的對象。有以下幾個特點:

  1. 回調可以被多次調用(比如反復打開文件,多次創建網絡連接);
  1. 資源可以在回調被調用之前關閉;
  1. AsyncHook 更多的是異步抽象,而不會去管理這些異步的不同。
  1. 當多個 Worker 使用時,每個線程會創建自己的 async_hooks 的接口。

概述

https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html

先看一段 async_hooks 的代碼

const fs = require('fs');
const asyncHooks = require('async_hooks');

let indent = 0;
const asyncHook = asyncHooks.createHook({
  init(asyncId, type, triggerAsyncId, resource) {
    const eid = asyncHooks.executionAsyncId();
    const indentStr = ' '.repeat(indent);
    fs.writeSync(
      1,
      ${indentStr}${type}(${asyncId}):
      trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
  },
  before(asyncId) {
    const indentStr = ' '.repeat(indent);
    fs.writeSync(1, ${indentStr}before:  ${asyncId}\n);
    indent += 2;
  },
  after(asyncId) {
    indent -= 2;
    const indentStr = ' '.repeat(indent);
    fs.writeSync(1, ${indentStr}after:  ${asyncId}\n);
  },
  destroy(asyncId) {
    const indentStr = ' '.repeat(indent);
    fs.writeSync(1, ${indentStr}destroy:  ${asyncId}\n);
  },
});

asyncHook.enable();

Promise.resolve('ok').then(() => {
  setTimeout(() => {
    console.log('>>>', asyncHooks.executionAsyncId());
  }, 10);
});

 

運行結果如下。

 

 

 

Async Hooks 的方法

  • asyncHook.enable() / asyncHook.disable():打開/關閉 Async Hooks
  • Hook callbacks:當資源進入不同階段,下面的函數會被調用

    • init:被聲明時調用
    • before:聲明之后、執行之前調用
    • after:異步執行完成后立即調用
    • destroy:異步資源被銷毀時被調用
  • 變量

    • asyncId:異步的 ID,每一次異步調用會使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起來。
    • triggerAsyncId: 觸發當前 asyncId 的 asyncId。
  • 使用 asyncId 和 triggerAsyncId 可以完整的追蹤到異步調用的順序

    • 其中根節點 root 是 1。
    • 上面代碼的調用順序:1 -> 2 -> 3 -> 4 -> 5,6,7
    • 映射代碼上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log

Async Hooks: type

在上面的 init 方法中 type 參數標明了資源類型,type 類型有 30 多種,具體可以參看下面的鏈接。

https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type

本次程序主要用到了下面幾種:

  • PROMISE:Promise 對象
  • Timeout:setTimeout 使用
  • TTYWRAP:console.log
  • SIGNALWRAP:console.log
  • TickObject:console.log

使用 Async Hooks 的注意事項

不要在 Async Hooks 的方法中使用異步函數,或者會引發異步的函數,如 console.log。因為 Async Hooks 方法就是在監控異步,而自身使用異步函數,會導致自己調用自己。

如果想打印輸出怎么辦?

好的解決辦法是使用 fs.writeSync 或者 fs.writeFileSync,即同步輸出的辦法。

多進程:Cluster

異步在 I/O 資源的利用上可以實現並發, 但是異步無法並發的使用 CPU 資源。多進程才能更好地利用多核操作系統的優點。

啟動子進程

Node.js 使用 Cluster 模塊來完成多進程,我們可以通過 pm2 的代碼來了解多進程,可以先從下面兩個文件入手:

lib/God.js 和 lib/God/ClusterMode.js。

// lib/God.js

// ...
  cluster.setupMaster({
    windowsHide: true,
    exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
  });
// ...
// lib/God/ClusterMode.js

module.exports = function ClusterMode(God) {

  // ...

  try {
    clu = cluster.fork({
      pm2_env: JSON.stringify(env_copy),
      windowsHide: true
    });
  } catch(e) {
    God.logAndGenerateError(e);
    return cb(e);
  }

  // ...


};

 

上面兩端代碼主要講了 cluster 的兩個基本函數:

  • setupMaster
  • fork

簡單理解,就是 setupMaster 用於設置,而 fork 用於創建子進程。比如下面的例子。

const cluster = require('cluster');
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork();

 

通信

進程間的通信使用的是事件監聽來通信。

const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
  const worker = cluster.fork();
  [
    'error',
    'exit',
    'listening',
    'message',
    'online'
  ].forEach(workerEvent => {
    worker.on(workerEvent, msg => {
      console.log([${workerEvent}] from worker:, msg);
    });
  });
} else {
  http.createServer(function(req, res) {
    process.send(${req.url});
    res.end(Hello World: ${req.url});
  }).listen(8000);
}

 

運行后,訪問:http://localhost:8000/ 后結果如下:

 

 

 

通過 process.send,子進程可以給主進程發送信息,發送的信息可以是字符串,或者是可以進行 JSONStringify 的對象。而如果一個對象不能 JSONStringify,則會報錯,比如下面這段代碼。

  http.createServer(function(req, res) {
    process.send(req);
    res.end(Hello World: ${req.url});
  }).listen(8000);

 

會報錯:

 

 

 

這就意味着 Cluster 的通信是消息通信,但是沒辦法共享內存。(貌似就是進程的定義,但是強調一下沒什么壞處)

cluster.settings

可以通過 Cluster 模塊對子進程進行設置。

  • execArgv:執行參數
  • exec:執行命令,包含可執行文件、腳本文件、參數。
  • args: 執行參數
  • cwd:執行目錄
  • serialization: 使傳遞數據支持高級序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 內嵌類型
  • silent:是否沉默,如果設置為 true,子進程的輸出就被屏蔽了
  • uid:子進程的 uid
  • gid:子進程的 gid
  • inspectPort:子線程的 inspect 端口

如何榨干機器性能

可以參看:nodejs 如何使用 cluster 榨干機器性能[2]

多線程:Worker Threads

如果想要共享內存,就需要多線程,Node.js 引入了 worker_threads 模塊來完成多線程。

監聽端口

假設有一個 server.js 的文件。

const http = require('http');


const runServer = port => {
  const server = http.createServer((_req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    const msg = `server on ${port}`;
    console.log(msg);
    res.end(msg);
  });
  server.listen(port);
};


module.exports.runServer = runServer;

 

Cluster 監聽

通過 cluster 監聽端口,可以如下。

const cluster = require('cluster');
const { runServer } = require('./server');


if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
  for (let i = 0; i < 4; i ++) {
    cluster.fork();
  }
} else {
  console.log(`worker${cluster.worker.id}: ${cluster.worker.process.pid}`);
  runServer(3000);
}

 

類似的 Worker Threads 代碼

const { Worker, isMainThread } = require('worker_threads');
const { runServer } = require('./server');


console.log('isMainThread', isMainThread);


if (isMainThread) {
  for (let i = 0; i < 3; i ++) {
    new Worker(__filename);
  }
} else {
  runServer(4000);
}

 

 

結果如下。

 

 

我們沒辦法在一個進程中監聽多個端口,具體可以查看 Node.: 中 net.js 和 cluster.js 做了什么。

那么 Worker Threads 優勢在哪?

通信

Worker Threads 更擅長通信,這是線程的優勢,不僅是可以消息通信,還可以共享內存。

具體可以看:多線程 worker_threads 如何通信[3]

子線程管理

子線程通過 Worker 實例管理,而下面介紹實例化中的幾個重要參數。

資源限制 resouceLimits

  • maxOldGenerationSizeMb:子線程中棧的最大內存
  • maxYoungGenerationSizeMb:子線程中創建對象的堆的最大內存
  • codeRangeSizeMb:生成代碼消耗的內存
  • stackSizeMb:該線程默認堆的大小

子線程輸出 stdout/stderr/stdin

如果這 stdout/stderr/stdin 設置為 true,子線程會有獨立的管道輸出,而不會把 out/err/in 合並到父進程。

子線程參數 workerData, argv 和 execArgv

  • workerData: 父線程傳遞給子線程的數據,必須要通過 require('worker_threads').workerData 獲取。
  • argv: 父線程傳遞給子線程的參數,子線程通過 process.argv 獲取。
  • execArgv: Node 的執行參數。

子線程環境 env 和 SHARE_ENV

  • env: 父線程傳遞給子線程的環境,通過 process.env 可以獲取。
  • SHARE_ENV:指定父線程和子線程可以共享環境變量

總結

  • 作為 Web 服務,提高並發數,選擇 Cluster 更好;
  • 作為腳本,希望提高並發,選擇 Worker Threads 更好;
  • 當計算不是瓶頸,在某個進程或線程中,靈活異步的使用更好。

參考資料

[1]

Node 開發中使用 p-limit 限制並發原理: https://tech.bytedance.net/articles/6908747346445041671

[2]

nodejs 如何使用 cluster 榨干機器性能: https://tech.bytedance.net/articles/6906846464304447495

[3]

多線程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608

以上內容轉自https://mp.weixin.qq.com/s/cXwM_ENAjxvvwaBHEsuHbA 

喜歡這篇文章?歡迎打賞~~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM