一.場景
Node運行在單線程下,但這並不意味着無法利用多核/多機下多進程的優勢
事實上,Node最初從設計上就考慮了分布式網絡場景:
Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.
P.S.關於Node之所以叫Node,見 Why is Node.js named Node.js?
二.創建進程
通信方式與進程產生方式有關,而Node有4種創建進程的方式: spawn()
, exec()
, execFile()
和 fork()
spawn
const { spawn } = require('child_process'); const child = spawn('pwd'); // 帶參數的形式 // const child = spawn('find', ['.', '-type', 'f']);
spawn()
返回 ChildProcess
實例, ChildProcess
同樣基於事件機制(EventEmitter API),提供了一些事件:
-
exit
:子進程退出時觸發,可以得知進程退出狀態(code
和signal
) -
disconnect
:父進程調用child.disconnect()
時觸發 -
error
:子進程創建失敗,或被kill
時觸發 -
close
:子進程的stdio
流(標准輸入輸出流)關閉時觸發 -
message
:子進程通過process.send()
發送消息時觸發,父子進程之間可以通過這種 內置的消息機制通信
可以通過 child.stdin
, child.stdout
和 child.stderr
訪問子進程的 stdio
流,這些流被關閉的時,子進程會觸發 close
事件
P.S. close
與 exit
的區別主要體現在多進程共享同一 stdio
流的場景,某個進程退出了並不意味着 stdio
流被關閉了
在子進程中, stdout/stderr
具有Readable特性,而 stdin
具有Writable特性, 與主進程的情況正好相反 :
child.stdout.on('data', (data) => { console.log(`child stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`child stderr:\n${data}`); });
利用進程 stdio
流的管道特性,就可以完成更復雜的事情,例如:
const { spawn } = require('child_process'); const find = spawn('find', ['.', '-type', 'f']); const wc = spawn('wc', ['-l']); find.stdout.pipe(wc.stdin); wc.stdout.on('data', (data) => { console.log(`Number of files ${data}`); });
作用等價於 find . -type f | wc -l
,遞歸統計當前目錄文件數量
IPC選項
另外,通過 spawn()
方法的 stdio
選項可以建立IPC機制:
const { spawn } = require('child_process'); const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] }); child.on('message', (m) => { console.log(m); }); child.send('Here Here'); // ./ipc-child.js process.on('message', (m) => { process.send(`< ${m}`); process.send('> 不要回答x3'); });
關於 spawn()
的IPC選項的詳細信息,請查看 options.stdio
exec
spawn()
方法默認不會創建shell去執行傳入的命令(所以 性能上稍微好一點 ),而 exec()
方法會創建一個shell。另外, exec()
不是基於stream的,而是把傳入命令的執行結果暫存到buffer中,再整個傳遞給回調函數
exec()
方法的特點是 完全支持shell語法 ,可以直接傳入任意shell腳本,例如:
const { exec } = require('child_process'); exec('find . -type f | wc -l', (err, stdout, stderr) => { if (err) { console.error(`exec error: ${err}`); return; } console.log(`Number of files ${stdout}`); });
但 exec()
方法也因此存在 命令注入 的安全風險,在含有用戶輸入等動態內容的場景要特別注意。所以, exec()
方法的適用場景是:希望直接使用shell語法,並且預期輸出數據量不大(不存在內存壓力)
那么,有沒有既支持shell語法,還具有stream IO優勢的方式?
有。 兩全其美的方式 如下:
const { spawn } = require('child_process'); const child = spawn('find . -type f | wc -l', { shell: true }); child.stdout.pipe(process.stdout);
開啟 spawn()
的 shell
選項,並通過 pipe()
方法把子進程的標准輸出簡單地接到當前進程的標准輸入上,以便看到命令執行結果。實際上還有更容易的方式:
const { spawn } = require('child_process'); process.stdout.on('data', (data) => { console.log(data); }); const child = spawn('find . -type f | wc -l', { shell: true, stdio: 'inherit' });
stdio: 'inherit'
允許子進程繼承當前進程的標准輸入輸出(共享 stdin
, stdout
和 stderr
),所以上例能夠通過監聽當前進程 process.stdout
的 data
事件拿到子進程的輸出結果
另外,除了 stdio
和 shell
選項, spawn()
還支持一些其它選項,如:
const child = spawn('find . -type f | wc -l', { stdio: 'inherit', shell: true, // 修改環境變量,默認process.env env: { HOME: '/tmp/xxx' }, // 改變當前工作目錄 cwd: '/tmp', // 作為獨立進程存在 detached: true });
注意 , env
選項除了以環境變量形式向子進程傳遞數據外,還可以用來實現沙箱式的環境變量隔離,默認把 process.env
作為子進程的環境變量集,子進程與當前進程一樣能夠訪問所有環境變量,如果像上例中指定自定義對象作為子進程的環境變量集,子進程就無法訪問其它環境變量
所以,想要增/刪環境變量的話,需要這樣做:
var spawn_env = JSON.parse(JSON.stringify(process.env)); // remove those env vars delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE; delete spawn_env.ELECTRON_RUN_AS_NODE; var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});
detached
選項更有意思:
const { spawn } = require('child_process'); const child = spawn('node', ['stuff.js'], { detached: true, stdio: 'ignore' }); child.unref();
以這種方式創建的獨立進程行為取決於操作系統,Windows上detached子進程將擁有自己的console窗口,而Linux上該進程會 創建新的process group (這個特性可以用來管理子進程族,實現類似於 tree-kill 的特性)
unref()
方法用來斷絕關系,這樣“父”進程可以獨立退出(不會導致子進程跟着退出),但要注意這時子進程的 stdio
也應該獨立於“父”進程,否則“父”進程退出后子進程仍會受到影響
execFile
const { execFile } = require('child_process'); const child = execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { throw error; } console.log(stdout); });
與 exec()
方法類似,但不通過shell來執行(所以性能稍好一點),所以要求傳入 可執行文件 。Windows下某些文件無法直接執行,比如 .bat
和 .cmd
,這些文件就不能用 execFile()
來執行,只能借助 exec()
或開啟了 shell
選項的 spawn()
P.S.與 exec()
一樣也 不是基於stream的 ,同樣存在輸出數據量風險
xxxSync
spawn
, exec
和 execFile
都有對應的同步阻塞版本,一直等到子進程退出
const { spawnSync, execSync, execFileSync, } = require('child_process');
同步方法用來簡化腳本任務,比如啟動流程,其它時候應該避免使用這些方法
fork
fork()
是 spawn()
的變體,用來創建Node進程,最大的特點是父子進程自帶通信機制(IPC管道):
The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.
例如:
var n = child_process.fork('./child.js'); n.on('message', function(m) { console.log('PARENT got message:', m); }); n.send({ hello: 'world' }); // ./child.js process.on('message', function(m) { console.log('CHILD got message:', m); }); process.send({ foo: 'bar' });
因為 fork()
自帶通信機制的優勢,尤其適合用來拆分耗時邏輯,例如:
const http = require('http'); const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const sum = longComputation(); return res.end(`Sum is ${sum}`); } else { res.end('Ok') } }); server.listen(3000);
這樣做的致命問題是一旦有人訪問 /compute
,后續請求都無法及時處理,因為事件循環還被 longComputation
阻塞着,直到耗時計算結束才能恢復服務能力
為了避免耗時操作阻塞主進程的事件循環,可以把 longComputation()
拆分到子進程中:
// compute.js const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; // 開關,收到消息才開始做 process.on('message', (msg) => { const sum = longComputation(); process.send(sum); });
主進程開啟子進程執行 longComputation
:
const http = require('http'); const { fork } = require('child_process'); const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const compute = fork('compute.js'); compute.send('start'); compute.on('message', sum => { res.end(`Sum is ${sum}`); }); } else { res.end('Ok') } }); server.listen(3000);
主進程的事件循環不會再被耗時計算阻塞,但進程數量還需要進一步限制,否則資源被進程消耗殆盡時服務能力仍會受到影響
P.S.實際上, cluster
模塊就是對多進程服務能力的封裝, 思路與這個簡單示例類似
三.通信方式
1.通過stdin/stdout傳遞json
stdin/stdout and a JSON payload
最直接的通信方式,拿到子進程的handle后,可以訪問其 stdio
流,然后約定一種 message
格式開始愉快地通信:
const { spawn } = require('child_process'); child = spawn('node', ['./stdio-child.js']); child.stdout.setEncoding('utf8'); // 父進程-發 child.stdin.write(JSON.stringify({ type: 'handshake', payload: '你好吖' })); // 父進程-收 child.stdout.on('data', function (chunk) { let data = chunk.toString(); let message = JSON.parse(data); console.log(`${message.type} ${message.payload}`); });
子進程與之類似:
// ./stdio-child.js // 子進程-收 process.stdin.on('data', (chunk) => { let data = chunk.toString(); let message = JSON.parse(data); switch (message.type) { case 'handshake': // 子進程-發 process.stdout.write(JSON.stringify({ type: 'message', payload: message.payload + ' : hoho' })); break; default: break; } });
P.S.VS Code進程間通信就采用了這種方式,具體見 access electron API from vscode extension
明顯的 限制 是需要拿到“子”進程的handle,兩個完全獨立的進程之間無法通過這種方式來通信(比如跨應用,甚至跨機器的場景)
P.S.關於stream及pipe的詳細信息,請查看 Node中的流
2.原生IPC支持
如 spawn()
及 fork()
的例子,進程之間可以借助內置的IPC機制通信
父進程:
-
process.on('message')
收 -
child.send()
發
子進程:
-
process.on('message')
收 -
process.send()
發
限制同上,同樣要有一方能夠拿到另一方的handle才行
3.sockets
借助網絡來完成進程間通信, 不僅能跨進程,還能跨機器
node-ipc 就采用這種方案,例如:
// server const ipc=require('../../../node-ipc'); ipc.config.id = 'world'; ipc.config.retry= 1500; ipc.config.maxConnections=1; ipc.serveNet( function(){ ipc.server.on( 'message', function(data,socket){ ipc.log('got a message : ', data); ipc.server.emit( socket, 'message', data+' world!' ); } ); ipc.server.on( 'socket.disconnected', function(data,socket){ console.log('DISCONNECTED\n\n',arguments); } ); } ); ipc.server.on( 'error', function(err){ ipc.log('Got an ERROR!',err); } ); ipc.server.start(); // client const ipc=require('node-ipc'); ipc.config.id = 'hello'; ipc.config.retry= 1500; ipc.connectToNet( 'world', function(){ ipc.of.world.on( 'connect', function(){ ipc.log('## connected to world ##', ipc.config.delay); ipc.of.world.emit( 'message', 'hello' ); } ); ipc.of.world.on( 'disconnect', function(){