WebRTC Advanced Streaming Media Server Development (3): Mediasoup Source Code Analysis, the Application Layer (Code Layout, server.js, Room.js)


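1: Mediasoup Demo Analysis

The goal is to understand how Mediasoup runs and how the application calls into the Mediasoup core library.

(1) Components of the Mediasoup Demo

mediasoup-demo is the overall code framework and contains everything below:

app: the application code needed by the client

broadcasters: used for broadcasting, i.e. the module for pushing streams. Transport is one-way: send only or receive only

server: the signaling service and the media streaming service, which communicate with each other over pipes. It breaks down into the following parts:

--->config.js: the configuration file, itself a JS file that exposes basic settings and hands them to server.js

--->server.js: reads the basic settings from config.js and then starts the basic services, such as the WebSocket service and the signaling service

--->lib: library files used by server.js, broken down as follows

------->Room.js: all of the real signaling logic is implemented here, along with the description of room state

------->interactiveClient.js: client for querying internal state at runtime; interacts with the client side (for debugging)

------->interactiveServer.js: server for querying internal state at runtime; interacts with the server side (for debugging)

mediasoup C++: the C++ part, which handles media stream transport. It consists of lib and worker

--->lib: a set of JS files, mainly responsible for managing mediasoup

--->worker: the core C++ code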

2: server.js Analysis

(1) Set up the environment and load parameters from config.js

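process.title = 'mediasoup-demo-server';                            // Name of the process once it has started
process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';    // DEBUG environment variable (selects which log namespaces are printed)

/* Load config.js, which defines parameters such as:
https (certificate location, listen IP and port),
mediasoup (number of CPUs/workers; worker process settings such as log level and port range; router: what the application calls a Room is a Router on the C++ side, and its options define the audio/video codec parameters),
webRtcTransport (IP addresses, ports, incoming/outgoing bitrates)
*/
const config = require('./config');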

/* eslint-disable no-console */
console.log('process.env.DEBUG:', process.env.DEBUG);
console.log('config.js:\n%s', JSON.stringify(config, null, '  '));
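For orientation, the settings that the code in this article reads from config.js can be sketched roughly as follows. This is a minimal sketch, not the demo's actual file: the name exampleConfig, every value (paths, ports, codecs, bitrates) and the webRtcTransportOptions key are assumptions for illustration. Only the key names accessed in the listings here (https.listenIp, https.listenPort, https.tls, mediasoup.numWorkers, mediasoup.workerSettings, mediasoup.routerOptions.mediaCodecs) come from the source.

```javascript
const os = require('os');

// Illustrative stand-in for config.js. All values are placeholders.
const exampleConfig = {
  https: {
    listenIp: '0.0.0.0',
    listenPort: 4443,
    tls: {
      cert: './certs/fullchain.pem', // hypothetical path
      key: './certs/privkey.pem'     // hypothetical path
    }
  },
  mediasoup: {
    // One worker per CPU core is a common choice.
    numWorkers: os.cpus().length,
    workerSettings: {
      logLevel: 'warn',
      logTags: ['info', 'ice', 'dtls', 'rtp'],
      rtcMinPort: 40000,
      rtcMaxPort: 49999
    },
    routerOptions: {
      mediaCodecs: [
        { kind: 'audio', mimeType: 'audio/opus', clockRate: 48000, channels: 2 },
        { kind: 'video', mimeType: 'video/VP8', clockRate: 90000 }
      ]
    },
    // Assumed key name and contents, shown only to mirror the comment above.
    webRtcTransportOptions: {
      listenIps: [{ ip: '127.0.0.1', announcedIp: null }],
      initialAvailableOutgoingBitrate: 1000000,
      maxIncomingBitrate: 1500000
    }
  }
};

module.exports = exampleConfig;
```

Keeping the configuration in a JS module rather than JSON lets it compute values such as the worker count at load time.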

(2) Import modules and initialize variables

const fs = require('fs');
const https = require('https');
const url = require('url');
const protoo = require('protoo-server');
const mediasoup = require('mediasoup');            // The mediasoup library
const express = require('express');
const bodyParser = require('body-parser');
const { AwaitQueue } = require('awaitqueue');    // Queue that runs async tasks one at a time
const Logger = require('./lib/Logger');
const Room = require('./lib/Room');                // Room management
const interactiveServer = require('./lib/interactiveServer');
const interactiveClient = require('./lib/interactiveClient');

const logger = new Logger();

// Async queue to manage rooms.
// @type {AwaitQueue}
const queue = new AwaitQueue();

// Map of Room instances indexed by roomId.
// @type {Map<Number, Room>}
const rooms = new Map();

// HTTPS server.
// @type {https.Server}
let httpsServer;

// Express application.
// @type {Function}
let expressApp;

// Protoo WebSocket server.
// @type {protoo.WebSocketServer}
let protooWebSocketServer;

// mediasoup Workers.
// @type {Array<mediasoup.Worker>}
const mediasoupWorkers = [];      // Array holding every worker process that gets created

// Index of next mediasoup Worker to use.
// @type {Number}
let nextMediasoupWorkerIdx = 0;
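The index variable suggests that rooms are spread over the workers in round-robin order. The selection function itself is not shown in this article, so the following is only a sketch of how such a picker could look. The function name getMediasoupWorker and the fake worker objects are assumptions.

```javascript
// Stand-ins for the worker array and index declared in server.js.
const mediasoupWorkers = [{ pid: 101 }, { pid: 102 }, { pid: 103 }];
let nextMediasoupWorkerIdx = 0;

// Return the next worker in round-robin order.
function getMediasoupWorker() {
  const worker = mediasoupWorkers[nextMediasoupWorkerIdx];

  // Advance the index and wrap around at the end of the array.
  nextMediasoupWorkerIdx = (nextMediasoupWorkerIdx + 1) % mediasoupWorkers.length;

  return worker;
}

const picked = [1, 2, 3, 4].map(() => getMediasoupWorker().pid);
console.log(picked); // [ 101, 102, 103, 101 ]
```

Round-robin ignores how loaded each worker is; it only guarantees that consecutive rooms land on different processes.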

(3) Enter the main function: analysis of the run method

run(); // Call run() to start execution

async function run()
{
    // Open the interactive server.
    await interactiveServer();                    // Start the interactive server, used for runtime inspection (not the focus here)

    // Open the interactive client.
    if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')
        await interactiveClient();                // Start the interactive client, used for runtime inspection (not the focus here)

    // Run a mediasoup Worker.
    await runMediasoupWorkers();                // Launch all required worker processes. Key step!

    // Create Express app.
    await createExpressApp();                    // HTTPS request handling, mainly for broadcasters (not the focus here); it does set the global expressApp, which the HTTPS server below uses

    // Run HTTPS server.
    await runHttpsServer();                        // Start the HTTPS server

    // Run a protoo WebSocketServer.
    await runProtooWebSocketServer();            // Start the WebSocket server that receives and sends signaling. Key step!

    // Log rooms status every X seconds.
    setInterval(() =>
    {
        for (const room of rooms.values())
        {
            room.logStatus();
        }
    }, 120000);
}

(4) Analysis of runMediasoupWorkers: launching the worker processes

/**
 * Launch as many mediasoup Workers as given in the configuration file.
 */
async function runMediasoupWorkers()
{
    const { numWorkers } = config.mediasoup;                    // Number of worker processes to launch, read from the config; the loop below creates them

    logger.info('running %d mediasoup Workers...', numWorkers);

    for (let i = 0; i < numWorkers; ++i)
    {
        const worker = await mediasoup.createWorker(            // Spawns a child process under the hood (child_process.spawn, as the Worker class below shows), passing these settings
            {
                logLevel   : config.mediasoup.workerSettings.logLevel,
                logTags    : config.mediasoup.workerSettings.logTags,
                rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
                rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
            });

        worker.on('died', () =>                                    // Each worker gets a listener for its 'died' event, emitted when the process exits unexpectedly
        {
            logger.error(
                'mediasoup Worker died, exiting  in 2 seconds... [pid:%d]', worker.pid);

            setTimeout(() => process.exit(1), 2000);
        });

        mediasoupWorkers.push(worker);                            // Store the newly created worker in the array

        // Log worker resource usage every X seconds.
        setInterval(async () =>
        {
            const usage = await worker.getResourceUsage();

            logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
        }, 120000);
    }
}

A look at the createWorker method:

/**
 * Create a Worker.
 */
async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {}) {
    logger.debug('createWorker()');
    if (appData && typeof appData !== 'object')
        throw new TypeError('if given, appData must be an object');
    const worker = new Worker_1.Worker({          // Create the Worker object
        logLevel,
        logTags,
        rtcMinPort,
        rtcMaxPort,
        dtlsCertificateFile,
        dtlsPrivateKeyFile,
        appData
    });
    return new Promise((resolve, reject) => {       // Wait to learn whether the worker started successfully
        worker.on('@success', () => {
            // Emit observer event.
            observer.safeEmit('newworker', worker);
            resolve(worker);                 // On success, resolve with the worker
        });
        worker.on('@failure', reject);
    });
}
exports.createWorker = createWorker;            // Export from the module so other files can use it
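The tail of createWorker turns two one-off events into a Promise. The pattern can be tried in isolation with a plain EventEmitter, as in the sketch below. FakeWorker and waitForStart are hypothetical names; only the '@success' and '@failure' event names come from the listing above.

```javascript
const { EventEmitter } = require('events');

// Resolve when the emitter reports '@success', reject on '@failure'.
function waitForStart(emitter) {
  return new Promise((resolve, reject) => {
    emitter.once('@success', () => resolve(emitter));
    emitter.once('@failure', reject);
  });
}

// Hypothetical stand-in for Worker: reports its outcome asynchronously.
class FakeWorker extends EventEmitter {
  constructor({ fail = false } = {}) {
    super();
    setImmediate(() => {
      if (fail) this.emit('@failure', new Error('wrong settings'));
      else this.emit('@success');
    });
  }
}

async function demo() {
  const worker = await waitForStart(new FakeWorker());
  let failure = null;

  try {
    await waitForStart(new FakeWorker({ fail: true }));
  } catch (error) {
    failure = error.message;
  }

  return { started: worker instanceof FakeWorker, failure };
}

const demoResult = demo();
demoResult.then((result) => console.log(result));
```

The listeners are attached synchronously right after construction, before the outcome can be emitted. The same ordering holds in createWorker, where the outcome depends on the child process and so arrives later.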

A look at the Worker class:

class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    /**
     * @private
     * @emits died - (error: Error)
     * @emits @success
     * @emits @failure - (error: Error)
     */
    constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {  // Constructor
        super();
        // Closed flag.
        this._closed = false;
        // Routers set.
        this._routers = new Set();
        // Observer instance.
        this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
        logger.debug('constructor()');
        let spawnBin = workerBin;
        let spawnArgs = [];
        if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
            spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
            if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
                spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));
            }
            spawnArgs.push(workerBin);
        }
        if (typeof logLevel === 'string' && logLevel)
            spawnArgs.push(`--logLevel=${logLevel}`);
        for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
            if (typeof logTag === 'string' && logTag)
                spawnArgs.push(`--logTag=${logTag}`);
        }
        if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
            spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
        if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
            spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
        if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)
            spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
        if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)
            spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
        logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
        this._child = child_process_1.spawn(          // Launch the process with the arguments built above
        // command
        spawnBin, 
        // args
        spawnArgs, 
        // options
        {
            env: {
                MEDIASOUP_VERSION: '3.7.11',
                // Let the worker process inherit all environment variables, useful
                // if a custom and not in the path GCC is used so the user can set
                // LD_LIBRARY_PATH environment variable for runtime.
                ...process.env
            },
            detached: false,
            // fd 0 (stdin)   : Just ignore it.
            // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
            // fd 2 (stderr)  : Same as stdout.
            // fd 3 (channel) : Producer Channel fd.
            // fd 4 (channel) : Consumer Channel fd.
            // fd 5 (channel) : Producer PayloadChannel fd.
            // fd 6 (channel) : Consumer PayloadChannel fd.
            stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
            windowsHide: true
        });
        this._pid = this._child.pid;
        this._channel = new Channel_1.Channel({
            producerSocket: this._child.stdio[3],
            consumerSocket: this._child.stdio[4],
            pid: this._pid
        });
        this._payloadChannel = new PayloadChannel_1.PayloadChannel({
            // NOTE: TypeScript does not like more than 5 fds.
            // @ts-ignore
            producerSocket: this._child.stdio[5],
            // @ts-ignore
            consumerSocket: this._child.stdio[6]
        });
        this._appData = appData;
        let spawnDone = false;
        // Listen for 'running' notification.
        this._channel.once(String(this._pid), (event) => {
            if (!spawnDone && event === 'running') {
                spawnDone = true;
                logger.debug('worker process running [pid:%s]', this._pid);
                this.emit('@success');
            }
        });
        this._child.on('exit', (code, signal) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                if (code === 42) {
                    logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
                    this.emit('@failure', new TypeError('wrong settings'));
                }
                else {
                    logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                    this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                }
            }
            else {
                logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
            }
        });
        this._child.on('error', (error) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
                this.emit('@failure', error);
            }
            else {
                logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
                this.safeEmit('died', error);
            }
        });
        // Be ready for 3rd party worker libraries logging to stdout.
        this._child.stdout.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line)
                    workerLogger.debug(`(stdout) ${line}`);
            }
        });
        // In case of a worker bug, mediasoup will log to stderr.
        this._child.stderr.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line)
                    workerLogger.error(`(stderr) ${line}`);
            }
        });
    }
    /**
     * Worker process identifier (PID).
     */
    get pid() {
        return this._pid;
    }
    /**
     * Whether the Worker is closed.
     */
    get closed() {
        return this._closed;
    }
    /**
     * App custom data.
     */
    get appData() {
        return this._appData;
    }
    /**
     * Invalid setter.
     */
    set appData(appData) {
        throw new Error('cannot override appData object');
    }
    /**
     * Observer.
     *
     * @emits close
     * @emits newrouter - (router: Router)
     */
    get observer() {
        return this._observer;
    }
    /**
     * Close the Worker.
     */
    close() {
        if (this._closed)
            return;
        logger.debug('close()');
        this._closed = true;
        // Kill the worker process.
        if (this._child) {
            // Remove event listeners but leave a fake 'error' handler to avoid
            // propagation.
            this._child.removeAllListeners('exit');
            this._child.removeAllListeners('error');
            this._child.on('error', () => { });
            this._child.kill('SIGTERM');
            this._child = undefined;
        }
        // Close the Channel instance.
        this._channel.close();
        // Close the PayloadChannel instance.
        this._payloadChannel.close();
        // Close every Router.
        for (const router of this._routers) {
            router.workerClosed();
        }
        this._routers.clear();
        // Emit observer event.
        this._observer.safeEmit('close');
    }
    /**
     * Dump Worker.
     */
    async dump() {
        logger.debug('dump()');
        return this._channel.request('worker.dump');
    }
    /**
     * Get mediasoup-worker process resource usage.
     */
    async getResourceUsage() {
        logger.debug('getResourceUsage()');
        return this._channel.request('worker.getResourceUsage');
    }
    /**
     * Update settings.
     */
    async updateSettings({ logLevel, logTags } = {}) {
        logger.debug('updateSettings()');
        const reqData = { logLevel, logTags };
        await this._channel.request('worker.updateSettings', undefined, reqData);
    }
    /**
     * Create a Router.
     */
    async createRouter({ mediaCodecs, appData = {} } = {}) {
        logger.debug('createRouter()');
        if (appData && typeof appData !== 'object')
            throw new TypeError('if given, appData must be an object');
        // This may throw.
        const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
        const internal = { routerId: uuid_1.v4() };
        await this._channel.request('worker.createRouter', internal);
        const data = { rtpCapabilities };
        const router = new Router_1.Router({
            internal,
            data,
            channel: this._channel,
            payloadChannel: this._payloadChannel,
            appData
        });
        this._routers.add(router);
        router.on('@close', () => this._routers.delete(router));
        // Emit observer event.
        this._observer.safeEmit('newrouter', router);
        return router;
    }
}
exports.Worker = Worker;
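One detail of the constructor is easy to miss: the port options are forwarded only when they are real numbers, which is presumably why server.js wraps them in Number(...). The sketch below lifts that argument-building logic into a standalone function to make the effect visible. buildSpawnArgs is a hypothetical helper name, and the DTLS and valgrind options are left out.

```javascript
// Build the command-line arguments the way the Worker constructor does
// for the four settings that server.js passes in.
function buildSpawnArgs({ logLevel, logTags, rtcMinPort, rtcMaxPort } = {}) {
  const args = [];

  if (typeof logLevel === 'string' && logLevel)
    args.push(`--logLevel=${logLevel}`);

  for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
    if (typeof logTag === 'string' && logTag)
      args.push(`--logTag=${logTag}`);
  }

  // Only real numbers are forwarded; strings are silently skipped.
  if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
    args.push(`--rtcMinPort=${rtcMinPort}`);
  if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
    args.push(`--rtcMaxPort=${rtcMaxPort}`);

  return args;
}

const fromStrings = buildSpawnArgs({
  logLevel: 'warn',
  rtcMinPort: '40000',
  rtcMaxPort: '49999'
});
const fromNumbers = buildSpawnArgs({
  logLevel: 'warn',
  logTags: ['ice', 'dtls'],
  rtcMinPort: Number('40000'),
  rtcMaxPort: Number('49999')
});

console.log(fromStrings); // [ '--logLevel=warn' ]
console.log(fromNumbers);
// [ '--logLevel=warn', '--logTag=ice', '--logTag=dtls',
//   '--rtcMinPort=40000', '--rtcMaxPort=49999' ]
```

With string ports the worker is started without any port arguments at all, so a port range supplied as text (for example from an environment variable) would be ignored rather than rejected.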

(5) Analysis of runHttpsServer: starting the HTTPS service

/**
 * Create a Node.js HTTPS server. It listens in the IP and port given in the
 * configuration file and reuses the Express application as request listener.
 */
async function runHttpsServer()
{
    logger.info('running an HTTPS server...');

    // HTTPS server for the protoo WebSocket server.
    const tls =
    {
        cert : fs.readFileSync(config.https.tls.cert),
        key  : fs.readFileSync(config.https.tls.key)
    };
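    httpsServer = https.createServer(tls, expressApp);    // Create the HTTPS server from the certificate and expressApp; it is kept in a global variable and reused later by the WebSocket server

    await new Promise((resolve) =>
    {
        httpsServer.listen(
            Number(config.https.listenPort), config.https.listenIp, resolve);  // Start listening on the configured IP and port
    });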
}

(6) Analysis of runProtooWebSocketServer: receiving and sending signaling

/**
 * Create a protoo WebSocketServer to allow WebSocket connections from browsers.
 */
async function runProtooWebSocketServer()
{
    logger.info('running protoo WebSocketServer...');

    // Create the protoo WebSocket server.
    protooWebSocketServer = new protoo.WebSocketServer(httpsServer,  // Create the WebSocket server object; it depends on the httpsServer created earlier
        {
            maxReceivedFrameSize     : 960000, // 960 KBytes.
            maxReceivedMessageSize   : 960000,
            fragmentOutgoingMessages : true,
            fragmentationThreshold   : 960000
        });

    // Handle connections from clients.
    protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>  // Listen for the connectionrequest event and handle each request
    {
        // The client indicates the roomId and peerId in the URL query.
        const u = url.parse(info.request.url, true);
        const roomId = u.query['roomId'];              // The request's query string carries the room id
        const peerId = u.query['peerId'];              // Peer (user) id

        if (!roomId || !peerId)
        {
            reject(400, 'Connection request without roomId and/or peerId');

            return;
        }

        logger.info(
            'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
            roomId, peerId, info.socket.remoteAddress, info.origin);

        // Serialize this code into the queue to avoid that two peers connecting at
        // the same time with the same roomId create two separate rooms with same
        // roomId.
        queue.push(async () =>            // Run through the serial queue to avoid that race
        {
            const room = await getOrCreateRoom({ roomId });  // The first peer creates the room; later peers join the existing one

            // Accept the protoo WebSocket connection.
            const protooWebSocketTransport = accept();

            room.handleProtooConnection({ peerId, protooWebSocketTransport });  // Handles all subsequent messages; covered in the Room.js analysis below
        })
            .catch((error) =>
            {
                logger.error('room creation or room joining failed:%o', error);

                reject(error);
            });
    });
}
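Why the queue matters becomes clearer with a small experiment. getOrCreateRoom is not shown in this article, so the sketch below is an assumption about its shape: look the room up in the rooms map and create it asynchronously if it is missing. pushTask is a minimal stand-in for AwaitQueue (not its implementation) and createRoom is a hypothetical substitute for Room.create.

```javascript
// Minimal serial queue: each task starts only after the previous one settles.
let tail = Promise.resolve();
function pushTask(task) {
  const result = tail.then(task);
  tail = result.catch(() => {}); // keep the chain alive after a failure
  return result;
}

const rooms = new Map();
let roomsCreated = 0;

// Hypothetical stand-in for Room.create(): asynchronous, like the real one.
async function createRoom(roomId) {
  await new Promise((resolve) => setTimeout(resolve, 10));
  roomsCreated += 1;
  return { roomId };
}

// Return the existing room, or create and register a new one.
async function getOrCreateRoom({ roomId }) {
  let room = rooms.get(roomId);

  if (!room) {
    room = await createRoom(roomId);
    rooms.set(roomId, room);
  }

  return room;
}

// Two peers ask for the same room at the same moment.
const joined = Promise.all([
  pushTask(() => getOrCreateRoom({ roomId: 'demo' })),
  pushTask(() => getOrCreateRoom({ roomId: 'demo' }))
]);

joined.then(([a, b]) => {
  console.log(a === b, roomsCreated); // true 1
});
```

Calling getOrCreateRoom twice without the queue would let both calls see an empty map before either finishes creating the room, and the second room would overwrite the first in the map.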

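3: Room.js Analysis

(1) Basic Mediasoup concepts

Room/Router: called a Room in the application layer and a Router in the C++ layer.

Transport/WebRtcTransport: Transport is the base class and WebRtcTransport is one of its subclasses; other Transport subclasses exist as well. A Transport is the layer that manages the connection between a client and the server.

Producer/Consumer: each user is a producer and, at the same time, a consumer of several other users. Data is carried over a Transport: a producer uploads data to the media server through a Transport, and the media server delivers data to consumers through a Transport.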

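(2) Main logic of Room

(3) Signaling supported by Mediasoup

createWebRtcTransport: the first step of setting up a WebRTC connection. It creates an endpoint on the server that mirrors the client (carrying information such as IP and port); a connection can only be established once this endpoint exists

connectWebRtcTransport: actually establishes the connection with the client, after which data can start to flow

setConsumerPreferedLayers: sets the preferred layers, for example picking the most suitable resolution among the simulcast layers

requestConsumerKeyFrame: requests a key frame to avoid corrupted video. For example, when a new user joins, the P and B frames it receives cannot be decoded until an IDR frame arrives, so failing to request one promptly leads to a garbled picture

If you have special requirements, you can define your own signaling messages.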
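The signaling names are dispatched on request.method, and adding a custom message amounts to adding one more branch. The following is a simplified, synchronous sketch of that idea using a lookup table instead of a switch. The handler bodies, the ping method and the handleRequest name are all hypothetical; the real handlers are asynchronous and call into mediasoup.

```javascript
// Hypothetical handlers; each returns the data for the response.
const handlers = new Map([
  ['createWebRtcTransport', (peer) => ({ id: `transport-of-${peer.id}` })],
  ['connectWebRtcTransport', () => ({})],
  ['setConsumerPreferedLayers', () => ({})],
  ['requestConsumerKeyFrame', () => ({})],
  // Custom signaling is just one more entry.
  ['ping', () => ({ pong: true })]
]);

// Dispatch on request.method and answer through accept or reject.
function handleRequest(peer, request, accept, reject) {
  const handler = handlers.get(request.method);

  if (!handler) {
    reject(500, `unknown request.method "${request.method}"`);
    return;
  }

  accept(handler(peer, request.data));
}

// Record what the callbacks receive instead of sending real responses.
const log = [];
const peer = { id: 'alice' };
const accept = (data) => log.push(['accept', data]);
const reject = (code, reason) => log.push(['reject', code, reason]);

handleRequest(peer, { method: 'ping', data: {} }, accept, reject);
handleRequest(peer, { method: 'nope', data: {} }, accept, reject);
console.log(log);
```

Every request ends in exactly one accept or reject call, which is what lets the client side treat signaling as request/response.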

(4) Code analysis

class Room extends EventEmitter
{
    static async create({ mediasoupWorker, roomId })
    {
        logger.info('create() [roomId:%s]', roomId);

        // Create a protoo Room instance.
        const protooRoom = new protoo.Room(); // protoo is a WebSocket library with two roles: room management (socket.io has a similar concept) and the WebSocket transport itself

        // Router media codecs.
        const { mediaCodecs } = config.mediasoup.routerOptions; // Read the media codec settings

        // Create a mediasoup Router.
        const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); // Create the Router; the new Room(...) call below ties the protoo.Room and the C++ Router together

        // Create a mediasoup AudioLevelObserver.
        const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver( // Reports audio volume levels
            {
                maxEntries : 1,
                threshold  : -80,
                interval   : 800
            });

        const bot = await Bot.create({ mediasoupRouter });

        return new Room( // Bundle everything together and create the Room via its constructor
            {
                roomId,
                protooRoom,
                mediasoupRouter,
                audioLevelObserver,
                bot
            });
    }


    constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot })
    {
        super();
        this.setMaxListeners(Infinity);
        this._roomId = roomId;
        this._closed = false;
        this._protooRoom = protooRoom;

        this._broadcasters = new Map();

        this._mediasoupRouter = mediasoupRouter;
        this._audioLevelObserver = audioLevelObserver;

        this._bot = bot;

        this._networkThrottled = false;
        this._handleAudioLevelObserver(); // Subscribe to audio level events
        global.audioLevelObserver = this._audioLevelObserver;
        global.bot = this._bot;
    }


    handleProtooConnection({ peerId, consume, protooWebSocketTransport }) // Called from runProtooWebSocketServer in server.js to handle a client connection
    {
        const existingPeer = this._protooRoom.getPeer(peerId);

        if (existingPeer) // If a peer with this id already exists, close it so the new connection replaces it
        {
            logger.warn(
                'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
                peerId);

            existingPeer.close();
        }

        let peer;

        try
        {
            peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport); // Create a new Peer for this user
        }
        catch (error)
        {
            logger.error('protooRoom.createPeer() failed:%o', error);
        }

        // Not joined after a custom protoo 'join' request is later received.
        // Initialize the peer's data
        peer.data.consume = consume;
        peer.data.joined = false;
        peer.data.displayName = undefined;
        peer.data.device = undefined;
        peer.data.rtpCapabilities = undefined;
        peer.data.sctpCapabilities = undefined;

        // Have mediasoup related maps ready even before the Peer joins since we
        // allow creating Transports before joining.
        peer.data.transports = new Map();
        peer.data.producers = new Map();
        peer.data.consumers = new Map();
        peer.data.dataProducers = new Map();
        peer.data.dataConsumers = new Map();
        peer.on('request', (request, accept, reject) => // Listen for signaling requests
        {
            logger.debug(
                'protoo Peer "request" event [method:%s, peerId:%s]',
                request.method, peer.id);

            this._handleProtooRequest(peer, request, accept, reject) // Private method that handles the request; internally a switch...case dispatcher on request.method (the signaling name)
                .catch((error) =>
                {
                    logger.error('request failed:%o', error);

                    reject(error);
                });
        });

        peer.on('close', () =>
        {
            if (this._closed)
                return;

            logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);

            // If the Peer was joined, notify all Peers.
            if (peer.data.joined)
            {
                for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                {
                    otherPeer.notify('peerClosed', { peerId: peer.id })
                        .catch(() => {});
                }
            }

            for (const transport of peer.data.transports.values())
            {
                transport.close();
            }

            // If this is the latest Peer in the room, close the room.
            if (this._protooRoom.peers.length === 0)
            {
                logger.info(
                    'last Peer in the room left, closing the room [roomId:%s]',
                    this._roomId);

                this.close();
            }
        });
    }
}
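The close handler relies on _getJoinedPeers, which is not shown in this article. Judging from how it is called, it returns the peers whose data.joined flag is set, optionally leaving one peer out. A plausible sketch, written as a free function over plain objects (the function signature and the sample peers are assumptions):

```javascript
// Return the peers that have completed 'join', optionally leaving one out.
function getJoinedPeers(peers, { excludePeer = undefined } = {}) {
  return peers.filter((peer) => peer.data.joined && peer !== excludePeer);
}

const alice = { id: 'alice', data: { joined: true } };
const bob = { id: 'bob', data: { joined: true } };
const carol = { id: 'carol', data: { joined: false } }; // connected, not joined yet

// alice leaves: only peers that joined, other than alice, are notified.
const toNotify = getJoinedPeers([alice, bob, carol], { excludePeer: alice });
console.log(toNotify.map((peer) => peer.id)); // [ 'bob' ]
```

Filtering on the joined flag matters because handleProtooConnection registers a peer as soon as its WebSocket connects, before the 'join' request has been processed.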

 

