前言
最近用Egg作為底層框架開發項目,好奇其多進程模型的管理實現,於是學習了解了一些東西,順便記錄下來。文章如有錯誤, 請輕噴
為什么需要多進程
伴隨科技的發展, 現在的服務器基本上都是多核cpu
的了。然而,Node是一個單進程單線程
語言(對於開發者來說是單線程,實際上不是)。我們都知道,cpu的調度單位是線程
,而基於Node的特性,那么我們每次只能利用一個cpu。這樣不僅僅利用率極低,而且容錯更是不能接受(出錯時會崩潰整個程序)。所以,Node有了cluster來協助我們充分利用服務器的資源。
cluster工作原理
關於cluster的工作原理推薦大家看這篇文章,這里簡單總結一下:
- 子進程的端口監聽會被
hack
掉,而是統一由master的內部TCP監聽
,所以不會出現多個子進程監聽同一端口而報錯的現象。 請求統一經過master的內部TCP
,TCP的請求處理邏輯中,會挑選一個worker進程向其發送一個newconn內部消息,隨消息發送客戶端句柄
。(這里的挑選有兩種方式,第一種是除Windows外所有平台的默認方法循環法,即由主進程負責監聽端口,接收新連接后再將連接循環分發給工作進程。在分發中使用了一些內置技巧防止工作進程任務過載。第二種是主進程創建監聽socket后發送給感興趣的工作進程,由工作進程負責直接接收連接。)- worker進程收到句柄后,
創建客戶端實例(net.socket)執行具體的業務邏輯
,然后返回。
如圖:
圖引用出處
多進程模型
先看一下Egg官方文檔的進程模型
+--------+ +-------+
| Master |<-------->| Agent |
+--------+ +-------+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
類型 | 進程數量 | 作用 | 穩定性 | 是否運行業務代碼 |
---|---|---|---|---|
Master | 1 | 進程管理,進程間消息轉發 | 非常高 | 否 |
Agent | 1 | 后台運行工作(長連接客戶端) | 高 | 少量 |
Worker | 一般為cpu核數 | 執行業務代碼 | 一般 | 是 |
大致上就是利用Master
作為主線程,啟動Agent
作為秘書進程協助Worker
處理一些公共事務(日志之類),啟動Worker
進程執行真正的業務代碼。
多進程的實現
流程相關代碼
首先從Master
入手,這里暫時認為Master是最頂級的進程(事實上還有一個parent
進程,待會再說)。
/**
* start egg app
* @method Egg#startCluster
* @param {Object} options {@link Master}
* @param {Function} callback start success callback
*/
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
先從Master的構造函數
看起
constructor(options) {
super();
// 初始化參數
this.options = parseOptions(options);
// worker進程的管理類 詳情見 Manager及Messenger篇
this.workerManager = new Manager();
// messenger類, 詳情見 Manager及Messenger篇
this.messenger = new Messenger(this);
// 設置一個ready事件 詳情見get-ready npm包
ready.mixin(this);
// 是否為生產環境
this.isProduction = isProduction();
this.agentWorkerIndex = 0;
// 是否關閉
this.closed = false;
...
接下來看的是ready的回調函數及注冊的各類事件:
this.ready(() => {
// 將開始狀態設置為true
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s (%sms)%s',
frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
// 發送egg-ready至各個進程並觸發相關事件
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
this.workerManager.startCheck();
});
// 注冊各類事件
this.on('agent-exit', this.onAgentExit.bind(this));
this.on('agent-start', this.onAgentStart.bind(this));
...
// 檢查端口並 Fork一個Agent
detectPort((err, port) => {
...
this.forkAgentWorker();
}
});
}
綜上, 可以看到Master的構造函數主要是初始化和注冊各類相應的事件
, 最后運行的是forkAgentWorker
函數, 該函數的關鍵代碼可以看到:
const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// 通過child_process執行一個Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
繼續到agent_worker.js
上面看,agent_worker
實例化一個agent
對象,agent_worker.js
有一句關鍵代碼:
agent.ready(() => {
agent.removeListener('error', startErrorHandler); // 清除錯誤監聽的事件
process.send({ action: 'agent-start', to: 'master' }); // 向master發送一個agent-start的動作
});
可以看到, agent_worker.js
中的代碼向master
發出了一個信息, 動作為agent-start
, 再回到Master
中, 可以看到其注冊了兩個事件, 分別為once的forkAppWorkers和 on的onAgentStart
this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));
先看onAgentStart
函數, 這個函數相對簡單, 就是一些信息的傳遞:
onAgentStart() {
this.agentWorker.status = 'started';
// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
}
this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
}
this.messenger.send({ action: 'agent-start', to: 'app' });
this.logger.info('[master] agent_worker#%s:%s started (%sms)',
this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}
然后會執行forkAppWorkers
函數,該函數主要是借助cfork包fork
對應的工作進程, 並注冊一系列相關的監聽事件,
...
cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don't refork in local env
refork: this.isProduction,
});
...
// 觸發app-start事件
cluster.on('listening', (worker, address) => {
this.messenger.send({
action: 'app-start',
data: { workerPid: worker.process.pid, address },
to: 'master',
from: 'app',
});
});
可以看到forkAppWorkers
函數在監聽Listening
事件時,會觸發master
上的app-start
事件。
this.on('app-start', this.onAppStart.bind(this));
...
// master ready回調觸發
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}
// ready回調 發送egg-ready狀態到各個進程
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
總結下:
- Master.constructor: 先執行Master的構造函數, 里面有個detect函數被執行
- Detect: Detect => forkAgentWorker()
- forkAgentWorker: 獲取Agent進程, 向master觸發agent-start事件
- 執行onAgentStart函數, 執行forkAppWorker函數(once)
- onAgentStart => 發送各類信息, forkAppWorker => 向master觸發 app-start事件
- App-start事件 觸發 onAppStart()方法
- onAppStart => 設置ready(true) => 執行ready的回調函數
- Ready() = > 發送egg-ready到各個進程並觸發相關事件, 執行startCheck()函數
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
進程守護
根據官方文檔,進程守護主要是依賴於graceful和egg-cluster這兩個庫。
未捕獲異常
- 關閉異常 Worker 進程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請求。
- Master 立刻 fork 一個新的 Worker 進程,保證在線的『工人』總數不變。
- 異常 Worker 等待一段時間,處理完已經接受的請求后退出。
+---------+ +---------+
| Worker | | Master |
+---------+ +----+----+
| uncaughtException |
+------------+ |
| | | +---------+
| <----------+ | | Worker |
| | +----+----+
| disconnect | fork a new worker |
+-------------------------> + ---------------------> |
| wait... | |
| exit | |
+-------------------------> | |
| | |
die | |
| |
| |
由執行的app文件可知, app
實際上是繼承於Application類, 該類下面調用了graceful()
。
onServer(server) {
......
graceful({
server: [ server ],
error: (err, throwErrorCount) => {
......
},
});
......
}
繼續看graceful
, 可以看到它捕獲了process.on('uncaughtException')
事件, 並在回調函數里面關閉TCP
連接, 關閉本身進程, 斷開與master
的IPC
通道。
process.on('uncaughtException', function (err) {
......
// 對http連接設置 Connection: close響應頭
servers.forEach(function (server) {
if (server instanceof http.Server) {
server.on('request', function (req, res) {
// Let http server set `Connection: close` header, and close the current request socket.
req.shouldKeepAlive = false;
res.shouldKeepAlive = false;
if (!res._header) {
res.setHeader('Connection', 'close');
}
});
}
});
// 設置一個定時函數關閉子進程, 並退出本身進程
// make sure we close down within `killTimeout` seconds
var killtimer = setTimeout(function () {
console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
if (process.env.NODE_ENV !== 'test') {
// kill children by SIGKILL before exit
killChildren(function() {
// 退出本身進程
process.exit(1);
});
}
}, killTimeout);
// But don't keep the process open just for that!
// If there is no more io waitting, just let process exit normally.
if (typeof killtimer.unref === 'function') {
// only worked on node 0.10+
killtimer.unref();
}
var worker = options.worker || cluster.worker;
// cluster mode
if (worker) {
try {
// 關閉TCP連接
for (var i = 0; i < servers.length; i++) {
var server = servers[i];
server.close();
}
} catch (er1) {
......
}
try {
// 關閉ICP通道
worker.disconnect();
} catch (er2) {
......
}
}
});
ok, 關閉了IPC
通道后, 我們繼續看cfork
文件, 即上面提到的fork worker
的包, 里面監聽了子進程的disconnect
事件, 他會根據條件判斷是否重新fork
一個新的子進程
cluster.on('disconnect', function (worker) {
......
// 存起該pid
disconnects[worker.process.pid] = utility.logDate();
if (allow()) {
// fork一個新的子進程
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
});
一般來說, 這個時候會繼續等待一會然后就執行了上面說到的定時函數了, 即退出進程
。
OOM、系統異常
關於這種系統異常
, 有時候在子進程中是不能捕獲到
的, 我們只能在master中進行處理, 也就是cfork
包。
cluster.on('exit', function (worker, code, signal) {
// 是程序異常的話, 會通過上面提到的uncatughException重新fork一個子進程, 所以這里就不需要了
var isExpected = !!disconnects[worker.process.pid];
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
// 是master殺死的子進程, 無需fork
if (worker.disableRefork) {
// worker is killed by master
return;
}
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
cluster.emit('unexpectedExit', worker, code, signal);
});
進程間通信(IPC)
上面一直提到各種進程間通信,細心的你可能已經發現 cluster 的 IPC 通道只存在於 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉發。
廣播消息: agent => all workers
+--------+ +-------+
| Master |<---------| Agent |
+--------+ +-------+
/ | \
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
指定接收方: one worker => another worker
+--------+ +-------+
| Master |----------| Agent |
+--------+ +-------+
^ |
send to / |
worker 2 / |
/ |
/ v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
在master
中, 可以看到當agent和app被fork時
, 會監聽他們的信息, 同時將信息轉化成一個對象:
agentWorker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'agent';
this.messenger.send(msg);
});
worker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'app';
this.messenger.send(msg);
});
可以看到最后調用的是messenger.send
, 而messengeer.send就是根據from和to來決定將信息發送到哪里
send(data) {
if (!data.from) {
data.from = 'master';
}
......
// app -> master
// agent -> master
if (data.to === 'master') {
debug('%s -> master, data: %j', data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}
// master -> parent
// app -> parent
// agent -> parent
if (data.to === 'parent') {
debug('%s -> parent, data: %j', data.from, data);
this.sendToParent(data);
return;
}
// parent -> master -> app
// agent -> master -> app
if (data.to === 'app') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAppWorker(data);
return;
}
// parent -> master -> agent
// app -> master -> agent,可能不指定 to
if (data.to === 'agent') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
master
則是直接根據action
信息emit
對應的注冊事件
sendToMaster(data) {
this.master.emit(data.action, data.data);
}
而agent和worker則是通過一個sendmessage
包, 實際上就是調用下面類似的方法
// 將信息傳給子進程
agent.send(data)
worker.send(data)
最后, 在agent和app都繼承的基礎類EggApplication
上, 調用了Messenger
類, 該類內部的構造函數如下:
constructor() {
super();
......
this._onMessage = this._onMessage.bind(this);
process.on('message', this._onMessage);
}
_onMessage(message) {
if (message && is.string(message.action)) {
// 和master一樣根據action信息emit對應的注冊事件
this.emit(message.action, message.data);
}
}
總結一下:
思路就是利用事件機制和IPC通道來達到各個進程之間的通信。
其他
學習過程中有遇到一個timeout.unref()的函數, 關於該函數推薦大家參考這個問題的6樓回答
總結
從前端思維轉到后端思維其實還是很吃力的,加上Egg的進程管理實現確實非常厲害, 所以花了很多時間在各種api和思路思考上。