Node.js 是如何工作的
Node.js 使用兩種線程:event loop 處理的主線程和 worker pool 中的幾個輔助線程。
事件循環是一種機制,它采用回調(函數)並注冊它們,准備在將來的某個時刻執行。它與相關的 JavaScript 代碼在同一個線程中運行。當 JavaScript 操作阻塞線程時,事件循環也會被阻止。
工作池是一種執行模型,它產生並處理單獨的線程,然后同步執行任務,並將結果返回到事件循環。事件循環使用返回的結果執行提供的回調。
簡而言之,它負責異步 I/O操作 —— 主要是與系統磁盤和網絡的交互。它主要由諸如 fs
(I/O 密集)或 crypto
(CPU 密集)等模塊使用。工作池用 libuv 實現,當 Node 需要在 JavaScript 和 C++ 之間進行內部通信時,會導致輕微的延遲,但這幾乎不可察覺。
基於這兩種機制,我們可以編寫如下代碼:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => { if (err) { return null; } console.log(content.toString()); });
前面提到的 fs
模塊告訴工作池使用其中一個線程來讀取文件的內容,並在完成后通知事件循環。然后事件循環獲取提供的回調函數,並用文件的內容執行它。
以上是非阻塞代碼的示例,我們不必同步等待某事的發生。只需告訴工作池去讀取文件,並用結果去調用提供的函數即可。由於工作池有自己的線程,因此事件循環可以在讀取文件時繼續正常執行。
在不需要同步執行某些復雜操作時,這一切都相安無事:任何運行時間太長的函數都會阻塞線程。如果應用程序中有大量這類功能,就可能會明顯降低服務器的吞吐量,甚至完全凍結它。在這種情況下,無法繼續將工作委派給工作池。
在需要對數據進行復雜的計算時(如AI、機器學習或大數據)無法真正有效地使用 Node.js,因為操作阻塞了主(且唯一)線程,使服務器無響應。在 Node.js v10.5.0 發布之前就是這種情況,在這一版本增加了對多線程的支持。
worker_threads
worker_threads
模塊允許我們創建功能齊全的多線程 Node.js 程序。
thread worker 是在單獨的線程中生成的一段代碼(通常從文件中取出)。
注意,術語 thread worker,worker 和 thread 經常互換使用,他們都指的是同一件事。
要想使用 thread worker,必須導入 worker_threads
模塊。讓我們先寫一個函數來幫助我們生成這些thread worker,然后再討論它們的屬性。
type WorkerCallback = (err: any, result?: any) => any; export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) => { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker; }
要創建一個 worker,首先必須創建一個 Worker
類的實例。它的第一個參數提供了包含 worker 的代碼的文件的路徑;第二個參數提供了一個名為 workerData
的包含一個屬性的對象。這是我們希望線程在開始運行時可以訪問的數據。
請注意:不管你是用的是 JavaScript, 還是最終要轉換為 JavaScript 的語言(例如,TypeScript),路徑應該始終引用帶有 .js
或 .mjs
擴展名的文件。
我還想指出為什么使用回調方法,而不是返回在觸發 message
事件時將解決的 promise。這是因為 worker 可以發送許多 message
事件,而不是一個。
正如你在上面的例子中所看到的,線程間的通信是基於事件的,這意味着我們設置了 worker 在發送給定事件后調用的偵聽器。
以下是最常見的事件:
worker.on('error', (error) => {});
只要 worker 中有未捕獲的異常,就會發出 error
事件。然后終止 worker,錯誤可以作為提供的回調中的第一個參數。
worker.on('exit', (exitCode) => {});
在 worker 退出時會發出 exit
事件。如果在worker中調用了 process.exit()
,那么 exitCode
將被提供給回調。如果 worker 以 worker.terminate()
終止,則代碼為1。
worker.on('online', () => {});
只要 worker 停止解析 JavaScript 代碼並開始執行,就會發出 online
事件。它不常用,但在特定情況下可以提供信息。
worker.on('message', (data) => {});
只要 worker 將數據發送到父線程,就會發出 message
事件。
現在讓我們來看看如何在線程之間共享數據。
在線程之間交換數據
要將數據發送到另一個線程,可以用 port.postMessage()
方法。它的原型如下:
port.postMessage(data[, transferList])
port 對象可以是 parentPort
,也可以是 MessagePort
的實例 —— 稍后會詳細講解。
數據參數
第一個參數 —— 這里被稱為 data
—— 是一個被復制到另一個線程的對象。它可以是復制算法所支持的任何內容。
數據由結構化克隆算法進行復制(包含function的對象引用都會報錯DataCloneError:xxxx could not be cloned)。引用自 Mozilla:
它通過遞歸輸入對象來進行克隆,同時保持之前訪問過的引用的映射,以避免無限遍歷循環。
該算法不復制函數、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式復制對象與使用 JSON 不同,因為它可以包含循環引用和類型化數組,而 JSON 不能。
由於能夠復制類型化數組,該算法可以在線程之間共享內存。
實例:
1、代碼
server.js
const express = require('express'); const ws = require('ws'); const convertMessage = require('./worker');//引入worker中的方法 const app = express() const wsServer = new ws.Server({ noServer: true }); wsServer.on('connection', (socket, req) => { socket.on('message', message => { console.log(message); }); }); const port = 3002 app.get('/test', (req, res) => { //1.接收到test請求,調用convertMessage,發起子線程 convertMessage().then(() => {
//5.converMessage resove后向客戶端發送success res.send('success') }) }) app.get('/', async (req, res) => { res.send('Hello World!') }) //啟動服務 const server = app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`) }) server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }); });
worker.js
const { Worker, workerData } = require('worker_threads') module.exports = function convertMessage() { return new Promise((resolve, reject) => { //2.子線程中執行./index.js文件 const worker = new Worker('./index.js'); worker.on('message', (message) => { //4.接收到子線程通過postMessage傳回的message console.log(message) resolve() }); worker.on('error', reject); worker.on('exit', (code) => { //子線程執行完成后觸發exit事件 if (code !== 0) { reject(new Error(`Worker stopped with exit code ${code}`)); } }) }) }
index.js
const { parentPort, workerData } = require('worker_threads') for (let i = 0; i < 100; i++) { //3.通過主線程的parentPort,向主線程發送消息 parentPort.postMessage(`index.js執行中${i}`) }
2、測試結果
后端通過node server.js啟動服務,前端通過http://localhost:3002/test發起請求:
后端log如下:
前端結果: