Nodejs進程間通信


一.場景

Node運行在單線程下,但這並不意味着無法利用多核/多機下多進程的優勢

事實上,Node最初從設計上就考慮了分布式網絡場景:

Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.

P.S.關於Node之所以叫Node,見 Why is Node.js named Node.js?

二.創建進程

通信方式與進程產生方式有關,而Node有4種創建進程的方式: spawn() , exec() , execFile() 和 fork()

spawn

const { spawn } = require('child_process'); const child = spawn('pwd'); // 帶參數的形式 // const child = spawn('find', ['.', '-type', 'f']);

spawn() 返回 ChildProcess 實例, ChildProcess 同樣基於事件機制(EventEmitter API),提供了一些事件:

  • exit :子進程退出時觸發,可以得知進程退出狀態( code 和 signal 

  • disconnect :父進程調用 child.disconnect() 時觸發

  • error :子進程創建失敗,或被 kill 時觸發

  • close :子進程的 stdio 流(標准輸入輸出流)關閉時觸發

  • message :子進程通過 process.send() 發送消息時觸發,父子進程之間可以通過這種 內置的消息機制通信

可以通過 child.stdin , child.stdout 和 child.stderr 訪問子進程的 stdio 流,這些流被關閉的時,子進程會觸發 close 事件

P.S. close 與 exit 的區別主要體現在多進程共享同一 stdio 流的場景,某個進程退出了並不意味着 stdio 流被關閉了

在子進程中, stdout/stderr 具有Readable特性,而 stdin 具有Writable特性, 與主進程的情況正好相反 

child.stdout.on('data', (data) => { console.log(`child stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`child stderr:\n${data}`); });

利用進程 stdio 流的管道特性,就可以完成更復雜的事情,例如:

const { spawn } = require('child_process'); const find = spawn('find', ['.', '-type', 'f']); const wc = spawn('wc', ['-l']); find.stdout.pipe(wc.stdin); wc.stdout.on('data', (data) => { console.log(`Number of files ${data}`); });

作用等價於 find . -type f | wc -,遞歸統計當前目錄文件數量

IPC選項

另外,通過 spawn() 方法的 stdio 選項可以建立IPC機制:

const { spawn } = require('child_process'); const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] }); child.on('message', (m) => { console.log(m); }); child.send('Here Here'); // ./ipc-child.js process.on('message', (m) => { process.send(`< ${m}`); process.send('> 不要回答x3'); });

關於 spawn() 的IPC選項的詳細信息,請查看 options.stdio

exec

spawn() 方法默認不會創建shell去執行傳入的命令(所以 性能上稍微好一點 ),而 exec() 方法會創建一個shell。另外, exec() 不是基於stream的,而是把傳入命令的執行結果暫存到buffer中,再整個傳遞給回調函數

exec() 方法的特點是 完全支持shell語法 ,可以直接傳入任意shell腳本,例如:

const { exec } = require('child_process'); exec('find . -type f | wc -l', (err, stdout, stderr) => { if (err) { console.error(`exec error: ${err}`); return; } console.log(`Number of files ${stdout}`); });

但 exec() 方法也因此存在 命令注入 的安全風險,在含有用戶輸入等動態內容的場景要特別注意。所以, exec() 方法的適用場景是:希望直接使用shell語法,並且預期輸出數據量不大(不存在內存壓力)

那么,有沒有既支持shell語法,還具有stream IO優勢的方式?

有。 兩全其美的方式 如下:

const { spawn } = require('child_process'); const child = spawn('find . -type f | wc -l', { shell: true }); child.stdout.pipe(process.stdout);

開啟 spawn() 的 shell 選項,並通過 pipe() 方法把子進程的標准輸出簡單地接到當前進程的標准輸入上,以便看到命令執行結果。實際上還有更容易的方式:

const { spawn } = require('child_process'); process.stdout.on('data', (data) => { console.log(data); }); const child = spawn('find . -type f | wc -l', { shell: true, stdio: 'inherit' });

stdio: 'inherit' 允許子進程繼承當前進程的標准輸入輸出(共享 stdin , stdout 和 stderr ),所以上例能夠通過監聽當前進程 process.stdout 的 data 事件拿到子進程的輸出結果

另外,除了 stdio 和 shell 選項, spawn() 還支持一些其它選項,如:

const child = spawn('find . -type f | wc -l', { stdio: 'inherit', shell: true, // 修改環境變量,默認process.env env: { HOME: '/tmp/xxx' }, // 改變當前工作目錄 cwd: '/tmp', // 作為獨立進程存在 detached: true });

注意 , env 選項除了以環境變量形式向子進程傳遞數據外,還可以用來實現沙箱式的環境變量隔離,默認把 process.env 作為子進程的環境變量集,子進程與當前進程一樣能夠訪問所有環境變量,如果像上例中指定自定義對象作為子進程的環境變量集,子進程就無法訪問其它環境變量

所以,想要增/刪環境變量的話,需要這樣做:

var spawn_env = JSON.parse(JSON.stringify(process.env)); // remove those env vars delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE; delete spawn_env.ELECTRON_RUN_AS_NODE; var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});

detached 選項更有意思:

const { spawn } = require('child_process'); const child = spawn('node', ['stuff.js'], { detached: true, stdio: 'ignore' }); child.unref();

以這種方式創建的獨立進程行為取決於操作系統,Windows上detached子進程將擁有自己的console窗口,而Linux上該進程會 創建新的process group (這個特性可以用來管理子進程族,實現類似於 tree-kill 的特性)

unref() 方法用來斷絕關系,這樣“父”進程可以獨立退出(不會導致子進程跟着退出),但要注意這時子進程的 stdio 也應該獨立於“父”進程,否則“父”進程退出后子進程仍會受到影響

execFile

const { execFile } = require('child_process'); const child = execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { throw error; } console.log(stdout); });

與 exec() 方法類似,但不通過shell來執行(所以性能稍好一點),所以要求傳入 可執行文件 。Windows下某些文件無法直接執行,比如 .bat 和 .cmd ,這些文件就不能用 execFile() 來執行,只能借助 exec() 或開啟了 shell 選項的 spawn()

P.S.與 exec() 一樣也 不是基於stream的 ,同樣存在輸出數據量風險

xxxSync

spawn , exec 和 execFile 都有對應的同步阻塞版本,一直等到子進程退出

const { spawnSync, execSync, execFileSync, } = require('child_process');

同步方法用來簡化腳本任務,比如啟動流程,其它時候應該避免使用這些方法

fork

fork() 是 spawn() 的變體,用來創建Node進程,最大的特點是父子進程自帶通信機制(IPC管道):

The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.

例如:

var n = child_process.fork('./child.js'); n.on('message', function(m) { console.log('PARENT got message:', m); }); n.send({ hello: 'world' }); // ./child.js process.on('message', function(m) { console.log('CHILD got message:', m); }); process.send({ foo: 'bar' });

因為 fork() 自帶通信機制的優勢,尤其適合用來拆分耗時邏輯,例如:

const http = require('http'); const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const sum = longComputation(); return res.end(`Sum is ${sum}`); } else { res.end('Ok') } }); server.listen(3000);

這樣做的致命問題是一旦有人訪問 /compute ,后續請求都無法及時處理,因為事件循環還被 longComputation 阻塞着,直到耗時計算結束才能恢復服務能力

為了避免耗時操作阻塞主進程的事件循環,可以把 longComputation() 拆分到子進程中:

// compute.js const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; // 開關,收到消息才開始做 process.on('message', (msg) => { const sum = longComputation(); process.send(sum); });

主進程開啟子進程執行 longComputation 

const http = require('http'); const { fork } = require('child_process'); const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const compute = fork('compute.js'); compute.send('start'); compute.on('message', sum => { res.end(`Sum is ${sum}`); }); } else { res.end('Ok') } }); server.listen(3000);

主進程的事件循環不會再被耗時計算阻塞,但進程數量還需要進一步限制,否則資源被進程消耗殆盡時服務能力仍會受到影響

P.S.實際上, cluster 模塊就是對多進程服務能力的封裝, 思路與這個簡單示例類似

三.通信方式

1.通過stdin/stdout傳遞json

stdin/stdout and a JSON payload

最直接的通信方式,拿到子進程的handle后,可以訪問其 stdio 流,然后約定一種 message 格式開始愉快地通信:

const { spawn } = require('child_process'); child = spawn('node', ['./stdio-child.js']); child.stdout.setEncoding('utf8'); // 父進程-發 child.stdin.write(JSON.stringify({ type: 'handshake', payload: '你好吖' })); // 父進程-收 child.stdout.on('data', function (chunk) { let data = chunk.toString(); let message = JSON.parse(data); console.log(`${message.type} ${message.payload}`); });

子進程與之類似:

// ./stdio-child.js // 子進程-收 process.stdin.on('data', (chunk) => { let data = chunk.toString(); let message = JSON.parse(data); switch (message.type) { case 'handshake': // 子進程-發 process.stdout.write(JSON.stringify({ type: 'message', payload: message.payload + ' : hoho' })); break; default: break; } });

P.S.VS Code進程間通信就采用了這種方式,具體見 access electron API from vscode extension

明顯的 限制 是需要拿到“子”進程的handle,兩個完全獨立的進程之間無法通過這種方式來通信(比如跨應用,甚至跨機器的場景)

P.S.關於stream及pipe的詳細信息,請查看 Node中的流

2.原生IPC支持

如 spawn() 及 fork() 的例子,進程之間可以借助內置的IPC機制通信

父進程:

  • process.on('message') 

  • child.send() 

子進程:

  • process.on('message') 

  • process.send() 

限制同上,同樣要有一方能夠拿到另一方的handle才行

3.sockets

借助網絡來完成進程間通信, 不僅能跨進程,還能跨機器

node-ipc 就采用這種方案,例如:

// server const ipc=require('../../../node-ipc'); ipc.config.id = 'world'; ipc.config.retry= 1500; ipc.config.maxConnections=1; ipc.serveNet( function(){ ipc.server.on( 'message', function(data,socket){ ipc.log('got a message : ', data); ipc.server.emit( socket, 'message', data+' world!' ); } ); ipc.server.on( 'socket.disconnected', function(data,socket){ console.log('DISCONNECTED\n\n',arguments); } ); } ); ipc.server.on( 'error', function(err){ ipc.log('Got an ERROR!',err); } ); ipc.server.start(); // client const ipc=require('node-ipc'); ipc.config.id = 'hello'; ipc.config.retry= 1500; ipc.connectToNet( 'world', function(){ ipc.of.world.on( 'connect', function(){ ipc.log('## connected to world ##', ipc.config.delay); ipc.of.world.emit( 'message', 'hello' ); } ); ipc.of.world.on( 'disconnect', function(){ ipc.log('disconnected from world'); } ); ipc.of.world.on( 'message', function(data){ ipc.log('got a message from world : ', data); } ); } );

P.S.更多示例見 RIAEvangelist/node-ipc

當然,單機場景下通過網絡來完成進程間通信有些浪費性能,但網絡通信的 優勢 在於跨環境的兼容性與更進一步的RPC場景

4.message queue

父子進程都通過外部消息機制來通信,跨進程的能力取決於MQ支持

即進程間不直接通信,而是通過中間層(MQ), 加一個控制層 就能獲得更多靈活性和優勢:

  • 穩定性:消息機制提供了強大的穩定性保證,比如確認送達(消息回執ACK),失敗重發/防止多發等等

  • 優先級控制:允許調整消息響應次序

  • 離線能力:消息可以被緩存

  • 事務性消息處理:把關聯消息組合成事務,保證其送達順序及完整性

P.S.不好實現?包一層能解決嗎,不行就包兩層……

比較受歡迎的有 smrchy/rsmq ,例如:

// init RedisSMQ = require("rsmq"); rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} ); // create queue rsmq.createQueue({qname:"myqueue"}, function (err, resp) { if (resp===1) { console.log("queue created") } }); // send message rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) { if (resp) { console.log("Message sent. ID:", resp); } }); // receive message rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) { if (resp.id) { console.log("Message received.", resp) } else { console.log("No messages for me...") } });

會起一個Redis server,基本原理如下:

Using a shared Redis server multiple Node.js processes can send / receive messages.

消息的收/發/緩存/持久化依靠Redis提供的能力,在此基礎上實現完整的隊列機制

5.Redis

基本思路與message queue類似:

Use Redis as a message bus/broker.

Redis自帶 Pub/Sub機制 (即發布-訂閱模式),適用於簡單的通信場景,比如一對一或一對多並且 不關注消息可靠性 的場景

另外,Redis有list結構,可以用作消息隊列,以此提高消息可靠性。一般做法是生產者 LPUSH 消息,消費者 BRPOP 消息。適用於要求消息可靠性的簡單通信場景,但缺點是消息不具狀態,且沒有ACK機制,無法滿足復雜的通信需求

P.S.Redis的Pub/Sub示例見 What’s the most efficient node.js inter-process communication library/method?

四.總結

Node進程間通信有4種方式:

  • 通過stdin/stdout傳遞json:最直接的方式,適用於能夠拿到“子”進程handle的場景,適用於關聯進程之間通信,無法跨機器

  • Node原生IPC支持:最native(地道?)的方式,比上一種“正規”一些,具有同樣的局限性

  • 通過sockets:最通用的方式,有良好的跨環境能力,但存在網絡的性能損耗

  • 借助message queue:最強大的方式,既然要通信,場景還復雜,不妨擴展出一層消息中間件,漂亮地解決各種通信問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM