Nodejs 多进程多线程和线程通信以及应用和原理


Nodejs以事件驱动、非阻塞式I/O的模型,擅长IO密集型操作。

早期版本提供了child_processcluster(V0.6.0)来提供多进程的支持。
v10版本实验性的引入worker_threads,Nodejs具有多线程的支持,终于在v12.11.0正式稳定

下面讲解Nodejs多进程,多线程应用,后续在补充进程,线程实现方式

child_process 进程

const spawn = require('child_process').spawn
const ls = spawn('ls',['-lh','/user']) // ls -lh /user
ls.stdout.on('data',data => {
    console.log(`stdout: ${data}`)
})
ls.stderr.on('data', (data) => {
    console.log(`stderr: ${data}`)
})
ls.on('close', (code) => {
    console.log(`child process exited with code ${code}`)
})
  • .exec()、.execFile()、.fork()底层都是通过.spawn()实现的
  • 上面都是异步方法,有相应的同步方法,例如在子进程调用shell,需要使用同步的方式

下面是fork的使用,以及进程通信

// main.js
var child_process = require('child_process')
var child = child_process.fork('./child.js',{
    silent: false
})
child.on('message', function (m) {
    console.log('message from child: ' + JSON.stringify(m))
})
// child.js
process.on('message', function(m){
    console.log('message from parent: ' + JSON.stringify(m))
})
process.send({from: 'child'})

结果

> node main.js
message from child: {"from":"child"}
message from parent: {"from":"parent"}

cluster 进程

var cluster = require('cluster')
var http = require('http')
var numCPUs = require('os').cpus().length
if (cluster.isMaster) {
    console.log(`[master] master start...`)
    for (var i = 0; i < numCPUs; i++) {
        var worker = cluster.fork()
        worker.send(`hello I am master`)
    }
    cluster.on('exit', function (worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died')
        // cluster.fork() // 监听work进程是否退出,退出就新建进程
    })
} else {
    process.on('message',function(p) {
        console.log(`[worker]: ${p}`)
        process.send('hi i am worker')
        process.exit(0) // 手动推出worker进程
    })
}

worker_threads 线程

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');
function mainThread() {
  const worker = new Worker(__filename, { workerData: 0 });
  worker.on('exit', code => { console.log(`main: worker stopped with exit code ${code}`); });
  worker.on('message', msg => {
    console.log(`main: receive ${msg}`);
    worker.postMessage(msg + 1);
  });
}
function workerThread() {
  console.log(`worker: threadId ${threadId} start with ${__filename}`);
  console.log(`worker: workerDate ${workerData}`);
  parentPort.on('message', msg => {
    console.log(`worker: receive ${msg}`);
    if (msg === 5) { process.exit(); }
    parentPort.postMessage(msg);
  }),
  parentPort.postMessage(workerData);
}
 
if (isMainThread) {
  mainThread();
} else {
  workerThread();
}

MessageChannel 通信方式

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');
 
if (isMainThread) {
  const worker1 = new Worker(__filename);
  const worker2 = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker1.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  worker2.postMessage({ hereIsYourPort: subChannel.port2 }, [subChannel.port2]);
} else {
  parentPort.once('message', (value) => {
    value.hereIsYourPort.postMessage('hello');
    value.hereIsYourPort.on('message', msg => {
      console.log(`thread ${threadId}: receive ${msg}`);
    });
  });
}

参考
Nodejs进阶:如何玩转子进程(child_process)


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM