Node.js:進程、子進程與cluster多核處理模塊


1、process對象

process對象就是處理與進程相關信息的全局對象,不需要require引用,且是EventEmitter的實例。
獲取進程信息
process對象提供了很多的API來獲取當前進程的運行信息,例如進程運行時間、內存占用、CPU占用、進程號等,具體使用如下所示:

/**
 * 獲取當前Node.js進程信息
 */
function getProcessInfo(){
	const memUsage = process.memoryUsage();//內存使用
	const cpuUsage = process.cpuUsage();//cpu使用
	const cfg = process.config;//編譯node.js的配置信息
	const env = process.env;//用戶環境
	const pwd = process.cwd();//工作目錄
	const execPath = process.execPath;//node.exe目錄
	const pf = process.platform;//運行nodejs的操作系統平台
	const release = process.release;//nodejs發行版本
	const pid = process.pid;//nodejs進程號
	const arch = process.arch;//運行nodejs的操作系統架構
	const uptime = process.uptime();//nodejs進程運行時間
	return {
		memUsage,
		cpuUsage,
		cfg,
		env,
		pwd,
		execPath,
		pf,
		release,
		pid,
		arch,
		uptime
	}
}
console.log(getProcessInfo());

process.argv獲取命令行指令參數
使用node命令執行某個腳本時,可以在指令末尾加上參數,process.argv返回一個數組,第一個元素是process.execPath,第二個元素是被執行腳本的路徑,如下所示:

var args = process.argv;
if(!args.length){
	process.exit(0);
}else{
	console.log(args.slice(2).join('\n'));
}

執行結果如下:

E:\developmentdocument\nodejsdemo>node process-example.js a b c
a
b
c

process事件
1、exit事件,當調用process.exit()方法或者事件循環隊列沒有任何工作時便會觸發該事件,監聽的回調函數的邏輯必須是同步的,否則不會執行。如下所示:

process.on('exit',(code)=>{
	console.log(code);
	setTimeout(()=>console.log(code),1000);//不會執行
});

2、uncaughtException事件,當一個沒有被捕獲的異常冒泡到事件隊列就會觸發該事件,默認打印錯誤信息並進程退出,當uncaughtException事件有一個以上的 listener 時,會阻止 Node 結束進程。但是這種做法有內存泄露的風險,所以千萬不要這么做。如下所示:

process.on('uncaughtException',(err)=>{
	console.log(err);
});
setTimeout(()=>console.log('nihao'),1000);//1秒后會執行
a();
console.log('hehe');//不會執行

3、message事件,進程間使用childProcess.send()方法進行通信,就會觸發該事件,使用如下所示:

const cp = require('child_process').fork(`${__dirname}/test.js`);
cp.on('message',(message)=>{
	console.log('got the child message:'+message);
});
cp.send('hello child!');
//test.js
process.on('message',(message)=>{
	console.log('got the parent message:'+message);
});
process.send('hello parent');

執行結果如下:

E:\developmentdocument\nodejsdemo>node process-example.js
got the child message:hello parent
got the parent message:hello child!

process.nextTick方法
將回調函數添加到下一次事件緩存隊列中,當前事件循環都執行完畢后,所有的回調函數都會被執行,如下所示:

console.log('hello world');
setTimeout(()=>console.log('settimeout'),10);
process.nextTick(()=>console.log('nexttick'));
console.log('hello nodejs');

執行結果如下所示:

E:\developmentdocument\nodejsdemo>node process-example.js
hello world
hello nodejs
nexttick
settimeout

2、child_process模塊

通過child_process模塊可以創建子進程,從而實現多進程模式,更好地利用CPU多核計算資源。該模塊提供了四種方法創建子進程,分別是child_process.spawn()child_process.exec()child_process.execFile()child_process.fork(),這四個方法都返回一個childProcess對象,該對象實現了EventEmitter的接口,帶有stdout,stdin,stderr的對象。
child_process.spawn(command[, args][, options])方法
該方法使用command指令創建一個新進程,參數含義如下:

  • command,帶執行的命令
  • args,命令行參數數組
  • options,可選參數,為一個對象

options參數主要擁有以下屬性:

  • cwd,當前工作目錄,若沒有指定,則使用當前工作目錄
  • env,命令執行環境,默認為process.env
  • argv0,如果沒有指定command,該值會被設置為command
  • stdio,子進程標准IO配置

返回值為childProcess對象,使用如下所示:

const child_process = require('child_process');
const iconv = require('iconv-lite');
const spawn = child_process.spawn;

const buffArr = [];
let buffLen = 0;

const dirs = spawn('cmd.exe',['/C','dir']);
dirs.stdout.on('data',(data)=>{
	buffArr.push(data);
	buffLen+=data.length;
});
dirs.stderr.on('end',()=>{
	console.log(iconv.decode(Buffer.concat(buffArr,buffLen),'GBK'));
});
dirs.stderr.on('error',(err)=>{
	console.log(err);
});
dirs.on('close',(code)=>{
	console.log(code);
});

執行結果如下:

正在 Ping www.qq.com [14.17.32.211] 具有 32 字節的數據:
來自 14.17.32.211 的回復: 字節=32 時間=2ms TTL=55
來自 14.17.32.211 的回復: 字節=32 時間=2ms TTL=55
來自 14.17.32.211 的回復: 字節=32 時間=3ms TTL=55
來自 14.17.32.211 的回復: 字節=32 時間=3ms TTL=55
14.17.32.211 的 Ping 統計信息:
數據包: 已發送 = 4,已接收 = 4,丟失 = 0 (0% 丟失),
往返行程的估計時間(以毫秒為單位):
最短 = 2ms,最長 = 3ms,平均 = 2ms

如果輸出碰到亂碼的時候,可以借助iconv-lite進行轉碼即可,使用npm install iconv-lite --save
child_process.exec(command[, options][, callback])方法
新建一個shell執行command指令,並緩存產生的輸出結果,方法參數含義如下:

  • command,待執行的指令,帶獨立的參數
  • options,對象,擁有cwd,env,encoding,shell,maxBuffer等屬性
  • callback,回調函數,參數為(error,stdout,stderr),如果執行成功,error則為null,否則為Error的實例。

返回值也是childProcess對象,該方法與child_process.spawn()方法的區別在於,使用回調函數獲得子進程的輸出數據,會先將數據緩存在內存中,等待子進程執行完畢之后,再將所有的數據buffer交給回調函數,如果該數據大小超過了maxBuffer(默認為200KB),則會拋出錯誤。雖然可以通過參數maxBuffer來設置子進程的緩存大小,但是不建議這么做,因為exec()方法不合適創建返回大量數據的進程,應該就返回一些狀態碼。
使用如下所示:

exec('netstat /ano | find /C /I "tcp"',(err,stdout,stderr)=>{
	if(err) throw err;
	console.log(stdout);
	console.log(stderr);
});

child_process.execFile(file[, args][, options][, callback])方法
類似與child_process.exec()方法,不同之處是不會創建一個shell,而是直接使用指定的可執行文件創建一個新進程,更有效率,使用如下所示:

execFile('mysql',['--version'],(err,stdout,stderr)=>{
	if(err) throw err;
	console.log(stdout);
	console.log(stderr);
});

child_process.fork(modulePath[, args][, options])方法
創建一個子進程執行module,並與子進程建立IPC通道進行通信,方法返回一個childProcess對象,作為子進程的句柄,通過send()方法向子進程發送信息,監聽message事件接收子進程的消息,子進程亦同理。使用如下所示:

const fibonacci = fork('./fibonacci.js');
const n = 10;
fibonacci.on('message',(msg)=>{
	console.log(`fibonacci ${n} is:${msg.result}`);
});
fibonacci.send({n:n});
//fibonacci.js
function fibonacci(n,ac1=1,ac2=1){
	return n<=2?ac2:fibonacci(n-1,ac2,ac1+ac2);
}
process.on('message',(msg)=>{
	process.send({result:fibonacci(msg.n)})
});

child.disconnect()方法
關閉父子進程之間的IPC通道,之后父子進程不能執行通信,並會立即觸發disconnect事件,使用如下所示:

const fibonacci = fork('./fibonacci.js');
const n = 10;
fibonacci.on('message',(msg)=>{
	console.log(`fibonacci ${n} is:${msg.result}`);
	fibonacci.disconnect();
});
fibonacci.on('disconnect',()=>{
	console.log('與子進程斷開連接.');
});
fibonacci.send({n:n});
//fibonacci.js
function fibonacci(n,ac1=1,ac2=1){
	return n<=2?ac2:fibonacci(n-1,ac2,ac1+ac2);
}
process.on('message',(msg)=>{
	process.send({result:fibonacci(msg.n)})
});

執行結果:

fibonacci 10 is:55
與子進程斷開連接.

子進程主要用來做CPU密集型的工作,如fibonacci數列的計算,canvas像素處理等。

3、cluster多核處理模塊

Node.js是單線程運行的,不管你的機器有多少個內核,只能用到其中的一個,為了能利用多核計算資源,需要使用多進程來處理應用。cluster模塊讓我們可以很容易地創建一個負載均衡的集群,自動分配CPU多核資源。
使用如下所示:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;
if(cluster.isMaster){
	for(let i=0;i<cpuNums;i++){
		cluster.fork();
	}
	cluster.on('exit',(worker)=>{
		console.log(`worker${worker.id} exit.`)
	});
	cluster.on('fork',(worker)=>{
		console.log(`fork:worker${worker.id}`)
	});
	cluster.on('listening',(worker,addr)=>{
		console.log(`worker${worker.id} listening on ${addr.address}:${addr.port}`)
	});
	cluster.on('online',(worker)=>{
		console.log(`worker${worker.id} is online now`)
	});
}else{
	http.createServer((req,res)=>{
		console.log(cluster.worker.id);
		res.writeHead(200);
		res.end('hello world');
	}).listen(3000,'127.0.0.1');
}

