Node.js 多線程——worker_threads


Node.js 是如何工作的

Node.js 使用兩種線程:event loop 處理的主線程和 worker pool 中的幾個輔助線程。

事件循環是一種機制,它采用回調(函數)並注冊它們,准備在將來的某個時刻執行。它與相關的 JavaScript 代碼在同一個線程中運行。當 JavaScript 操作阻塞線程時,事件循環也會被阻止。

工作池是一種執行模型,它產生並處理單獨的線程,然后同步執行任務,並將結果返回到事件循環。事件循環使用返回的結果執行提供的回調。

簡而言之,它負責異步 I/O操作 —— 主要是與系統磁盤和網絡的交互。它主要由諸如 fs(I/O 密集)或 crypto(CPU 密集)等模塊使用。工作池用 libuv 實現,當 Node 需要在 JavaScript 和 C++ 之間進行內部通信時,會導致輕微的延遲,但這幾乎不可察覺。

基於這兩種機制,我們可以編寫如下代碼:

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

前面提到的 fs 模塊告訴工作池使用其中一個線程來讀取文件的內容,並在完成后通知事件循環。然后事件循環獲取提供的回調函數,並用文件的內容執行它。

以上是非阻塞代碼的示例,我們不必同步等待某事的發生。只需告訴工作池去讀取文件,並用結果去調用提供的函數即可。由於工作池有自己的線程,因此事件循環可以在讀取文件時繼續正常執行。

在不需要同步執行某些復雜操作時,這一切都相安無事:任何運行時間太長的函數都會阻塞線程。如果應用程序中有大量這類功能,就可能會明顯降低服務器的吞吐量,甚至完全凍結它。在這種情況下,無法繼續將工作委派給工作池。

在需要對數據進行復雜的計算時(如AI、機器學習或大數據)無法真正有效地使用 Node.js,因為操作阻塞了主(且唯一)線程,使服務器無響應。在 Node.js v10.5.0 發布之前就是這種情況,在這一版本增加了對多線程的支持。

worker_threads

worker_threads 模塊允許我們創建功能齊全的多線程 Node.js 程序。

thread worker 是在單獨的線程中生成的一段代碼(通常從文件中取出)。

注意,術語 thread workerworker 和 thread 經常互換使用,他們都指的是同一件事。

要想使用 thread worker,必須導入 worker_threads 模塊。讓我們先寫一個函數來幫助我們生成這些thread worker,然后再討論它們的屬性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要創建一個 worker,首先必須創建一個 Worker 類的實例。它的第一個參數提供了包含 worker 的代碼的文件的路徑;第二個參數提供了一個名為 workerData 的包含一個屬性的對象。這是我們希望線程在開始運行時可以訪問的數據。

請注意:不管你是用的是 JavaScript, 還是最終要轉換為 JavaScript 的語言(例如,TypeScript),路徑應該始終引用帶有 .js 或 .mjs 擴展名的文件。

我還想指出為什么使用回調方法,而不是返回在觸發 message 事件時將解決的 promise。這是因為 worker 可以發送許多 message 事件,而不是一個。

正如你在上面的例子中所看到的,線程間的通信是基於事件的,這意味着我們設置了 worker 在發送給定事件后調用的偵聽器。

以下是最常見的事件:

worker.on('error', (error) => {});

只要 worker 中有未捕獲的異常,就會發出 error 事件。然后終止 worker,錯誤可以作為提供的回調中的第一個參數。

worker.on('exit', (exitCode) => {});

在 worker 退出時會發出 exit 事件。如果在worker中調用了 process.exit(),那么 exitCode 將被提供給回調。如果 worker 以 worker.terminate() 終止,則代碼為1。

worker.on('online', () => {});

只要 worker 停止解析 JavaScript 代碼並開始執行,就會發出 online 事件。它不常用,但在特定情況下可以提供信息。

worker.on('message', (data) => {});

只要 worker 將數據發送到父線程,就會發出 message 事件。

現在讓我們來看看如何在線程之間共享數據。

在線程之間交換數據

要將數據發送到另一個線程,可以用 port.postMessage() 方法。它的原型如下:

port.postMessage(data[, transferList])

port 對象可以是 parentPort,也可以是 MessagePort 的實例 —— 稍后會詳細講解。

數據參數

第一個參數 —— 這里被稱為 data —— 是一個被復制到另一個線程的對象。它可以是復制算法所支持的任何內容。

數據由結構化克隆算法進行復制(包含function的對象引用都會報錯DataCloneError:xxxx could not be cloned)。引用自 Mozilla:

它通過遞歸輸入對象來進行克隆,同時保持之前訪問過的引用的映射,以避免無限遍歷循環。

該算法不復制函數、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式復制對象與使用 JSON 不同,因為它可以包含循環引用和類型化數組,而 JSON 不能。

由於能夠復制類型化數組,該算法可以在線程之間共享內存。

實例:

1、代碼

server.js

const express = require('express');
const ws = require('ws');
const convertMessage = require('./worker');//引入worker中的方法

const app = express()
const wsServer = new ws.Server({ noServer: true });
wsServer.on('connection', (socket, req) => {
    socket.on('message', message => {
        console.log(message);
    });
});
const port = 3002
app.get('/test', (req, res) => {
    //1.接收到test請求,調用convertMessage,發起子線程
    convertMessage().then(() => {
//5.converMessage resove后向客戶端發送success res.send(
'success') }) }) app.get('/', async (req, res) => { res.send('Hello World!') }) //啟動服務 const server = app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`) }) server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }); });

 

worker.js

const { Worker, workerData } = require('worker_threads')

module.exports = function convertMessage() {
    return new Promise((resolve, reject) => {
        //2.子線程中執行./index.js文件
        const worker = new Worker('./index.js');
        worker.on('message', (message) => {
            //4.接收到子線程通過postMessage傳回的message
            console.log(message)
            resolve()
        });
        worker.on('error', reject);
        worker.on('exit', (code) => {
            //子線程執行完成后觸發exit事件
            if (code !== 0) {
                reject(new Error(`Worker stopped with exit code ${code}`));
            }
        })
    })
}

index.js

const { parentPort, workerData } = require('worker_threads')

for (let i = 0; i < 100; i++) {    
    //3.通過主線程的parentPort,向主線程發送消息
    parentPort.postMessage(`index.js執行中${i}`)
}

2、測試結果

后端通過node server.js啟動服務,前端通過http://localhost:3002/test發起請求:

后端log如下:

 

前端結果:

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM