Nodejs一直以單線程異步IO著稱,擅長IO密集型操作,不擅長CPU密集型操作。但是,新版的Nodejs,在不斷彌補這方面的短板。
在 Node 10.5.0,官方給出了一個實驗性質的模塊 worker_threads 給 Node 提供了真正的多線程能力
在 Node.js 12.11.0,worker_threads 模塊正式進入穩定版
至此,Nodejs算是了真正的多線程能力。進程是資源分配的最小單位,線程是CPU調度的最小單位。
1. Nodejs多線程種類
Node.js 中有三類線程 (child_process 和 cluster 的實現均為進程)
1. event loop的主線程
2. libuv的異步I/O線程池
3. worker_threads的線程
2. worker_threads的作用
worker_thread 相比進程的方案,他們與父線程公用一個進程 ID,可輕松與另一個線程共享內存(ArrayBuffer 或 SharedArrayBuffer),從而避免了額外的序列化和反序列化開銷。
但是 Worker Threads 對於 I/O 密集型操作是沒有太大的幫助的,因為異步的 I/O 操作比 worker 更有效率,Wokers 的主要作用是用於提升對於 CPU 密集型操作的性能。
3. worker_threads的線程
3.1 線程的基本用法
worker_threads也是master-work模型,有主線程和工作線程之分。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { module.exports = function parseJSAsync(script) { return new Promise((resolve, reject) => { const worker = new Worker(__filename, { workerData: script }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) reject(new Error(`工作線程使用退出碼 ${code} 停止`)); }); }); }; } else { const { parse } = require('一些 js 解析庫'); const script = workerData; parentPort.postMessage(parse(script)); }
3.2 線程間的通信
1. 共享內存
與child_process和cluster的進程不同,線程之間可以共享內存。使用ArrayBuffer或SharedArrayBuffer
2. parentPort
主要用於主子線程通信,通過經典的 on('message'), postMessage形式
if (isMainThread) { const worker = new Worker(__filename); worker.once('message', (message) => { console.log(message); // Prints 'Hello, world!'. }); worker.postMessage('Hello, world!'); } else { // When a message from the parent thread is received, send it back: parentPort.once('message', (message) => { parentPort.postMessage(message); }); }
3. MessageChannel
與 Web 工作線程和 cluster 模塊一樣,可以通過線程間的消息傳遞來實現雙向通信。 在內部,一個 Worker 具有一對內置的 MessagePort,在創建該 Worker 時它們已經相互關聯。 雖然父端的 MessagePort 對象沒有直接公開,但其功能是通過父線程的 Worker 對象上的 worker.postMessage() 和 worker.on('message') 事件公開的。
要創建自定義的消息傳遞通道(建議使用默認的全局通道,因為這樣可以促進關聯點的分離),用戶可以在任一線程上創建一個 MessageChannel 對象,並將該 MessageChannel 上的 MessagePort 中的一個通過預先存在的通道傳給另一個線程,例如全局的通道。
const assert = require('assert'); const { Worker, MessageChannel, MessagePort, isMainThread, parentPort } = require('worker_threads'); if (isMainThread) { const worker = new Worker(__filename); const subChannel = new MessageChannel(); worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]); subChannel.port2.on('message', (value) => { console.log('接收到:', value); }); } else { parentPort.once('message', (value) => { assert(value.hereIsYourPort instanceof MessagePort); value.hereIsYourPort.postMessage('工作線程正在發送此消息'); value.hereIsYourPort.close(); }); }