執行結果:

fork:worker1
fork:worker2
fork:worker3
fork:worker4
worker1 is online now
worker2 is online now
worker3 is online now
worker1 listening on 127.0.0.1:3000
worker4 is online now
worker2 listening on 127.0.0.1:3000
worker3 listening on 127.0.0.1:3000
worker4 listening on 127.0.0.1:3000

cluster工作原理
如上代碼所示,master是控制進程,worker是執行進程,每個worker都是使用child_process.fork()函數創建的,因此worker與master之間通過IPC進行通信。
當worker調用用server.listen()方法時會向master進程發送一個消息,讓它創建一個服務器socket,做好監聽並分享給該worker。如果master已經有監聽好的socket,就跳過創建和監聽的過程,直接分享。換句話說,所有的worker監聽的都是同一個socket,當有新連接進來的時候,由負載均衡算法選出一個worker進行處理。
cluster對象的屬性和方法
cluster.isMaster:標志是否master進程,為true則是
cluster.isWorker:標志是否worker進程,為true則是
cluster.worker:獲得當前的worker對象,在master進程中使用無效
cluster.workers: 獲得集群中所有存活的worker對象,子啊worker進程使用無效
cluster.fork(): 創建工作進程worker
cluster.disconnect([callback]): 斷開所有worker進程通信
*cluster對象的事件
Event: 'fork': 監聽創建worker進程事件
Event: 'online': 監聽worker創建成功事件
Event: 'listening': 監聽worker進程進入監聽事件
Event: 'disconnect': 監聽worker斷開事件
Event: 'exit': 監聽worker退出事件
Event: 'message':監聽worker進程發送消息事件
使用如下所示:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;
/*process.env.NODE_DEBUG='net';*/
if(cluster.isMaster){
	for(let i=0;i<cpuNums;i++){
		cluster.fork();
	}
	cluster.on('exit',(worker)=>{
		console.log(`worker${worker.id} exit.`)
	});
	cluster.on('fork',(worker)=>{
		console.log(`fork:worker${worker.id}`)
	});

	cluster.on('disconnect',(worker)=>{
		console.log(`worker${worker.id} is disconnected.`)
	});
	cluster.on('listening',(worker,addr)=>{
		console.log(`worker${worker.id} listening on ${addr.address}:${addr.port}`)
	});
	cluster.on('online',(worker)=>{
		console.log(`worker${worker.id} is online now`)
	});

	cluster.on('message',(worker,msg)=>{
		console.log(`got the worker${worker.id}'s msg:${msg}`);
	});

	Object.keys(cluster.workers).forEach((id)=>{
		cluster.workers[id].send(`hello worker${id}`);
	});
}else{
	process.on('message',(msg)=>{
		console.log('worker'+cluster.worker.id+' got the master msg:'+msg);
	});
	process.send('hello master, I am worker'+cluster.worker.id);
	http.createServer((req,res)=>{
		res.writeHead(200);
		res.end('hello world'+cluster.worker.id);
	}).listen(3000,'127.0.0.1');
}

執行結果如下:

fork:worker1
fork:worker2
fork:worker3
fork:worker4
worker1 is online now
worker2 is online now
got the worker1's msg:hello master, I am worker1
worker1 got the master msg:hello worker1
worker1 listening on 127.0.0.1:3000
worker4 is online now
got the worker2's msg:hello master, I am worker2
worker2 got the master msg:hello worker2
worker3 is online now
worker2 listening on 127.0.0.1:3000
got the worker4's msg:hello master, I am worker4
worker4 got the master msg:hello worker4
worker4 listening on 127.0.0.1:3000
got the worker3's msg:hello master, I am worker3
worker3 got the master msg:hello worker3
worker3 listening on 127.0.0.1:3000

在win7環境下,cluster負載均衡情況,如下所示:
服務端代碼:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;

if(cluster.isMaster){
	var i = 0;
	const widArr = [];
	for(let i=0;i<cpuNums;i++){
		cluster.fork();
	}
	cluster.on('message',(worker,msg)=>{
		if(msg === 'ex'){
			i++;
			widArr.push(worker.id);
			(i>=80)&&(process.exit(0));
		}
	});
	process.on('exit', (code) => {
		console.log(analyzeArr(widArr));
	});
	//統計每個worker被調用的次數
	function analyzeArr(arr) {
		let obj = {};
		arr.forEach((id, idx, arr) => {
			obj['work' + id] = obj['work' + id] !== void 0 ? obj['work' + id] + 1 : 1;
		});
		return obj;
	}

}else{
	http.createServer((req,res)=>{
		console.log(`worker${cluster.worker.id}`);
		process.send('ex');
		res.writeHead(200);
		res.end('hello world'+cluster.worker.id);
	}).listen(3000,'127.0.0.1');
}

使用Apache的AB命令進行測試,並發40,總共80:C:\Users\learn>ab -c 40 -n 80 http://127.0.0.1:3000/
測試結果:

{ work4: 19, work3: 20, work1: 19, work2: 22 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM