Nodejs以事件驱动、非阻塞式I/O的模型,擅长IO密集型操作。
早期版本提供了child_process和cluster(v0.6.0)来提供多进程的支持。
v10版本实验性地引入了worker_threads,Nodejs从此具有多线程的支持,并终于在v12.11.0正式稳定。
下面讲解Nodejs多进程、多线程的应用,后续再补充进程、线程的实现方式。
child_process 进程
const spawn = require('child_process').spawn
// Run `ls -lh /usr` in a child process and stream its output as it arrives.
const ls = spawn('ls', ['-lh', '/usr']); // ls -lh /usr

// stdout is delivered in chunks; the template literal stringifies each chunk.
ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

// Diagnostics written by the child go to this process's stderr.
ls.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

// 'error' is emitted when the process could not be spawned (for example the
// command does not exist). Without a listener the emitter would throw.
ls.on('error', (err) => {
  console.error(`failed to start child process: ${err.message}`);
});

// 'close' fires once the process has ended and its stdio streams are closed.
ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});
- .exec()、.execFile()、.fork()底层都是通过.spawn()实现的
- 上面都是异步方法,其中.spawn()、.exec()、.execFile()有相应的同步版本(.spawnSync()、.execSync()、.execFileSync());例如需要等子进程中的shell命令执行完毕后再继续时,可以使用同步的方式
下面是fork的使用,以及进程通信
// main.js
var child_process = require('child_process')
// Fork ./child.js as a separate Node.js process connected to this one by an
// IPC channel.
const child = child_process.fork('./child.js', {
  // false keeps the child's stdio inherited from this process instead of
  // piping it back to the parent.
  silent: false,
});

// Log every message the child sends over the IPC channel.
child.on('message', (m) => {
  console.log(`message from child: ${JSON.stringify(m)}`);
});

// Send the greeting that child.js logs as "message from parent"; without it
// the child's 'message' handler never fires.
child.send({ from: 'parent' });
// child.js
// Print each message received from the parent process over the IPC channel.
process.on('message', (incoming) => {
  const payload = JSON.stringify(incoming);
  console.log(`message from parent: ${payload}`);
});

// Announce this child to the parent process.
process.send({ from: 'child' });
结果
> node main.js
message from child: {"from":"child"}
message from parent: {"from":"parent"}
cluster 进程
var cluster = require('cluster')
var http = require('http')
var numCPUs = require('os').cpus().length
// The same script runs in two roles: the master process forks one worker per
// CPU core and greets it; each worker answers the greeting and then exits.
// NOTE(review): cluster.isMaster is kept so the example still runs on older
// Node.js releases; newer releases also provide cluster.isPrimary.
if (cluster.isMaster) {
  console.log(`[master] master start...`);

  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    worker.send(`hello I am master`);
  }

  // Emitted on the master whenever any worker process dies.
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
    // cluster.fork() // fork a replacement here to restart a worker whenever one exits
  });
} else {
  process.on('message', (p) => {
    console.log(`[worker]: ${p}`);
    // process.send() is asynchronous: exiting right after calling it can drop
    // the reply, so the worker exits from the send callback instead.
    process.send('hi i am worker', () => {
      process.exit(0); // exit the worker process manually
    });
  });
}
worker_threads 线程
const {
isMainThread, parentPort, workerData, threadId,
MessageChannel, MessagePort, Worker
} = require('worker_threads');
/**
 * Main-thread half of the demo: starts this same file as a worker with
 * `workerData` 0, logs the worker's exit code, and answers every message the
 * worker sends with that message plus one.
 */
function mainThread() {
  const child = new Worker(__filename, { workerData: 0 });

  child.on('exit', (exitCode) => {
    console.log(`main: worker stopped with exit code ${exitCode}`);
  });

  child.on('message', (received) => {
    console.log(`main: receive ${received}`);
    const reply = received + 1;
    child.postMessage(reply);
  });
}
/**
 * Worker-thread half of the demo: logs its thread id and `workerData`, echoes
 * every message from the main thread back to it, and exits once the value 5
 * arrives. The initial `workerData` is posted to start the exchange.
 */
function workerThread() {
  console.log(`worker: threadId ${threadId} start with ${__filename}`);
  console.log(`worker: workerData ${workerData}`);

  parentPort.on('message', (msg) => {
    console.log(`worker: receive ${msg}`);
    if (msg === 5) {
      // Called from inside a worker, so this is meant to end the worker and
      // trigger the main thread's 'exit' handler.
      process.exit();
    }
    parentPort.postMessage(msg);
  });

  // Kick off the ping-pong with the value supplied by the main thread.
  parentPort.postMessage(workerData);
}
// Entry point: this file is loaded both by the main thread and by the worker
// it spawns, so pick the matching role here.
if (!isMainThread) {
  workerThread();
} else {
  mainThread();
}
MessageChannel 通信方式
const {
isMainThread, parentPort, workerData, threadId,
MessageChannel, MessagePort, Worker
} = require('worker_threads');
// Two workers talk to each other directly: the main thread creates a
// MessageChannel and transfers one end of it to each worker.
if (!isMainThread) {
  // Worker side: the first message from the main thread carries this worker's
  // end of the channel.
  parentPort.once('message', (payload) => {
    payload.hereIsYourPort.postMessage('hello');
    payload.hereIsYourPort.on('message', (text) => {
      console.log(`thread ${threadId}: receive ${text}`);
    });
  });
} else {
  const workerA = new Worker(__filename);
  const workerB = new Worker(__filename);
  const { port1, port2 } = new MessageChannel();

  // Each port appears in the transfer list so it is moved to the worker
  // rather than copied.
  workerA.postMessage({ hereIsYourPort: port1 }, [port1]);
  workerB.postMessage({ hereIsYourPort: port2 }, [port2]);
}