一:Mediasoup Demo分析
了解Mediasoup運行機制,以及如何調用Mediasoup核心庫
(一)Mediasoup Demo組成
其中mediasoup-demo為整個代碼框架:(包含所有)
app應用:提供客戶端所需要的應用代碼
broadcasters:用於廣播使用,用於推流的模塊。單向傳輸,只有去或者只有回
server端:信令服務和媒體流服務,兩者通過管道通信。細分為下面幾部分:
--->config.js:配置文件/js文件,通過js獲取一些基本信息。將配置信息交給servere.js使用
--->server.js:從config.js中去獲取基本信息,獲取信息之后去啟動基本服務,比如wensocket服務、信令服務
--->lib:server.js使用的庫文件,細分為以下幾部分
------->Room.js:所有的真正的信令處理邏輯都是在這里實現,還描述了房間相關信息
------->interactiveClient.js:運行時內部信息查詢客戶端,與客戶端交互(debug使用)
------->interactiveServer.js:運行時內部信息查詢服務端,與服務端交換(debug使用)
mediasoup C++:C++部分,用於處理流媒體傳輸,包括lib與worker兩部分
--->lib:一些js文件組成,主要用於對mediasoup的管理工作
--->worker:C++核心代碼
二:Server.js分析
(一)配置環境,從config.js獲取參數
process.title = 'mediasoup-demo-server'; //啟動進程之后,進程的名字 process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*'; //Debug環境變量 /*引入config.js,內部定義了一些參數: https(證書位置,監聽IP、端口)、 mediasoup(CPU個數,worker進程參數定義如日志級別、端口范圍,router:room的概念在C++表示定義了音視頻編解碼參數) webRtcTransport傳輸(IP地址、端口、輸入輸出碼率) */ const config = require('./config'); /* eslint-disable no-console */ console.log('process.env.DEBUG:', process.env.DEBUG); console.log('config.js:\n%s', JSON.stringify(config, null, ' '));
(二)引入模塊,初始化變量
const fs = require('fs'); const https = require('https'); const url = require('url'); const protoo = require('protoo-server'); const mediasoup = require('mediasoup'); //mediasoup庫 const express = require('express'); const bodyParser = require('body-parser'); const { AwaitQueue } = require('awaitqueue'); //同步隊列 const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); //房間管理 const interactiveServer = require('./lib/interactiveServer'); const interactiveClient = require('./lib/interactiveClient'); const logger = new Logger(); // Async queue to manage rooms. // @type {AwaitQueue} const queue = new AwaitQueue(); // Map of Room instances indexed by roomId. // @type {Map<Number, Room>} const rooms = new Map(); // HTTPS server. // @type {https.Server} let httpsServer; // Express application. // @type {Function} let expressApp; // Protoo WebSocket server. // @type {protoo.WebSocketServer} let protooWebSocketServer; // mediasoup Workers. // @type {Array<mediasoup.Worker>} const mediasoupWorkers = []; //數組,存放所有創建的worker進程 // Index of next mediasoup Worker to use. // @type {Number} let nextMediasoupWorkerIdx = 0;
(三)進入主函數,分析主函數run方法
run(); //進入run方法,開始執行 async function run() { // Open the interactive server. await interactiveServer(); //啟動interactive server,用於交互---不是重點 // Open the interactive client. if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1') await interactiveClient(); //啟動interactive client,用於交互---不是重點 // Run a mediasoup Worker. await runMediasoupWorkers(); //將所有的需要的進程啟動,重點!!! // Create Express app. await createExpressApp(); //https業務管理,主要用於broadcast---不是重點,但是內部創建了expressApp全局變量,在下面創建https服務中使用 // Run HTTPS server. await runHttpsServer(); //https server運行 // Run a protoo WebSocketServer. await runProtooWebSocketServer(); //啟動websocket,用於處理接受發送信令,重點!!! // Log rooms status every X seconds. setInterval(() => { for (const room of rooms.values()) { room.logStatus(); } }, 120000); }
(四)分析runMediasoupWorkers方法,啟動相關進程服務
/** * Launch as many mediasoup Workers as given in the configuration file. */ async function runMediasoupWorkers() { const { numWorkers } = config.mediasoup; //從配置中獲取要啟動的進程個數,下面進行循環創建 logger.info('running %d mediasoup Workers...', numWorkers); for (let i = 0; i < numWorkers; ++i) { const worker = await mediasoup.createWorker( //底層調用fork創建子進程,傳入相關參數 { logLevel : config.mediasoup.workerSettings.logLevel, logTags : config.mediasoup.workerSettings.logTags, rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort), rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort) }); worker.on('died', () => //每個worker進程,監聽一個退出事件 { logger.error( 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); mediasoupWorkers.push(worker); //將創建完成的worker進程,存放到數組 // Log worker resource usage every X seconds. setInterval(async () => { const usage = await worker.getResourceUsage(); logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage); }, 120000); } }
查看createWorker方法:
/** * Create a Worker. */ async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {}) { logger.debug('createWorker()'); if (appData && typeof appData !== 'object') throw new TypeError('if given, appData must be an object'); const worker = new Worker_1.Worker({ //創建worker對象 logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }); return new Promise((resolve, reject) => { //監聽worker創建是否成功 worker.on('@success', () => { // Emit observer event. observer.safeEmit('newworker', worker); resolve(worker); //成功則直接返回 }); worker.on('@failure', reject); }); } exports.createWorker = createWorker; //模塊導出,其他文件可以使用
查看Worker類:
class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter { /** * @private * @emits died - (error: Error) * @emits @success * @emits @failure - (error: Error) */ constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) { //構造函數 super(); // Closed flag. this._closed = false; // Routers set. this._routers = new Set(); // Observer instance. this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter(); logger.debug('constructor()'); let spawnBin = workerBin; let spawnArgs = []; if (process.env.MEDIASOUP_USE_VALGRIND === 'true') { spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind'; if (process.env.MEDIASOUP_VALGRIND_OPTIONS) { spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/)); } spawnArgs.push(workerBin); } if (typeof logLevel === 'string' && logLevel) spawnArgs.push(`--logLevel=${logLevel}`); for (const logTag of (Array.isArray(logTags) ? logTags : [])) { if (typeof logTag === 'string' && logTag) spawnArgs.push(`--logTag=${logTag}`); } if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort)) spawnArgs.push(`--rtcMinPort=${rtcMinPort}`); if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort)) spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`); if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile) spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`); if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile) spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`); logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' ')); this._child = child_process_1.spawn( //傳入參數,啟動進程 // command spawnBin, // args spawnArgs, // options { env: { MEDIASOUP_VERSION: '3.7.11', // Let the worker process inherit all environment variables, useful // if a custom and not in the path GCC is used so the user can set // LD_LIBRARY_PATH environment variable for runtime. ...process.env }, detached: false, // fd 0 (stdin) : Just ignore it. // fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff. // fd 2 (stderr) : Same as stdout. // fd 3 (channel) : Producer Channel fd. // fd 4 (channel) : Consumer Channel fd. // fd 5 (channel) : Producer PayloadChannel fd. // fd 6 (channel) : Consumer PayloadChannel fd. stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'], windowsHide: true }); this._pid = this._child.pid; this._channel = new Channel_1.Channel({ producerSocket: this._child.stdio[3], consumerSocket: this._child.stdio[4], pid: this._pid }); this._payloadChannel = new PayloadChannel_1.PayloadChannel({ // NOTE: TypeScript does not like more than 5 fds. // @ts-ignore producerSocket: this._child.stdio[5], // @ts-ignore consumerSocket: this._child.stdio[6] }); this._appData = appData; let spawnDone = false; // Listen for 'running' notification. this._channel.once(String(this._pid), (event) => { if (!spawnDone && event === 'running') { spawnDone = true; logger.debug('worker process running [pid:%s]', this._pid); this.emit('@success'); } }); this._child.on('exit', (code, signal) => { this._child = undefined; this.close(); if (!spawnDone) { spawnDone = true; if (code === 42) { logger.error('worker process failed due to wrong settings [pid:%s]', this._pid); this.emit('@failure', new TypeError('wrong settings')); } else { logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal); this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`)); } } else { logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal); this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`)); } }); this._child.on('error', (error) => { this._child = undefined; this.close(); if (!spawnDone) { spawnDone = true; logger.error('worker process failed [pid:%s]: %s', this._pid, error.message); this.emit('@failure', error); } else { logger.error('worker process error [pid:%s]: %s', this._pid, error.message); this.safeEmit('died', error); } }); // Be ready for 3rd party worker libraries logging to stdout. this._child.stdout.on('data', (buffer) => { for (const line of buffer.toString('utf8').split('\n')) { if (line) workerLogger.debug(`(stdout) ${line}`); } }); // In case of a worker bug, mediasoup will log to stderr. this._child.stderr.on('data', (buffer) => { for (const line of buffer.toString('utf8').split('\n')) { if (line) workerLogger.error(`(stderr) ${line}`); } }); } /** * Worker process identifier (PID). */ get pid() { return this._pid; } /** * Whether the Worker is closed. */ get closed() { return this._closed; } /** * App custom data. */ get appData() { return this._appData; } /** * Invalid setter. */ set appData(appData) { throw new Error('cannot override appData object'); } /** * Observer. * * @emits close * @emits newrouter - (router: Router) */ get observer() { return this._observer; } /** * Close the Worker. */ close() { if (this._closed) return; logger.debug('close()'); this._closed = true; // Kill the worker process. if (this._child) { // Remove event listeners but leave a fake 'error' hander to avoid // propagation. this._child.removeAllListeners('exit'); this._child.removeAllListeners('error'); this._child.on('error', () => { }); this._child.kill('SIGTERM'); this._child = undefined; } // Close the Channel instance. this._channel.close(); // Close the PayloadChannel instance. this._payloadChannel.close(); // Close every Router. for (const router of this._routers) { router.workerClosed(); } this._routers.clear(); // Emit observer event. this._observer.safeEmit('close'); } /** * Dump Worker. */ async dump() { logger.debug('dump()'); return this._channel.request('worker.dump'); } /** * Get mediasoup-worker process resource usage. */ async getResourceUsage() { logger.debug('getResourceUsage()'); return this._channel.request('worker.getResourceUsage'); } /** * Update settings. */ async updateSettings({ logLevel, logTags } = {}) { logger.debug('updateSettings()'); const reqData = { logLevel, logTags }; await this._channel.request('worker.updateSettings', undefined, reqData); } /** * Create a Router. */ async createRouter({ mediaCodecs, appData = {} } = {}) { logger.debug('createRouter()'); if (appData && typeof appData !== 'object') throw new TypeError('if given, appData must be an object'); // This may throw. const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs); const internal = { routerId: uuid_1.v4() }; await this._channel.request('worker.createRouter', internal); const data = { rtpCapabilities }; const router = new Router_1.Router({ internal, data, channel: this._channel, payloadChannel: this._payloadChannel, appData }); this._routers.add(router); router.on('@close', () => this._routers.delete(router)); // Emit observer event. this._observer.safeEmit('newrouter', router); return router; } } exports.Worker = Worker;
(五)分析runHttpsServer方法,啟動https服務
/** * Create a Node.js HTTPS server. It listens in the IP and port given in the * configuration file and reuses the Express application as request listener. */ async function runHttpsServer() { logger.info('running an HTTPS server...'); // HTTPS server for the protoo WebSocket server. const tls = { cert : fs.readFileSync(config.https.tls.cert), key : fs.readFileSync(config.https.tls.key) }; httpsServer = https.createServer(tls, expressApp); //傳入證書和expressApp,創建https服務,存放在全局變量中,在后面使用websocket時使用 await new Promise((resolve) => { httpsServer.listen( Number(config.https.listenPort), config.https.listenIp, resolve); //進行監聽端口 }); }
(六)分析runProtooWebSocketServer方法,用於處理接受發送信令
/** * Create a protoo WebSocketServer to allow WebSocket connections from browsers. */ async function runProtooWebSocketServer() { logger.info('running protoo WebSocketServer...'); // Create the protoo WebSocket server. protooWebSocketServer = new protoo.WebSocketServer(httpsServer, //創建websocket對象,依賴於前面創建的httpsServer { maxReceivedFrameSize : 960000, // 960 KBytes. maxReceivedMessageSize : 960000, fragmentOutgoingMessages : true, fragmentationThreshold : 960000 }); // Handle connections from clients. protooWebSocketServer.on('connectionrequest', (info, accept, reject) => //偵聽connectionrequest事件,處理請求 { // The client indicates the roomId and peerId in the URL query. const u = url.parse(info.request.url, true); const roomId = u.query['roomId']; //請求參數包含roomid const peerId = u.query['peerId']; //用戶id if (!roomId || !peerId) { reject(400, 'Connection request without roomId and/or peerId'); return; } logger.info( 'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]', roomId, peerId, info.socket.remoteAddress, info.origin); // Serialize this code into the queue to avoid that two peers connecting at // the same time with the same roomId create two separate rooms with same // roomId. queue.push(async () => //放入同步隊列,防止沖突 { const room = await getOrCreateRoom({ roomId }); //如果是第一個用戶,則創建房間,不然就加入房間 // Accept the protoo WebSocket connection. const protooWebSocketTransport = accept(); room.handleProtooConnection({ peerId, protooWebSocketTransport }); //各種消息處理,后面分析Room.js會分析這個方法 }) .catch((error) => { logger.error('room creation or room joining failed:%o', error); reject(error); }); }); }
三:Room.js分析
(一)Mediasoup基本概念
Room/Router:在業務層稱為Room,在C++層稱為Router。
Transport/WebRtcTransport:Transport是基類,WebRtcTransport是子類,所以還有其他Transport的子類。Transport是客戶端與服務端建立連接的管理層
Produce/Consume:生產者/消費者,每個用戶本身是一個生產者,同時又是多個用戶的消費者。數據傳輸通過Transport進行傳輸。通過Transport,可以將數據上傳到流媒體服務器,同樣流媒體服務器可以通過Transport下發數據到消費者。
(二)Room主要邏輯
(三)Mediasoup支持的信令
createWebRtcTransport:建立WebRTC連接,在服務端創建一個與客戶端對等的點(含有信息,比如IP、端口),有了這個點之后才能建立連接
connectWebRtcTransport:真正的與客戶端建立連接,數據可以開始傳輸
setConsumerPreferedLayers:設置更喜歡的層,比如simulcast分層,選取其中最合適的分辨率傳輸
requestConsumerKeyFrame:請求關鍵幀,避免花屏,比如一個新的用戶加入視頻,如果不及時請求IDR幀,那么可能獲取的P、B幀無法解析,導致花屏
如果有特殊需要,可以設置自己定義的信令!!!
(四)代碼分析
class Room extends EventEmitter { static async create({ mediasoupWorker, roomId }) { logger.info('create() [roomId:%s]', roomId); // Create a protoo Room instance. const protooRoom = new protoo.Room(); //protoo是websocket庫,實現兩種功能,一個是實現房間管理(socket.io中也有這個概念),一個是websocket功能 // Router media codecs. const { mediaCodecs } = config.mediasoup.routerOptions; //獲取媒體流編解碼信息 // Create a mediasoup Router. const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); //創建Router,后面在new Room的時候將protoo.Room和C++中的Router綁定在一起了 // Create a mediasoup AudioLevelObserver. const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver( //音頻音量信息 { maxEntries : 1, threshold : -80, interval : 800 }); const bot = await Bot.create({ mediasoupRouter }); return new Room( //關聯信息,創建Room,調用構造函數 { roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot }); } constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot }) { super(); this.setMaxListeners(Infinity); this._roomId = roomId; this._closed = false; this._protooRoom = protooRoom; this._broadcasters = new Map(); this._mediasoupRouter = mediasoupRouter; this._audioLevelObserver = audioLevelObserver; this._bot = bot; this._networkThrottled = false; this._handleAudioLevelObserver(); //處理音頻音量事件 global.audioLevelObserver = this._audioLevelObserver; global.bot = this._bot; } handleProtooConnection({ peerId, consume, protooWebSocketTransport }) //由server.js中runProtooWebSocketServer調用,用於處理客戶端的連接 { const existingPeer = this._protooRoom.getPeer(peerId); if (existingPeer) //如果用戶已經存在,則關閉,重新進入 { logger.warn( 'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]', peerId); existingPeer.close(); } let peer; try { peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport); //創建一個新的peer用戶 } catch (error) { logger.error('protooRoom.createPeer() failed:%o', error); } // Not joined after a custom protoo 'join' request is later received. //設置用戶信息 peer.data.consume = consume; peer.data.joined = false; peer.data.displayName = undefined; peer.data.device = undefined; peer.data.rtpCapabilities = undefined; peer.data.sctpCapabilities = undefined; // Have mediasoup related maps ready even before the Peer joins since we // allow creating Transports before joining. peer.data.transports = new Map(); peer.data.producers = new Map(); peer.data.consumers = new Map(); peer.data.dataProducers = new Map(); peer.data.dataConsumers = new Map(); peer.on('request', (request, accept, reject) => //監聽request信令 { logger.debug( 'protoo Peer "request" event [method:%s, peerId:%s]', request.method, peer.id); this._handleProtooRequest(peer, request, accept, reject) //調用私有方法,處理請求。內部實現狀態機switch...case...處理各種狀態(信令,來自於request.method) .catch((error) => { logger.error('request failed:%o', error); reject(error); }); }); peer.on('close', () => { if (this._closed) return; logger.debug('protoo Peer "close" event [peerId:%s]', peer.id); // If the Peer was joined, notify all Peers. if (peer.data.joined) { for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })) { otherPeer.notify('peerClosed', { peerId: peer.id }) .catch(() => {}); } } for (const transport of peer.data.transports.values()) { transport.close(); } // If this is the latest Peer in the room, close the room. if (this._protooRoom.peers.length === 0) { logger.info( 'last Peer in the room left, closing the room [roomId:%s]', this._roomId); this.close(); } }); } }