補充:JS部分和C++代碼見
WebRTC進階流媒體服務器開發(三)Mediasoup源碼分析之應用層(代碼組成、Server.js、Room.js)
mediasoup C++:C++部分,用於處理流媒體傳輸,包括lib與worker兩部分
--->lib:一些js文件組成,主要用於對mediasoup的管理工作
--->worker:C++核心代碼

一:Mediasoup啟動詳解---JS部分
見:WebRTC進階流媒體服務器開發(三)Mediasoup源碼分析之應用層(代碼組成、Server.js、Room.js)
(一)啟動文件server.js
/**
 * Server entry point: launches the mediasoup worker processes.
 */
async function run()
{
	// Spawn the mediasoup Workers (one child process per worker).
	await runMediasoupWorkers();
}

run();
/**
 * Creates `config.mediasoup.numWorkers` mediasoup workers, one after another,
 * and appends each of them to `mediasoupWorkers`.
 *
 * Every worker gets a 'died' listener that terminates this process two seconds
 * later, and a periodic task that logs the worker resource usage.
 */
async function runMediasoupWorkers()
{
	const { numWorkers } = config.mediasoup;

	for (let created = 0; created < numWorkers; created += 1)
	{
		const settings = config.mediasoup.workerSettings;

		// Spawn the worker child process with the configured settings.
		const worker = await mediasoup.createWorker(
			{
				logLevel   : settings.logLevel,
				logTags    : settings.logTags,
				rtcMinPort : Number(settings.rtcMinPort),
				rtcMaxPort : Number(settings.rtcMaxPort)
			});

		worker.on('died', () =>
		{
			logger.error(
				'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);

			setTimeout(() => process.exit(1), 2000);
		});

		// Keep track of the worker.
		mediasoupWorkers.push(worker);

		// Log worker resource usage every 120 seconds. The usage is requested
		// from the worker process through its Channel.
		const logResourceUsage = async () =>
		{
			const usage = await worker.getResourceUsage();

			logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
		};

		setInterval(logResourceUsage, 120000);
	}
}
(二)調用index.js中的createWorker方法創建進程
/**
 * Creates a Worker and resolves once it reports success.
 *
 * @param {Object} [options]
 * @param {string} [options.logLevel='error']
 * @param {string[]} [options.logTags]
 * @param {number} [options.rtcMinPort=10000]
 * @param {number} [options.rtcMaxPort=59999]
 * @param {string} [options.dtlsCertificateFile]
 * @param {string} [options.dtlsPrivateKeyFile]
 * @param {Object} [options.appData={}]
 * @returns {Promise<Worker>} Resolves with the worker on '@success', rejects
 *   with the value emitted on '@failure'.
 * @throws {TypeError} If `appData` is given and is not an object.
 */
async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {})
{
	logger.debug('createWorker()');

	const appDataIsInvalid = Boolean(appData) && typeof appData !== 'object';

	if (appDataIsInvalid)
		throw new TypeError('if given, appData must be an object');

	const workerOptions =
	{
		logLevel,
		logTags,
		rtcMinPort,
		rtcMaxPort,
		dtlsCertificateFile,
		dtlsPrivateKeyFile,
		appData
	};
	const worker = new Worker_1.Worker(workerOptions);

	// Settle the returned promise from the worker's private lifecycle events.
	const settleFromWorkerEvents = (resolve, reject) =>
	{
		worker.on('@success', () =>
		{
			// Emit observer event.
			observer.safeEmit('newworker', worker);
			resolve(worker);
		});
		worker.on('@failure', reject);
	};

	return new Promise(settleFromWorkerEvents);
}
exports.createWorker = createWorker;
(二)調用Worker.js中的Worker類創建進程
該文件存在於:
類方法概括:
class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter { constructor()——構造函數 get pid()——獲得Worker進程的ID get closed()——確認Worker是否關閉 get appData()——返回custom的數據 set appData()——當設置無效時拋出異常信息 get observer()——開啟觀察者模式 close()——關閉Worker async dump()——轉存Worker async getResourceUsage()——獲得worker進程資源使用信息 async updateSettings()——更新設置 async createRouter()——創建房間 }
類主要方法分析:
// Path of the mediasoup-worker executable to spawn. An explicit
// MEDIASOUP_WORKER_BIN environment variable wins; otherwise the binary is
// looked up under worker/out/Debug when MEDIASOUP_BUILDTYPE is 'Debug', or
// under worker/out/Release in any other case.
const workerBin = process.env.MEDIASOUP_WORKER_BIN
	|| path.join(
		__dirname,
		'..',
		'worker',
		'out',
		process.env.MEDIASOUP_BUILDTYPE === 'Debug' ? 'Debug' : 'Release',
		'mediasoup-worker');

/**
 * Wrapper around one spawned mediasoup-worker child process.
 *
 * The constructor spawns the process, wires the Channel and PayloadChannel on
 * top of the extra stdio pipes and emits '@success' / '@failure' depending on
 * how the spawn goes. After a successful spawn, an exit or error of the child
 * is reported through the 'died' event.
 */
