簡介
Node.js 有多重並發的能力,包括單線程異步、多線程、多進程等,這些能力可以根據業務進行不同選擇,幫助提高代碼的運行效率。
本文希望通過讀 p-limit、pm2 和 worker_threads 的一些代碼,來了解 Node.js 的並發能力。
版本說明
- Node.js 15.4.0
- Npm: 7.0.15
異步
Node.js 最常用的並發手段就是異步,不因為資源的消耗而阻塞程序的執行。
什么樣的並發
從邏輯上講,異步並不是為了並發,而是為了不阻塞主線程。但是我們卻可以同時發起多個異步操作,來起到並發的效果,雖然計算的過程是同步的。
當性能的瓶頸是 I/O 操作,比如查詢數據庫、讀取文件或者是訪問網絡,我們就可以使用異步的方式,來完成並發。而由於計算量比較小,所以不會過多的限制性能。每當這個時候,你只需要默默擔心下游的 QPS 就好了。
以 I/O 操作為主的應用,更適合用 Node.js 來做,比如 Web 服務中同時執行 M 個 SQL,亦或是離線腳本中同時訪問發起 N 個 RPC 服務。
所以在代碼中使用 async/await 的確很舒服,但是適當的合並請求,使用 Promise.all 才能提高性能。
限制並發
一旦你習慣了 Promise.all,同時了解了 EventLoop 的機制,你會發現 I/O 請求的限制往往在下游。因為對於 Node.js 來說,同時發送 10 個 RPC 請求和同時發送 100 個 RPC 請求的成本差別並不大,都是“發送-等待”的節奏,但是下游的“供應商”是會受不了的,這時你需要限制並發數。
限制並發數
常用限制並發數的 Npm 包是 p-limit,大致用法如下。
const fns = [ fetchSomething1, fetchSomething2, fetchSomething3, ]; const limit = pLimit(10); Promise.all( fns .map(fn => limit(async () => { await fn() // fetch1/2/3 }) ) // map ); // Promise.all
pLimit 函數源碼
為了深入了解,我們看一段 p-limit 的源碼,具體如下。
const pLimit = concurrency => { // ... const queue = new Queue(); let activeCount = 0; // ... const enqueue = (fn, resolve, ...args) => { queue.enqueue(run.bind(null, fn, resolve, ...args)); (async () => { await Promise.resolve(); if (activeCount < concurrency && queue.size < 0) { queue.dequeue()(); } })(); }; const generator = (fn, ...args) => new Promise(resolve => { enqueue(fn, resolve, ...args); }); // ... return generator; };
稍微解釋一下上面的代碼:
-
pLimit 函數的入參 concurrency 是最大並發數,變量 activeCount 表示當前在執行的異步函數的數量
a.調用一次 pLimit 會生成一個限制並發的函數 generator
b.多個 generator 函數會共用一個隊列
c. activeCount 需要小於 concurrency
-
pLimit 的實現依據隊列(yocto-queue)
a. 隊列有兩個方法:equeue 和 dequeue,equeue 負責進入隊列
b. 每個 generator 函數執行會將一個函數壓如隊列
c. 當發現 activeCount 小於最大並發數時,則調用 dequeue 彈出一個函數,並執行它。
- 每次被壓入隊列的不是原始函數,而是經過 run 函數處理的函數。
函數 run & next
// run 函數 const run = async (fn, resolve, ...args) => { activeCount++; const result = (async () => fn(...args))(); resolve(result); try { await result; } catch {} next(); }; // next 函數 const next = () => { activeCount--; if (queue.size > 0) { queue.dequeue()(); } };
-
函數 run 做 3 件事情,這三件事情為順序執行:
i . 讓 activeCount +1
ii . 執行異步函數 fn,並將結果傳遞給 resolve
a. 為保證 next 的順序,采用了 await result
iii. 調用 next 函數
-
函數 next 做兩件事情
i. 讓 activeCount -1
ii. 當隊列中還有元素時,彈出一個元素並執行,按照上面的邏輯,run 就會被調用
通過函數 enqueue、run 和 next,plimit 就產生了一個限制大小但不斷消耗的異步函數隊列,從而起到限流的作用。
更詳細的 p-limit 使用:Node 開發中使用 p-limit 限制並發原理[1]
超時怎么辦
pPromise 並沒有處理超時,簡單的辦法是可以使用 setTimeout 實現一個。
let timer = null; const timerPromise = new Promise((resolve, reject) => { timer = setTimeout(() => { reject('time out'); }, 1000); }); Promise.all([ timerPromise, fetchPromise, ]) .then(res => clearTimeout(timer)) .catch(err => console.error(err));
如果想看更正規的寫法,可以參照 p-timeout 的代碼,下面是一段的截取。
const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => { // ... const timer = options.customTimers.setTimeout.call(undefined, () => { if (typeof fallback === 'function') { try { resolve(fallback()); } catch (error) { reject(error); } return; } const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`; const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message); // ... reject(timeoutError); }, milliseconds); (async () => { try { resolve(await promise); } catch (error) { reject(error); } finally { options.customTimers.clearTimeout.call(undefined, timer); } })(); });
p-limit 做了更多的校驗和更好的封裝:
-
把超時和主程序封裝在一個 Promise 中
- 更利於用戶理解
- 靈活度更高:如果使用 Promise.all 只能通過 reject 表示超時,而 p-limit 可以通過 resolve 和 reject 兩個方式觸發超時
-
對於超時后的錯誤提示做了封裝
- 用戶可以指定錯誤信息
- 超時可以觸發特定的錯誤,或者是指定的函數
- clearTimeout 加在 finally 中的寫法更舒服
Async Hooks
為了方便追蹤異步資源,我們可以使用 async_hooks 模塊。
The async_hooks module provides an API to track asynchronous resources.
什么是異步資源
在 NodeJS 中,一個異步資源表示為一個關聯回調函數的對象。有以下幾個特點:
- 回調可以被多次調用(比如反復打開文件,多次創建網絡連接);
- 資源可以在回調被調用之前關閉;
- AsyncHook 更多的是異步抽象,而不會去管理這些異步的不同。
- 當多個 Worker 使用時,每個線程會創建自己的 async_hooks 的接口。
概述
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html
先看一段 async_hooks 的代碼
const fs = require('fs'); const asyncHooks = require('async_hooks'); let indent = 0; const asyncHook = asyncHooks.createHook({ init(asyncId, type, triggerAsyncId, resource) { const eid = asyncHooks.executionAsyncId(); const indentStr = ' '.repeat(indent); fs.writeSync( 1, ${indentStr}${type}(${asyncId}): trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n); }, before(asyncId) { const indentStr = ' '.repeat(indent); fs.writeSync(1, ${indentStr}before: ${asyncId}\n); indent += 2; }, after(asyncId) { indent -= 2; const indentStr = ' '.repeat(indent); fs.writeSync(1, ${indentStr}after: ${asyncId}\n); }, destroy(asyncId) { const indentStr = ' '.repeat(indent); fs.writeSync(1, ${indentStr}destroy: ${asyncId}\n); }, }); asyncHook.enable(); Promise.resolve('ok').then(() => { setTimeout(() => { console.log('>>>', asyncHooks.executionAsyncId()); }, 10); });
運行結果如下。
Async Hooks 的方法
- asyncHook.enable() / asyncHook.disable():打開/關閉 Async Hooks
-
Hook callbacks:當資源進入不同階段,下面的函數會被調用
- init:被聲明時調用
- before:聲明之后、執行之前調用
- after:異步執行完成后立即調用
- destroy:異步資源被銷毀時被調用
-
變量
- asyncId:異步的 ID,每一次異步調用會使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起來。
- triggerAsyncId: 觸發當前 asyncId 的 asyncId。
-
使用 asyncId 和 triggerAsyncId 可以完整的追蹤到異步調用的順序
- 其中根節點 root 是 1。
- 上面代碼的調用順序:1 -> 2 -> 3 -> 4 -> 5,6,7
- 映射代碼上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log
Async Hooks: type
在上面的 init 方法中 type 參數標明了資源類型,type 類型有 30 多種,具體可以參看下面的鏈接。
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type
本次程序主要用到了下面幾種:
- PROMISE:Promise 對象
- Timeout:setTimeout 使用
- TTYWRAP:console.log
- SIGNALWRAP:console.log
- TickObject:console.log
使用 Async Hooks 的注意事項
不要在 Async Hooks 的方法中使用異步函數,或者會引發異步的函數,如 console.log。因為 Async Hooks 方法就是在監控異步,而自身使用異步函數,會導致自己調用自己。
如果想打印輸出怎么辦?
好的解決辦法是使用 fs.writeSync 或者 fs.writeFileSync,即同步輸出的辦法。
多進程:Cluster
異步在 I/O 資源的利用上可以實現並發, 但是異步無法並發的使用 CPU 資源。多進程才能更好地利用多核操作系統的優點。
啟動子進程
Node.js 使用 Cluster 模塊來完成多進程,我們可以通過 pm2 的代碼來了解多進程,可以先從下面兩個文件入手:
lib/God.js 和 lib/God/ClusterMode.js。
// lib/God.js // ... cluster.setupMaster({ windowsHide: true, exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js') }); // ... // lib/God/ClusterMode.js module.exports = function ClusterMode(God) { // ... try { clu = cluster.fork({ pm2_env: JSON.stringify(env_copy), windowsHide: true }); } catch(e) { God.logAndGenerateError(e); return cb(e); } // ... };
上面兩端代碼主要講了 cluster 的兩個基本函數:
- setupMaster
- fork
簡單理解,就是 setupMaster 用於設置,而 fork 用於創建子進程。比如下面的例子。
const cluster = require('cluster'); cluster.setupMaster({ exec: 'worker.js', args: ['--use', 'https'], silent: true }); cluster.fork();
通信
進程間的通信使用的是事件監聽來通信。
const cluster = require('cluster'); const http = require('http'); if (cluster.isMaster) { const worker = cluster.fork(); [ 'error', 'exit', 'listening', 'message', 'online' ].forEach(workerEvent => { worker.on(workerEvent, msg => { console.log([${workerEvent}] from worker:, msg); }); }); } else { http.createServer(function(req, res) { process.send(${req.url}); res.end(Hello World: ${req.url}); }).listen(8000); }
運行后,訪問:http://localhost:8000/ 后結果如下:
通過 process.send,子進程可以給主進程發送信息,發送的信息可以是字符串,或者是可以進行 JSONStringify 的對象。而如果一個對象不能 JSONStringify,則會報錯,比如下面這段代碼。
http.createServer(function(req, res) { process.send(req); res.end(Hello World: ${req.url}); }).listen(8000);
會報錯:
這就意味着 Cluster 的通信是消息通信,但是沒辦法共享內存。(貌似就是進程的定義,但是強調一下沒什么壞處)
cluster.settings
可以通過 Cluster 模塊對子進程進行設置。
- execArgv:執行參數
- exec:執行命令,包含可執行文件、腳本文件、參數。
- args: 執行參數
- cwd:執行目錄
- serialization: 使傳遞數據支持高級序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 內嵌類型
- silent:是否沉默,如果設置為 true,子進程的輸出就被屏蔽了
- uid:子進程的 uid
- gid:子進程的 gid
- inspectPort:子線程的 inspect 端口
如何榨干機器性能
可以參看:nodejs 如何使用 cluster 榨干機器性能[2]
多線程:Worker Threads
如果想要共享內存,就需要多線程,Node.js 引入了 worker_threads 模塊來完成多線程。
監聽端口
假設有一個 server.js 的文件。
const http = require('http'); const runServer = port => { const server = http.createServer((_req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); const msg = `server on ${port}`; console.log(msg); res.end(msg); }); server.listen(port); }; module.exports.runServer = runServer;
Cluster 監聽
通過 cluster 監聽端口,可以如下。
const cluster = require('cluster'); const { runServer } = require('./server'); if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); for (let i = 0; i < 4; i ++) { cluster.fork(); } } else { console.log(`worker${cluster.worker.id}: ${cluster.worker.process.pid}`); runServer(3000); }
類似的 Worker Threads 代碼
const { Worker, isMainThread } = require('worker_threads'); const { runServer } = require('./server'); console.log('isMainThread', isMainThread); if (isMainThread) { for (let i = 0; i < 3; i ++) { new Worker(__filename); } } else { runServer(4000); }
結果如下。
我們沒辦法在一個進程中監聽多個端口,具體可以查看 Node.: 中 net.js 和 cluster.js 做了什么。
那么 Worker Threads 優勢在哪?
通信
Worker Threads 更擅長通信,這是線程的優勢,不僅是可以消息通信,還可以共享內存。
具體可以看:多線程 worker_threads 如何通信[3]
子線程管理
子線程通過 Worker 實例管理,而下面介紹實例化中的幾個重要參數。
資源限制 resouceLimits
- maxOldGenerationSizeMb:子線程中棧的最大內存
- maxYoungGenerationSizeMb:子線程中創建對象的堆的最大內存
- codeRangeSizeMb:生成代碼消耗的內存
- stackSizeMb:該線程默認堆的大小
子線程輸出 stdout/stderr/stdin
如果這 stdout/stderr/stdin 設置為 true,子線程會有獨立的管道輸出,而不會把 out/err/in 合並到父進程。
子線程參數 workerData, argv 和 execArgv
- workerData: 父線程傳遞給子線程的數據,必須要通過 require('worker_threads').workerData 獲取。
- argv: 父線程傳遞給子線程的參數,子線程通過 process.argv 獲取。
- execArgv: Node 的執行參數。
子線程環境 env 和 SHARE_ENV
- env: 父線程傳遞給子線程的環境,通過 process.env 可以獲取。
- SHARE_ENV:指定父線程和子線程可以共享環境變量
總結
- 作為 Web 服務,提高並發數,選擇 Cluster 更好;
- 作為腳本,希望提高並發,選擇 Worker Threads 更好;
- 當計算不是瓶頸,在某個進程或線程中,靈活異步的使用更好。
參考資料
[1]
Node 開發中使用 p-limit 限制並發原理: https://tech.bytedance.net/articles/6908747346445041671
[2]
nodejs 如何使用 cluster 榨干機器性能: https://tech.bytedance.net/articles/6906846464304447495
[3]
多線程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608
以上內容轉自https://mp.weixin.qq.com/s/cXwM_ENAjxvvwaBHEsuHbA
喜歡這篇文章?歡迎打賞~~