class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter
{
	/**
	 * @param {Object} options
	 * @param {string} [options.logLevel] - Passed as --logLevel when a non-empty string.
	 * @param {string[]} [options.logTags] - Each non-empty string is passed as --logTag.
	 * @param {number} [options.rtcMinPort] - Passed as --rtcMinPort when a valid number.
	 * @param {number} [options.rtcMaxPort] - Passed as --rtcMaxPort when a valid number.
	 * @param {string} [options.dtlsCertificateFile] - Passed as --dtlsCertificateFile.
	 * @param {string} [options.dtlsPrivateKeyFile] - Passed as --dtlsPrivateKeyFile.
	 * @param {Object} [options.appData] - Custom application data stored as-is.
	 */
	constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData })
	{
		super();
		// Closed flag.
		this._closed = false;
		// Routers set.
		this._routers = new Set();
		// Observer instance.
		this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
		logger.debug('constructor()');
		let spawnBin = workerBin;
		let spawnArgs = [];
		// When running under valgrind, valgrind becomes the spawned binary and
		// the worker binary is passed to it as an argument.
		if (process.env.MEDIASOUP_USE_VALGRIND === 'true')
		{
			spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
			if (process.env.MEDIASOUP_VALGRIND_OPTIONS)
			{
				spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));
			}
			spawnArgs.push(workerBin);
		}
		// Translate the given settings into command line arguments, skipping
		// any value that is missing or of the wrong type.
		if (typeof logLevel === 'string' && logLevel)
			spawnArgs.push(`--logLevel=${logLevel}`);
		for (const logTag of (Array.isArray(logTags) ? logTags : []))
		{
			if (typeof logTag === 'string' && logTag)
				spawnArgs.push(`--logTag=${logTag}`);
		}
		if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
			spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
		if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
			spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
		if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)
			spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
		if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)
			spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
		logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
		// Spawn the worker child process.
		this._child = child_process_1.spawn(
			// command: the program to start.
			spawnBin,
			// args: the arguments built above.
			spawnArgs,
			// options
			{
				// Environment of the child. MEDIASOUP_VERSION is listed first,
				// so a value already present in process.env overrides it.
				env: {
					MEDIASOUP_VERSION: '3.7.11',
					...process.env
				},
				// Do not detach the child from this Node.js process.
				detached: false,
				// fd 0 (stdin)   : Just ignore it.
				// fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
				// fd 2 (stderr)  : Same as stdout.
				// fd 3 (channel) : Producer Channel fd.
				// fd 4 (channel) : Consumer Channel fd.
				// fd 5 (channel) : Producer PayloadChannel fd.
				// fd 6 (channel) : Consumer PayloadChannel fd.
				stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
				windowsHide: true
			});
		this._pid = this._child.pid;
		// The two channels below are the JS ends of the pipes the C++ worker
		// opens on fds 3/4 and 5/6.
		this._channel = new Channel_1.Channel({
			producerSocket: this._child.stdio[3],
			consumerSocket: this._child.stdio[4],
			pid: this._pid
		});
		this._payloadChannel = new PayloadChannel_1.PayloadChannel({
			// NOTE: TypeScript does not like more than 5 fds.
			// @ts-ignore
			producerSocket: this._child.stdio[5],
			// @ts-ignore
			consumerSocket: this._child.stdio[6]
		});
		this._appData = appData;
		// Becomes true once the spawn outcome ('@success' or '@failure') has
		// been decided, so it is reported only once.
		let spawnDone = false;
		// Listen for 'running' notification.
		this._channel.once(String(this._pid), (event) =>
		{
			if (!spawnDone && event === 'running')
			{
				spawnDone = true;
				logger.debug('worker process running [pid:%s]', this._pid);
				this.emit('@success');
			}
		});
		this._child.on('exit', (code, signal) =>
		{
			this._child = undefined;
			this.close();
			if (!spawnDone)
			{
				spawnDone = true;
				// Exit code 42 is what the worker uses for "settings error".
				if (code === 42)
				{
					logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
					this.emit('@failure', new TypeError('wrong settings'));
				}
				else
				{
					logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
					this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
				}
			}
			else
			{
				logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
				this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
			}
		});
		this._child.on('error', (error) =>
		{
			this._child = undefined;
			this.close();
			if (!spawnDone)
			{
				spawnDone = true;
				logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
				this.emit('@failure', error);
			}
			else
			{
				logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
				this.safeEmit('died', error);
			}
		});
		// Be ready for 3rd party worker libraries logging to stdout.
		this._child.stdout.on('data', (buffer) =>
		{
			for (const line of buffer.toString('utf8').split('\n'))
			{
				if (line)
					workerLogger.debug(`(stdout) ${line}`);
			}
		});
		// In case of a worker bug, mediasoup will log to stderr.
		this._child.stderr.on('data', (buffer) =>
		{
			for (const line of buffer.toString('utf8').split('\n'))
			{
				if (line)
					workerLogger.error(`(stderr) ${line}`);
			}
		});
	}
	/**
	 * Closes the worker: kills the child process (if still present), closes
	 * both channels, notifies every Router and emits 'close' on the observer.
	 * Calling it again after the first time is a no-op.
	 */
	close()
	{
		if (this._closed)
			return;
		logger.debug('close()');
		this._closed = true;
		// Kill the worker process.
		if (this._child)
		{
			// Remove event listeners but leave a fake 'error' hander to avoid
			// propagation.
			this._child.removeAllListeners('exit');
			this._child.removeAllListeners('error');
			this._child.on('error', () => { });
			this._child.kill('SIGTERM');
			this._child = undefined;
		}
		// Close the Channel instance.
		this._channel.close();
		// Close the PayloadChannel instance.
		this._payloadChannel.close();
		// Close every Router.
		for (const router of this._routers)
		{
			router.workerClosed();
		}
		this._routers.clear();
		// Emit observer event.
		this._observer.safeEmit('close');
	}
	/**
	 * Creates a Router in the worker process.
	 *
	 * @param {Object} [options]
	 * @param {Array} [options.mediaCodecs] - Forwarded to ortc.generateRouterRtpCapabilities().
	 * @param {Object} [options.appData={}] - Custom application data.
	 * @returns {Promise<Router>} The new Router, already tracked by this worker.
	 * @throws {TypeError} If `appData` is given and is not an object.
	 */
	async createRouter({ mediaCodecs, appData = {} } = {})
	{
		logger.debug('createRouter()');
		if (appData && typeof appData !== 'object')
			throw new TypeError('if given, appData must be an object');
		// This may throw.
		const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
		const internal = { routerId: uuid_1.v4() };
		// Ask the worker process to create the router with the generated id.
		await this._channel.request('worker.createRouter', internal);
		const data = { rtpCapabilities };
		const router = new Router_1.Router({
			internal,
			data,
			channel: this._channel,
			payloadChannel: this._payloadChannel,
			appData
		});
		this._routers.add(router);
		// Stop tracking the router once it closes.
		router.on('@close', () => this._routers.delete(router));
		// Emit observer event.
		this._observer.safeEmit('newrouter', router);
		return router;
	}
}
exports.Worker = Worker;
至此,在JS部分實現了多進程的啟動!!!
二:Mediasoup啟動詳解---C++部分
下面分析進程中實現的主要業務:
(一)進入C++源文件目錄

(二)分析main方法
static constexpr int ConsumerChannelFd{ 3 }; static constexpr int ProducerChannelFd{ 4 }; static constexpr int PayloadConsumerChannelFd{ 5 }; static constexpr int PayloadProducerChannelFd{ 6 }; int main(int argc, char* argv[]) { // Ensure we are called by our Node library. if (!std::getenv("MEDIASOUP_VERSION")) //先檢測版本問題,如果為null,則表示不是由nodejs產生的,而是進入目錄中使用命令行直接產生的(但是沒有設置環境變量是無法啟動的) { MS_ERROR_STD("you don't seem to be my real father!"); std::_Exit(EXIT_FAILURE); } std::string version = std::getenv("MEDIASOUP_VERSION"); //獲取版本信息 auto statusCode = run_worker( //開始運行進程 argc, argv, version.c_str(), ConsumerChannelFd, ProducerChannelFd, PayloadConsumerChannelFd, PayloadProducerChannelFd); switch (statusCode) //判斷返回碼 { case 0: std::_Exit(EXIT_SUCCESS); case 1: std::_Exit(EXIT_FAILURE); case 42: std::_Exit(42); } }
(三)分析run_worker方法,運行worker進程
/**
 * Initializes the worker runtime, runs the Worker and tears everything down.
 *
 * @param argc, argv                Command line, forwarded to Settings::SetConfiguration().
 * @param version                   Version string, only used for logging here.
 * @param consumerChannelFd         fd used to build the Channel socket (consumer side).
 * @param producerChannelFd         fd used to build the Channel socket (producer side).
 * @param payloadConsumeChannelFd   fd used to build the PayloadChannel socket (consumer side).
 * @param payloadProduceChannelFd   fd used to build the PayloadChannel socket (producer side).
 *
 * @return 0 on normal completion, 42 when the settings are rejected with a
 *         MediaSoupTypeError, 1 on any other MediaSoupError.
 */
extern "C" int run_worker(
  int argc,
  char* argv[],
  const char* version,
  int consumerChannelFd,
  int producerChannelFd,
  int payloadConsumeChannelFd,
  int payloadProduceChannelFd)
{
	// Initialize libuv stuff (we need it for the Channel).
	DepLibUV::ClassInit();

	// Channel socket (it will be handled and deleted by the Worker).
	Channel::ChannelSocket* channel{ nullptr };

	// PayloadChannel socket (it will be handled and deleted by the Worker).
	PayloadChannel::PayloadChannelSocket* payloadChannel{ nullptr };

	// Create the Channel used to exchange requests/notifications with Node.js.
	try
	{
		channel = new Channel::ChannelSocket(consumerChannelFd, producerChannelFd);
	}
	catch (const MediaSoupError& error)
	{
		MS_ERROR_STD("error creating the Channel: %s", error.what());

		return 1;
	}

	// Create the PayloadChannel.
	// NOTE(review): if this fails, the already created `channel` is not
	// released before returning; presumably harmless because the process
	// exits right after - confirm.
	try
	{
		payloadChannel = new PayloadChannel::PayloadChannelSocket(payloadConsumeChannelFd, payloadProduceChannelFd);
	}
	catch (const MediaSoupError& error)
	{
		MS_ERROR_STD("error creating the RTC Channel: %s", error.what());

		return 1;
	}

	// Initialize the Logger.
	Logger::ClassInit(channel);

	try
	{
		Settings::SetConfiguration(argc, argv);
	}
	catch (const MediaSoupTypeError& error)
	{
		MS_ERROR_STD("settings error: %s", error.what());

		// 42 is a custom exit code to notify "settings error" to the Node library.
		return 42;
	}
	catch (const MediaSoupError& error)
	{
		MS_ERROR_STD("unexpected settings error: %s", error.what());

		return 1;
	}

	MS_DEBUG_TAG(info, "starting mediasoup-worker process [version:%s]", version);

	// Log the detected endianness.
#if defined(MS_LITTLE_ENDIAN)
	MS_DEBUG_TAG(info, "little-endian CPU detected");
#elif defined(MS_BIG_ENDIAN)
	MS_DEBUG_TAG(info, "big-endian CPU detected");
#else
	MS_WARN_TAG(info, "cannot determine whether little-endian or big-endian");
#endif

	// Log the detected pointer width.
#if defined(INTPTR_MAX) && defined(INT32_MAX) && (INTPTR_MAX == INT32_MAX)
	MS_DEBUG_TAG(info, "32 bits architecture detected");
#elif defined(INTPTR_MAX) && defined(INT64_MAX) && (INTPTR_MAX == INT64_MAX)
	MS_DEBUG_TAG(info, "64 bits architecture detected");
#else
	MS_WARN_TAG(info, "cannot determine 32 or 64 bits architecture");
#endif

	Settings::PrintConfiguration();
	DepLibUV::PrintVersion();

	try
	{
		// Initialize static stuff.
		DepOpenSSL::ClassInit();
		DepLibSRTP::ClassInit();
		DepUsrSCTP::ClassInit();
		DepLibWebRTC::ClassInit();
		Utils::Crypto::ClassInit();
		RTC::DtlsTransport::ClassInit();
		RTC::SrtpSession::ClassInit();
		Channel::ChannelNotifier::ClassInit(channel);
		PayloadChannel::PayloadChannelNotifier::ClassInit(payloadChannel);

#ifdef MS_EXECUTABLE
		{
			// Ignore some signals.
			IgnoreSignals();
		}
#endif

		// Run the Worker.
		// NOTE(review): presumably the Worker constructor does not return
		// until the worker is closed - confirm against Worker.cpp.
		Worker worker(channel, payloadChannel);

		// Free static stuff.
		DepLibSRTP::ClassDestroy();
		Utils::Crypto::ClassDestroy();
		DepLibWebRTC::ClassDestroy();
		RTC::DtlsTransport::ClassDestroy();
		DepUsrSCTP::ClassDestroy();
		DepLibUV::ClassDestroy();

		// Wait a bit so pending messages to stdout/Channel arrive to the Node
		// process.
		uv_sleep(200);

		return 0;
	}
	catch (const MediaSoupError& error)
	{
		MS_ERROR_STD("failure exit: %s", error.what());

		return 1;
	}
}
三:進程間通信
(一)常見通信方法

管道:匿名管道進程之間必須是父子關系,有名管道可以應用於非父子關系的進程之間通信(比如socket.XX)
socket:遠端和本地通信可以通過socket通信,那么必然進程之間可以通訊。
共享內存和信號:對應數據和事件
...
(二)匿名管道(半雙工)
管道創建(含兩個文件描述符,用於讀、寫)時機,在創建子進程之前創建管道:

創建子進程:子進程會拷貝父進程的文件描述符(3讀、4寫),若兩個進程同時對同一描述符進行讀寫就會造成混亂,所以需要關閉各自不需要的描述符(或者加鎖)。

關閉部分描述符,實現半雙工,父子進程通信

(三)socket(全雙工)--注意箭頭
對於socket,由於是全雙工,所以進程通信之間的緩沖區與進程之間通信使用雙向箭頭:

同上面匿名管道所說,依舊混亂,所以我們依舊需要關閉部分描述符:

父進程通過描述符4發送、接收數據,子進程通過描述符3發送、接收數據!父子皆可讀寫,mediasoup使用socket通信!!
四:mediasoup下的channel創建詳細過程
(一)接:一(二)worker類構造函數
this._child = child_process_1.spawn( //js線程調用,底層由libuv提供,用於啟動進程!!!! spawnBin, //spawn將啟動這個程序 spawnArgs, //上面配置的其他參數 // options { //其他可選參數 env: { //定義的環境 MEDIASOUP_VERSION: '3.7.11', //版本 ...process.env }, detached: false, //啟動的進程,與node是否是分離開的---當關閉node之后,是否關閉其他子進程 // fd 0 (stdin) : Just ignore it. // fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff. // fd 2 (stderr) : Same as stdout. // fd 3 (channel) : Producer Channel fd. // fd 4 (channel) : Consumer Channel fd. // fd 5 (channel) : Producer PayloadChannel fd. // fd 6 (channel) : Consumer PayloadChannel fd. stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'], //輸入輸出重定向,主要通過管道通信。用於C++與JS進行通信---根據這個創建文件描述符 windowsHide: true }); this._pid = this._child.pid; //下面兩個Channel與C++進行通信!!!,可以查看后面C++代碼,發現兩個個數、類型一致!!! this._channel = new Channel_1.Channel({ // 創建的管道交給channel處理 producerSocket: this._child.stdio[3], //存放創建完成的文件描述符號 stdio[3] 用來發送數據給子進程,stdio[4] 用來接收數據。 最后管道處理都交給了Channel 類來處理了。 consumerSocket: this._child.stdio[4], pid: this._pid //進程ID }); this._payloadChannel = new PayloadChannel_1.PayloadChannel({ // NOTE: TypeScript does not like more than 5 fds. // @ts-ignore producerSocket: this._child.stdio[5], // @ts-ignore consumerSocket: this._child.stdio[6] });
補充:child_process.spawn方法

其中options.stdio 選項:用於配置在父進程和子進程之間建立的管道。
默認情況下,子進程的 stdin、 stdout 和 stderr 會被重定向到 ChildProcess 對象上相應的 subprocess.stdin、subprocess.stdout 和 subprocess.stderr 流。 這相當於將 options.stdio 設置為 ['pipe', 'pipe', 'pipe']。
當在父進程和子進程之間建立 IPC 通道,並且子進程是 Node.js 進程時,則子進程啟動時不會指向 IPC 通道(使用 unref()),直到子進程為 'disconnect' 事件或 'message' 事件注冊了事件句柄。 這使得子進程可以正常退出而不需要通過開放的 IPC 通道保持打開該進程。

worker類中的getResourceUsage方法,獲取資源情況,會去調用channel與C++進程通信!!
async getResourceUsage() { logger.debug('getResourceUsage()'); return this._channel.request('worker.getResourceUsage'); //這里可以查看與C++的通信過程!!! }
(二)分析channel.js構造函數---NodeJs和 C++ 管道通信的過程
根據上述管道了解知道,producerSocket stdio[3] 用來發送數據給子進程,consumerSocket stdio[4] 用來接收數據。傳入了Channel類構造函數作為參數!
下面分析兩個方法:Channel類的構造函數,以及runMediasoupWorkers中調用worker.getResourceUsage()時所對應的Channel類request方法。
/**
 * JS end of the Channel used to talk to the worker process.
 *
 * Requests are written to the producer socket as netstring-framed JSON.
 * Data read from the consumer socket is split into netstring payloads, each
 * of which is either a JSON message (response or notification) or a log line
 * identified by its first byte.
 */
class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter
{
	/**
	 * @private
	 * @param {Object} options
	 * @param {Object} options.producerSocket - Socket used to send data to the worker.
	 * @param {Object} options.consumerSocket - Socket used to receive data from the worker.
	 * @param {number} options.pid - Worker process id, only used in log lines.
	 */
	constructor({ producerSocket, consumerSocket, pid })
	{
		super();
		// Closed flag.
		this._closed = false;
		// Next id for messages sent to the worker process.
		this._nextId = 0;
		// Map of pending sent requests.
		this._sents = new Map();
		logger.debug('constructor()');
		this._producerSocket = producerSocket;
		this._consumerSocket = consumerSocket;
		// Read Channel responses/notifications from the worker.
		this._consumerSocket.on('data', (buffer) =>
		{
			// Accumulate the chunk: a netstring may arrive split across
			// several 'data' events.
			if (!this._recvBuffer)
			{
				this._recvBuffer = buffer;
			}
			else
			{
				this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
			}
			if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN)
			{
				logger.error('receiving buffer is full, discarding all data into it');
				// Reset the buffer and exit.
				this._recvBuffer = undefined;
				return;
			}
			// Extract as many complete netstrings as the buffer holds.
			while (true) // eslint-disable-line no-constant-condition
			{
				let nsPayload;
				try
				{
					nsPayload = netstring.nsPayload(this._recvBuffer);
				}
				catch (error)
				{
					logger.error('invalid netstring data received from the worker process: %s', String(error));
					// Reset the buffer and exit.
					this._recvBuffer = undefined;
					return;
				}
				// Incomplete netstring message.
				if (nsPayload === -1)
					return;
				try
				{
					// We can receive JSON messages (Channel messages) or log strings.
					switch (nsPayload[0])
					{
						// 123 = '{' (a Channel JSON messsage).
						case 123:
							this._processMessage(JSON.parse(nsPayload.toString('utf8')));
							break;
						// 68 = 'D' (a debug log).
						case 68:
							logger.debug(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
							break;
						// 87 = 'W' (a warn log).
						case 87:
							logger.warn(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
							break;
						// 69 = 'E' (an error log).
						// The prefix now closes its bracket like the other levels.
						case 69:
							logger.error(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
							break;
						// 88 = 'X' (a dump log).
						case 88:
							// eslint-disable-next-line no-console
							console.log(nsPayload.toString('utf8', 1));
							break;
						default:
							// eslint-disable-next-line no-console
							console.warn(`worker[pid:${pid}] unexpected data: %s`, nsPayload.toString('utf8', 1));
					}
				}
				catch (error)
				{
					logger.error('received invalid message from the worker process: %s', String(error));
				}
				// Remove the read payload from the buffer.
				this._recvBuffer = this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
				if (!this._recvBuffer.length)
				{
					this._recvBuffer = undefined;
					return;
				}
			}
		});
		this._consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
		this._consumerSocket.on('error', (error) => (logger.error('Consumer Channel error: %s', String(error))));
		this._producerSocket.on('end', () => (logger.debug('Producer Channel ended by the worker process')));
		this._producerSocket.on('error', (error) => (logger.error('Producer Channel error: %s', String(error))));
	}
	/**
	 * Sends a request to the worker process and waits for its response.
	 *
	 * @param {string} method - Request method name.
	 * @param {Object} [internal] - Internal identifiers sent with the request.
	 * @param {Object} [data] - Request data.
	 * @returns {Promise<*>} Resolves with the response data; rejects when the
	 *   worker reports an error or no response arrives before the timeout.
	 * @throws {InvalidStateError} If the Channel is closed.
	 * @throws {Error} If the serialized request exceeds NS_MESSAGE_MAX_LEN.
	 */
	async request(method, internal, data)
	{
		// Ids wrap around to 1 after reaching the 32 bit unsigned maximum.
		this._nextId < 4294967295 ? ++this._nextId : (this._nextId = 1);
		const id = this._nextId;
		logger.debug('request() [method:%s, id:%s]', method, id);
		if (this._closed)
			throw new errors_1.InvalidStateError('Channel closed');
		const request = { id, method, internal, data };
		// Serialize the request as JSON and frame it as a netstring.
		const ns = netstring.nsWrite(JSON.stringify(request));
		if (Buffer.byteLength(ns) > NS_MESSAGE_MAX_LEN)
			throw new Error('Channel request too big');
		// This may throw if closed or remote side ended.
		// This is the actual write into the pipe towards the worker.
		this._producerSocket.write(ns);
		return new Promise((pResolve, pReject) =>
		{
			// 15 seconds plus 0.1 seconds per request already pending.
			const timeout = 1000 * (15 + (0.1 * this._sents.size));
			const sent = {
				id: id,
				method: method,
				resolve: (data2) =>
				{
					if (!this._sents.delete(id))
						return;
					clearTimeout(sent.timer);
					pResolve(data2);
				},
				reject: (error) =>
				{
					if (!this._sents.delete(id))
						return;
					clearTimeout(sent.timer);
					pReject(error);
				},
				timer: setTimeout(() =>
				{
					if (!this._sents.delete(id))
						return;
					pReject(new Error('Channel request timeout'));
				}, timeout),
				close: () =>
				{
					clearTimeout(sent.timer);
					pReject(new errors_1.InvalidStateError('Channel closed'));
				}
			};
			// Keep the pending request in the Map, keyed by id, until
			// _processMessage() receives the response carrying the same id
			// (or the timeout fires).
			this._sents.set(id, sent);
		});
	}
	/**
	 * Handles a JSON message received from the worker.
	 *
	 * A message carrying an `id` is the response to a previously sent request
	 * and settles the matching pending entry. A message without `id` but with
	 * `targetId` and `event` is a notification and is re-emitted as
	 * `emit(targetId, event, data)`.
	 *
	 * @param {Object} msg - Parsed JSON message.
	 */
	_processMessage(msg)
	{
		if (msg.id)
		{
			// Look up the pending request this response belongs to.
			const sent = this._sents.get(msg.id);
			if (!sent)
			{
				logger.error('received response does not match any sent request [id:%s]', msg.id);
				return;
			}
			if (msg.accepted)
			{
				logger.debug('request succeeded [method:%s, id:%s]', sent.method, sent.id);
				sent.resolve(msg.data);
			}
			else if (msg.error)
			{
				logger.warn('request failed [method:%s, id:%s]: %s', sent.method, sent.id, msg.reason);
				switch (msg.error)
				{
					case 'TypeError':
						sent.reject(new TypeError(msg.reason));
						break;
					default:
						sent.reject(new Error(msg.reason));
				}
			}
			else
			{
				logger.error('received response is not accepted nor rejected [method:%s, id:%s]', sent.method, sent.id);
			}
		}
		else if (msg.targetId && msg.event)
		{
			setImmediate(() => this.emit(msg.targetId, msg.event, msg.data));
		}
		else
		{
			logger.error('received message is not a response nor a notification');
		}
	}
}
exports.Channel = Channel;
(三)C++端---接二(三)run_worker
JS首先組成JSON格式的命令最后將它轉成字符串 通過channel通道傳給C++端,C++有個接收管道接收到數據之后,再轉成JSON,最后再解析成Request(c++類) 中的一些字段,根據Methodid去處理相對應的信令。處理完消息后再生成字符串的發送給上層去確認。 通知事件是由底層主動發起的通知。
extern "C" int run_worker( int argc, char* argv[], const char* version, int consumerChannelFd, int producerChannelFd, int payloadConsumeChannelFd, int payloadProduceChannelFd) { // Initialize libuv stuff (we need it for the Channel). DepLibUV::ClassInit(); // Channel socket (it will be handled and deleted by the Worker). Channel::ChannelSocket* channel{ nullptr }; // PayloadChannel socket (it will be handled and deleted by the Worker). PayloadChannel::PayloadChannelSocket* payloadChannel{ nullptr }; try { channel = new Channel::ChannelSocket(consumerChannelFd, producerChannelFd); } catch (const MediaSoupError& error) { MS_ERROR_STD("error creating the Channel: %s", error.what()); return 1; } try { payloadChannel = new PayloadChannel::PayloadChannelSocket(payloadConsumeChannelFd, payloadProduceChannelFd); }
1.channelSocket.cpp分析,消費者、生產者通道繼承自UnixStreamSocket,通過write寫入數據到socket,上傳到JS層(最后面消息確認分析)!!通過read從JS層讀取數據到C++層(這里分析)
namespace Channel { /* Static. */ // netstring length for a 4194304 bytes payload. static constexpr size_t NsMessageMaxLen{ 4194313 }; static constexpr size_t NsPayloadMaxLen{ 4194304 }; /* Instance methods. */ ChannelSocket::ChannelSocket(int consumerFd, int producerFd) : consumerSocket(consumerFd, NsMessageMaxLen, this), producerSocket(producerFd, NsMessageMaxLen) { MS_TRACE_STD(); this->writeBuffer = static_cast<uint8_t*>(std::malloc(NsMessageMaxLen)); }
/**
 * Consumer side of the Channel: a UnixStreamSocket created with the CONSUMER
 * role that reports to the given listener.
 */
ConsumerSocket::ConsumerSocket(int fd, size_t bufferSize, Listener* listener)
  : ::UnixStreamSocket(fd, bufferSize, ::UnixStreamSocket::Role::CONSUMER),
    listener(listener)
{
	MS_TRACE_STD();
}
/**
 * Wraps the given fd in a libuv pipe handle and, for the CONSUMER role,
 * starts reading from it.
 *
 * @param fd          File descriptor to open the pipe on.
 * @param bufferSize  Stored as the size of the read buffer.
 * @param role        CONSUMER sockets start reading immediately.
 *
 * Throws (via MS_THROW_ERROR_STD) if any libuv call fails.
 */
UnixStreamSocket::UnixStreamSocket(int fd, size_t bufferSize, UnixStreamSocket::Role role)
  : bufferSize(bufferSize), role(role)
{
	MS_TRACE_STD();

	// Allocate the libuv pipe handle and let callbacks find this object.
	this->uvHandle       = new uv_pipe_t;
	this->uvHandle->data = static_cast<void*>(this);

	int status = uv_pipe_init(DepLibUV::GetLoop(), this->uvHandle, 0);

	if (status != 0)
	{
		// The handle was never initialized, so it can be deleted directly.
		delete this->uvHandle;
		this->uvHandle = nullptr;

		MS_THROW_ERROR_STD("uv_pipe_init() failed: %s", uv_strerror(status));
	}

	// Associate the pipe handle with the given fd.
	status = uv_pipe_open(this->uvHandle, fd);

	if (status != 0)
	{
		uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));

		MS_THROW_ERROR_STD("uv_pipe_open() failed: %s", uv_strerror(status));
	}

	// Only consumer sockets read.
	if (this->role != UnixStreamSocket::Role::CONSUMER)
		return;

	// Start reading: onAlloc provides the buffer, onRead receives the data.
	status = uv_read_start(
	  reinterpret_cast<uv_stream_t*>(this->uvHandle),
	  static_cast<uv_alloc_cb>(onAlloc),
	  static_cast<uv_read_cb>(onRead));

	if (status != 0)
	{
		uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));

		MS_THROW_ERROR_STD("uv_read_start() failed: %s", uv_strerror(status));
	}

	// NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
}
/**
 * libuv read callback: forwards the read result to the UnixStreamSocket stored
 * in the handle's `data` field, if any.
 */
inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
	auto* owner = static_cast<UnixStreamSocket*>(handle->data);

	if (!owner)
		return;

	owner->OnUvRead(nread, buf);
}
/**
 * Handles the result of a libuv read.
 *
 * @param nread  > 0: number of bytes received; 0: nothing to do;
 *               < 0: UV_EOF / UV_ECONNRESET mean the peer disconnected, any
 *               other value is treated as an error.
 */
inline void UnixStreamSocket::OnUvRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
	MS_TRACE_STD();

	if (nread == 0)
		return;

	// Data received.
	if (nread > 0)
	{
		// Update the buffer data length.
		this->bufferDataLen += static_cast<size_t>(nread);

		// Notify the subclass.
		UserOnUnixStreamRead();

		return;
	}

	const bool disconnectedByPeer = (nread == UV_EOF || nread == UV_ECONNRESET);

	if (disconnectedByPeer)
	{
		this->isClosedByPeer = true;
	}
	// Some error.
	else
	{
		MS_ERROR_STD("read error, closing the pipe: %s", uv_strerror(nread));

		this->hasError = true;
	}

	// In both cases close the local side of the pipe and notify the subclass.
	Close();
	UserOnUnixStreamSocketClosed();
}
void ConsumerSocket::UserOnUnixStreamRead() { MS_TRACE_STD(); // Be ready to parse more than a single message in a single chunk. while (true) { if (IsClosed()) return; size_t readLen = this->bufferDataLen - this->msgStart; char* msgStart = nullptr; size_t msgLen; int nsRet = netstring_read( //讀取數據 reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen); if (nsRet != 0) //出錯,判斷原因 { switch (nsRet) { } // Error, so reset and exit the parsing loop. this->msgStart = 0; this->bufferDataLen = 0; return; } // If here it means that msgStart points to the beginning of a message // with msgLen bytes length, so recalculate readLen. readLen = reinterpret_cast<const uint8_t*>(msgStart) - (this->buffer + this->msgStart) + msgLen + 1; //真正的數據讀取 this->listener->OnConsumerSocketMessage(this, msgStart, msgLen); //機械數據為JSON數據 // If there is no more space available in the buffer and that is because // the latest parsed message filled it, then empty the full buffer. if ((this->msgStart + readLen) == this->bufferSize) { this->msgStart = 0; this->bufferDataLen = 0; } // If there is still space in the buffer, set the beginning of the next // parsing to the next position after the parsed message. else { this->msgStart += readLen; } // If there is more data in the buffer after the parsed message // then parse again. Otherwise break here and wait for more data. if (this->bufferDataLen > this->msgStart) { continue; } break; } }
將字符串解析為JSON
/**
 * Called for every complete message read from the consumer socket.
 *
 * Parses the message as JSON, builds a ChannelRequest from it and hands it to
 * the listener. MediaSoup errors thrown by the listener are reported back on
 * the request itself.
 *
 * @param msg     Start of the message bytes.
 * @param msgLen  Number of bytes in the message.
 */
void ChannelSocket::OnConsumerSocketMessage(ConsumerSocket* /*consumerSocket*/, char* msg, size_t msgLen)
{
	MS_TRACE_STD();

	try
	{
		json jsonMessage = json::parse(msg, msg + msgLen);

		// The request lives on the stack, so it is destroyed on every exit
		// path, including an exception from the listener that is not a
		// MediaSoupError (the former new/delete pair leaked it in that case).
		Channel::ChannelRequest request(this, jsonMessage);

		// Notify the listener.
		try
		{
			this->listener->OnChannelRequest(this, &request);
		}
		catch (const MediaSoupTypeError& error)
		{
			request.TypeError(error.what());
		}
		catch (const MediaSoupError& error)
		{
			request.Error(error.what());
		}
	}
	catch (const json::parse_error& error)
	{
		MS_ERROR_STD("JSON parsing error: %s", error.what());
	}
	// Thrown by the ChannelRequest constructor for malformed requests.
	catch (const MediaSoupError& error)
	{
		MS_ERROR_STD("discarding wrong Channel request");
	}
}
2.channelRequest.cpp
/**
 * Builds a request from its JSON representation.
 *
 * Reads `id` (positive integer) and `method` (string, must be a known method)
 * as mandatory fields; `internal` and `data` default to an empty JSON object
 * when absent or not objects.
 *
 * Throws (via MS_THROW_ERROR) when `id` or `method` is missing/invalid or the
 * method is unknown. For an unknown method an error response is sent first.
 */
ChannelRequest::ChannelRequest(Channel::ChannelSocket* channel, json& jsonRequest)
  : channel(channel)
{
	MS_TRACE();

	const auto notFound = jsonRequest.end();

	// Mandatory: id.
	auto idIt = jsonRequest.find("id");

	if (idIt == notFound || !Utils::Json::IsPositiveInteger(*idIt))
		MS_THROW_ERROR("missing id");

	this->id = idIt->get<uint32_t>();

	// Mandatory: method.
	auto methodIt = jsonRequest.find("method");

	if (methodIt == notFound || !methodIt->is_string())
		MS_THROW_ERROR("missing method");

	this->method = methodIt->get<std::string>();

	// Map the method name to its MethodId.
	auto knownMethodIt = ChannelRequest::string2MethodId.find(this->method);

	if (knownMethodIt == ChannelRequest::string2MethodId.end())
	{
		// Reply to the sender before throwing.
		Error("unknown method");

		MS_THROW_ERROR("unknown method '%s'", this->method.c_str());
	}

	this->methodId = knownMethodIt->second;

	// Optional: internal (defaults to an empty object).
	auto internalIt = jsonRequest.find("internal");

	if (internalIt == notFound || !internalIt->is_object())
		this->internal = json::object();
	else
		this->internal = *internalIt;

	// Optional: data (defaults to an empty object).
	auto dataIt = jsonRequest.find("data");

	if (dataIt == notFound || !dataIt->is_object())
		this->data = json::object();
	else
		this->data = *dataIt;
}
3.worker.cpp中OnChannelRequest方法
/**
 * Dispatches a Channel request according to its method id.
 *
 * Worker-level methods are handled here; any other request is delivered to
 * the Router referenced by `request->internal`.
 *
 * May throw (see the "This may throw" helpers below); callers report such
 * errors back on the request.
 */
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

	switch (request->methodId)
	{
		case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
		{
			if (this->closed)
				return;

			MS_DEBUG_DEV("Worker close request, stopping");

			Close();

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_DUMP:
		{
			json data = json::object();

			FillJson(data);

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
		{
			json data = json::object();

			FillJsonResourceUsage(data);

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS:
		{
			Settings::HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
		{
			std::string routerId;

			// This may throw.
			SetNewRouterIdFromInternal(request->internal, routerId);

			auto* router = new RTC::Router(routerId);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			// Remove it from the map.
			this->mapRouters.erase(router->id);

			// Log while the Router is still alive: reading router->id after
			// deleting it would be a use-after-free.
			MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

			delete router;

			request->Accept();

			break;
		}

		// Any other request must be delivered to the corresponding Router.
		default:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			router->HandleRequest(request);

			break;
		}
	}
}
以上方法解析流程,實現了worker進程從啟動,到處理信令消息的過程!!
五:MediaSoup 消息確認與事件通知
消息的確認是指上層給mediasoup底層發送消息時,底層處理完要發送消息確認給上層處理結果。
事件通知是底層的一些操作導致狀態變化,需要通知到上層進行操作同步。下面簡單初步地看下C++是如何執行消息確認與事件通知的。
(一)返回信令確認消息給上層
1.見worker.cpp中OnChannelRequest方法,處理請求之后,使用request->Accept(data);返回確認消息
/**
 * Dispatches a Channel request according to its method id.
 *
 * Handled requests are acknowledged with request->Accept(), which sends the
 * confirmation message back to the upper layer. Any request that is not a
 * worker-level method is delivered to the Router referenced by
 * `request->internal`.
 */
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

	switch (request->methodId)
	{
		case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
		{
			if (this->closed)
				return;

			MS_DEBUG_DEV("Worker close request, stopping");

			Close();

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_DUMP:
		{
			json data = json::object();

			FillJson(data);

			// Confirm the request, returning the dump as response data.
			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
		{
			json data = json::object();

			FillJsonResourceUsage(data);

			// Confirm the request, returning the usage as response data.
			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS:
		{
			Settings::HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
		{
			std::string routerId;

			// This may throw.
			SetNewRouterIdFromInternal(request->internal, routerId);

			auto* router = new RTC::Router(routerId);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			// Remove it from the map.
			this->mapRouters.erase(router->id);

			// Log while the Router is still alive: reading router->id after
			// deleting it would be a use-after-free.
			MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

			delete router;

			request->Accept();

			break;
		}

		// Any other request must be delivered to the corresponding Router.
		default:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			router->HandleRequest(request);

			break;
		}
	}
}
2.查看ChannelRequest.cpp中的Accept方法
void ChannelRequest::Accept() { MS_TRACE(); MS_ASSERT(!this->replied, "request already replied"); this->replied = true; json jsonResponse = json::object(); jsonResponse["id"] = this->id; jsonResponse["accepted"] = true; this->channel->Send(jsonResponse); } void ChannelRequest::Accept(json& data) { MS_TRACE(); MS_ASSERT(!this->replied, "request already replied"); this->replied = true; json jsonResponse = json::object(); jsonResponse["id"] = this->id; //request id jsonResponse["accepted"] = true; //狀態 if (data.is_structured()) jsonResponse["data"] = data; this->channel->Send(jsonResponse); //發送確認消息給上層 }
3.查看ChannelSocket.cpp中Send方法,向管道中寫入數據
// Serializes `jsonMessage` to a string and passes it to SendImpl().
// Does nothing when the producer socket is closed; logs an error and drops the
// message when the serialized text is longer than NsPayloadMaxLen.
void ChannelSocket::Send(json& jsonMessage)
{
    MS_TRACE_STD();

    if (this->producerSocket.IsClosed())
        return;

    // Convert the JSON value into its string form.
    const std::string message = jsonMessage.dump();

    if (message.length() > NsPayloadMaxLen)
    {
        MS_ERROR_STD("message too big");

        return;
    }

    SendImpl(message.c_str(), message.length());
}
inline void ChannelSocket::SendImpl(const void* nsPayload, size_t nsPayloadLen) { MS_TRACE_STD(); size_t nsNumLen; if (nsPayloadLen == 0) { nsNumLen = 1; this->writeBuffer[0] = '0'; this->writeBuffer[1] = ':'; this->writeBuffer[2] = ','; } else //對數據進行判斷,將數據寫入writeBuffer中去,一會寫入管道中去!!(實際是socket) { nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1))); std::sprintf(reinterpret_cast<char*>(this->writeBuffer), "%zu:", nsPayloadLen); std::memcpy(this->writeBuffer + nsNumLen + 1, nsPayload, nsPayloadLen); this->writeBuffer[nsNumLen + nsPayloadLen + 1] = ','; } size_t nsLen = nsNumLen + nsPayloadLen + 2; this->producerSocket.Write(this->writeBuffer, nsLen); //向管道中寫入數據,從而JS層可以收到數據 }
4.由前面可以知道,是從消費者socket讀取數據,返回確認消息通過生產者socket寫入即可上傳給JS層!!!
// ProducerSocket derives from ::UnixStreamSocket; the base class is
// constructed with the given fd and buffer size in the PRODUCER role.
ProducerSocket::ProducerSocket(int fd, size_t bufferSize)
  : ::UnixStreamSocket(fd, bufferSize, ::UnixStreamSocket::Role::PRODUCER)
{
    MS_TRACE_STD();
}
5.查看UnixStreamSocket.cpp文件中的Write方法,向socket寫入數據!!!重點
void UnixStreamSocket::Write(const uint8_t* data, size_t len) { MS_TRACE_STD(); if (this->closed) return; if (len == 0) return; // First try uv_try_write(). In case it can not directly send all the given data // then build a uv_req_t and use uv_write(). uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len); //數據寫入buffer中去!!! int written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1); //初始化this->uvHandle // All the data was written. Done. if (written == static_cast<int>(len)) { return; } // Cannot write any data at first time. Use uv_write(). else if (written == UV_EAGAIN || written == UV_ENOSYS) { // Set written to 0 so pendingLen can be properly calculated. written = 0; } // Any other error. else if (written < 0) { MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written)); // Set written to 0 so pendingLen can be properly calculated. written = 0; } size_t pendingLen = len - written; auto* writeData = new UvWriteData(pendingLen); writeData->req.data = static_cast<void*>(writeData); std::memcpy(writeData->store, data + written, pendingLen); buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen); int err = uv_write( //libUV中寫入數據 &writeData->req, reinterpret_cast<uv_stream_t*>(this->uvHandle), //this->ubHandle被初始化了,寫入到這個數據pipe中去!!!!向JS層寫入數據 &buffer, //數據自愛buffer中!!! 1, static_cast<uv_write_cb>(onWrite)); //回調,處理寫入結果!!! if (err != 0) { MS_ERROR_STD("uv_write() failed: %s", uv_strerror(err)); // Delete the UvSendData struct. delete writeData; } }
下面處理寫入后的結果!
// Completion callback passed to uv_write(): reports a non-zero status to the
// UnixStreamSocket stored in the handle's data pointer (if any), then deletes
// the UvWriteData attached to the request.
inline static void onWrite(uv_write_t* req, int status)
{
    auto* pendingWrite = static_cast<UnixStreamSocket::UvWriteData*>(req->data);
    auto* owner        = static_cast<UnixStreamSocket*>(req->handle->data);

    // Just notify the UnixStreamSocket when error.
    if (status != 0 && owner)
    {
        owner->OnUvWriteError(status);
    }

    // Delete the UvWriteData struct.
    delete pendingWrite;
}
(二)給上層發送通知:使用Notifier類
1.Notifier類在run_worker函數里初始化
// Worker entry point (abridged excerpt): only the static class initialization
// calls are shown, ending with the two notifier ClassInit() calls.
// NOTE(review): `channel` and `payloadChannel` are not declared in this
// excerpt and no value is returned; presumably the elided code creates them
// from the *ChannelFd arguments and returns a status. Confirm against the full
// source before compiling this snippet.
extern "C" int run_worker(
  int argc,
  char* argv[],
  const char* version,
  int consumerChannelFd,
  int producerChannelFd,
  int payloadConsumeChannelFd,
  int payloadProduceChannelFd)
{
    // Initialize static stuff.
    DepOpenSSL::ClassInit();
    DepLibSRTP::ClassInit();
    DepUsrSCTP::ClassInit();
    DepLibWebRTC::ClassInit();
    Utils::Crypto::ClassInit();
    RTC::DtlsTransport::ClassInit();
    RTC::SrtpSession::ClassInit();
    // Give each notifier the socket it will emit on.
    Channel::ChannelNotifier::ClassInit(channel);
    PayloadChannel::PayloadChannelNotifier::ClassInit(payloadChannel);
}
2.在ChannelNotifier.cpp中查看通知事件:
namespace Channel { /* Class variables. */ thread_local Channel::ChannelSocket* ChannelNotifier::channel{ nullptr }; /* Static methods. */ void ChannelNotifier::ClassInit(Channel::ChannelSocket* channel) { MS_TRACE(); ChannelNotifier::channel = channel; } void ChannelNotifier::Emit(const std::string& targetId, const char* event) //和下面區別就在於是否有數據,與Accept響應相似 { MS_TRACE(); MS_ASSERT(ChannelNotifier::channel, "channel unset"); json jsonNotification = json::object(); jsonNotification["targetId"] = targetId; jsonNotification["event"] = event; ChannelNotifier::channel->Send(jsonNotification); } void ChannelNotifier::Emit(const std::string& targetId, const char* event, json& data) { MS_TRACE(); MS_ASSERT(ChannelNotifier::channel, "channel unset"); json jsonNotification = json::object(); jsonNotification["targetId"] = targetId; jsonNotification["event"] = event; jsonNotification["data"] = data; ChannelNotifier::channel->Send(jsonNotification); //向pipe中寫入數據,將JSON變為字符串之后寫入!!! } } // namespace Channel
3.舉例,在哪使用Emit方法,向JS層發送數據?WebRtcTransport.cpp文件,不止下面這一個會向上通知,這里只舉這一種!
// Callback for ICE completion: emits an "icestatechange" notification with
// iceState "completed", calls MayRunDtlsTransport(), and calls
// RTC::Transport::Connected() when the DTLS transport state is CONNECTED.
inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/)
{
    MS_TRACE();

    MS_DEBUG_TAG(ice, "ICE completed");

    // Notify the Node WebRtcTransport.
    json notification = json::object();

    notification["iceState"] = "completed";

    Channel::ChannelNotifier::Emit(this->id, "icestatechange", notification);

    // If ready, run the DTLS handler.
    MayRunDtlsTransport();

    // If DTLS was already connected, notify the parent class.
    const bool dtlsConnected =
      this->dtlsTransport->GetState() == RTC::DtlsTransport::DtlsState::CONNECTED;

    if (dtlsConnected)
    {
        RTC::Transport::Connected();
    }
}
無論是事件通知上層還是返回確認消息,兩者都是通過管道傳給上層,最終都調用channel->Send()!!!也就回到了(一)3.中的流程

