SRS之監聽端口的管理:RTMP


1. 監聽端口管理的入口函數

監聽端口的管理入口在 run_master 函數中,如下:

int run_master()
{
    ...
    
    if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
        return ret;
    }
    
    ...
}

在 run_master 函數中,調用了 SrsServer 類的成員函數 listen:

int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

2. rtmp 端口的管理

SrsServer 與 SrsListener 之間的關系圖

2.1 SrsServer::listen_rtmp

int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;
    
    // stream service port.
    /* 獲取配置文件中所有要監聽的端口 */
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    
    /* 清空 listeners 容器中所有類型為 SrsListenerRtmpStream 的項 */
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) {
        /* 父類 SrsListener 的指針listener 指向新構造的子類 SrsStreamListener 的對象 */
        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
        /* 然后將該指向新構造的子類 SrsStreamListener 對象的 listener 指針放入到 
         * listeners vector 容器中 */
        listeners.push_back(listener);
        
        std::string ip;
        int port;
        /* 分割 ip 地址(若有的話)和 port 端口 */
        srs_parse_endpoint(ip_ports[i], ip, port);
        
        /* 多態:調用子類 SrsStreamListener 的成員函數 listen */
        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }
    
    return ret;
}

2.1.1 監聽的類型:SrsListenerType

// listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
    // RTMP client,
    SrsListenerRtmpStream       = 0,
    // HTTP api,
    SrsListenerHttpApi          = 1,
    // HTTP stream, HDS/HLS/DASH
    SrsListenerHttpStream       = 2,
    // UDP stream, MPEG-TS over udp
    SrsListenerMpegTsOverUdp    = 3,
    // TCP stream, RTSP stream.
    SrsListenerRtsp             = 4,
    // TCP stream, FLV stream over HTTP.
    SrsListenerFlv              = 5,
};

2.2 SrsServer::close_listeners

void SrsServer::close_listeners(SrsListenerType type)
{
    std::vector<SrsListener*>::iterator it;
    for (it = listeners.begin(); it != listeners.end();) {
        SrsListener* listener = *it;
        
        if (listener->listen_type() != type) {
            ++it;
            continue;
        }
        
        srs_freep(listener);
        it = listeners.erase(it);
    }
}

該函數是將 listeners 中所有類型值為 type 的元素移除。

2.3 SrsStreamListener 的構造

2.3.1 SrsStreamListener 類定義

/**
 * tcp listener.
 */
class SrsStreamListener : virtual public SrsListener, virtual public ISrsTcpHandler
{
private:
    SrsTcpListener* listener;
public:
    SrsStreamListener(SrsServer* server, SrsListenerType type);
    virtual ~SrsStreamListener();
public:
    virtual int listen(std::string ip, int port);
// ISrsTcpHandler
public:
    virtual int on_tcp_client(st_netfd_t stfd);
};

該類繼承自 SrsListener 和 ISrsTcpHandler。

2.3.2 SrsListener 類定義

/**
* the common tcp listener, for RTMP/HTTP server.
*/
class SrsListener
{
protected:
    /* 監聽類型:rtmp 或 http 或其他 */
    SrsListenerType type;
protected:
    /* 服務器地址 */
    std::string ip;
    /* 監聽的端口 */
    int port;
    SrsServer* server;
public:
    SrsListener(SrsServer* svr, SrsListenerType t);
    virtual ~SrsListener();
public:
    virtual SrsListenerType listen_type();
    virtual int listen(std::string i, int p) = 0;
};

2.3.3 ISrsTcpHandler 類定義

/**
* the tcp connection handler.
*/
class ISrsTcpHandler
{
public:
    ISrsTcpHandler();
    virtual ~ISrsTcpHandler();
public:
    /**
    * when got tcp client.
    */
    virtual int on_tcp_client(st_netfd_t stfd) = 0;
};

該類中定義了當接收到 tcp 客戶端連接時調用的純虛函數,由子類實現.

SrsStreamListener 和 SrsListener、ISrsTcpHandler 之間的關系圖

2.3.4 構造 SrsStreamListener 類

下面按構造 SrsStreamListener 類調用的構造函數的先后順序進行分析。

2.3.4.1 SrsListener 構造函數

SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
{
    port = 0;
    server = svr;
    type = t;
}

該 SrsListener 的構造函數僅對該類的成員進行初始化。

SrsStreamListener 的父類 ISrsTcpHandler 類的構造函數是個空殼,什么也沒做。

2.3.4.2 SrsStreamListener 構造函數

SrsStreamListener::SrsStreamListener(SrsServer* svr, SrsListenerType t) 
    : SrsListener(svr, t)
{
    listener = NULL;
}

這里僅對 listener 置初值 NULL。

2.4 srs_parse_endpoint:解析端口

void srs_parse_endpoint(string ip_port, string& ip, int& port)
{
    std::string the_port;
    srs_parse_endpoint(ip_port, ip, the_port);
    port = ::atoi(the_port.c_str());
}

該函數接着調用 srs_parse_endpoint 的另一重載函數:

void srs_parse_endpoint(string ip_port, string& ip, string& port)
{
    ip = "0.0.0.0";
    port = ip_port;
    
    /* string::npos 是一個長度參數,表示直到字符串的結束 */
    size_t pos = string::npos;
    if ((pos = pos.find(":")) != string::npos) {
        /* 分割 ip 地址和端口 */
        ip = port.substr(0, pos);
        port = port.substr(pos + 1);
    }
}

2.5 SrsStreamListener::listen

在該函數中,開始綁定並監聽端口:

int SrsStreamListener::listen(string i, int p)
{
    int ret = ERROR_SUCCESS;
    
    ip = i;
    port = p;
    
    srs_freep(listener);
    /* 構造 SrsTcpListener 類,該類中創建了一個可重復利用的線程: tcp */
    listener = new SrsTcpListener(this, ip, port);
    
    /* 開始真正的綁定和監聽端口 */
    if ((ret = listener->listen()) != ERROR_SUCCESS) {
        srs_error("tcp listen failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
    
    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), 
                                                 ip.c_str(), port, listener->fd());

    return ret;
}
SrsStreamListener 與 SrsTcpListener 之間的關系圖

2.5.1 SrsTcpListener 類定義

/**
 * bind and listen tcp port, use handler to process the client.
 */
class SrsTcpListener : public ISrsReuseableThreadHandler
{
private:
    int _fd;
    st_netfd_t _stfd;
    /* 定義一個 SrsReuseableThread 類的指針變量,創建一個可被其他線程啟動或終止的線程 */
    SrsReuseableThread* pthread;
private:
    ISrsTcpHandler* handler;
    std::string ip;
    int port;
public:
    SrsTcpListener(ISrsTcpHandler* h, std::string i, int p);
    virtual ~SrsTcpListener();
public:
    virtual int fd();
public:
    virtual int listen();
// interface ISrsReusableThreadHandler.
public:
    virtual int cycle();
};

該類主要用於綁定和監聽 tcp 端口,且繼承自 ISrsReuseableThreadHandler 類。

2.5.2 ISrsReuseableThreadHandler 類定義

/**
 * the reuse thread is a thread stop and start by other thread.
 *     user can create thread and stop then start again and again,
 *     generally must provides a start and stop method, @see SrsIngester.
 *     the step to create a thread stop by other thread:
 *     1. create SrsReuseableThread field.
 *     2. must manually stop the thread when started it.
 *     for example:
 *         class SrsIngester : public ISrsReusableThreadHandler {
 *             public: SrsIngester() { 
 *                 pthread = new SrsReuseableThread("ingest", 
 *                           this, SRS_AUTO_INGESTER_SLEEP_US); 
 * 
 *             }
 *             public: virtual int start() { return pthread->start(); }
 *             public: virtual void stop() { pthread->stop(); }
 *             public: virtual int cycle() {
 *                 // check status, start ffmpeg when stopped.
 *             }
 *         };
 */
class ISrsReusableThreadHandler
{
public:
    ISrsReusableThreadHandler();
    virtual ~ISrsReusableThreadHandler();
public:
    /**
     * the cycle method for the one cycle thread.
     * @remark when the cycle has its inner loop, it must check whether
     * the thread is intrrupted.
     */
    virtual int cycle() = 0;
public:
    /**
     * other callback for handler.
     * @remark all callback is optional, handler can ignore it.
     */
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
SrsTcpListener 與 ISrsReusableThreadHandler 之間的關系圖

2.5.3 SrsTcpListener 構造函數

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
    handler = h;
    ip = i;
    port = p;

    _fd = -1;
    _stfd = NULL;

    /* 創建一個可重復利用的線程(即由其他線程終止后可再次啟動) */
    pthread = new SrsReusableThread("tcp", this);
}

在該構造函數中,除了初始化 SrsTcpListener 類的一些成員變量外,還創建了一個可被重復使用的線程:"tcp",即可被其他線程啟動或終止的線程。

SrsTcpListener 與 SrsReusableThread 之間的關系圖

2.5.4 SrsReusableThread 類

2.5.4.1 SrsReusableThread 類定義

class SrsReusableThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread;
    ISrsReusableThreadHandler* handler;
public:
    SrsReusableThread(const char *n, ISrsReusableThreadHandler* h, 
                      int64_t interval_us = 0);
    virtual ~SrsReusableThread();
public:
    /**
     * for the reusable thread, start and stop by user.
     */
    virtual int start();
    /**
     * stop the thread, wait for the thread to terminater.
     * @remark user can stop multiple times, ignore if already stopped.
     */
    virtual void stop();
public:
    /**
     * get the context id. @see: ISrsThreadContext.get_id().
     * used for parent thread to get the id.
     * @remark when start thread, parent thread will block and wait for this id ready.
     */
    virtual int cid();
// interface internal::ISrsThreadHandler
public:
    virtual int cycle();
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
SrsReusableThread 和 internal::ISrsThreadHandler 之間的關系圖

2.5.4.2 SrsReusableThread 構造函數

SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, 
        int64_t interval_us)
{
    handler = h;
    /**
     * @n: 指定了該線程的名字
     * @this: 指定了該線程的處理者為 SrsReusableThread 
     * @interval_us: 指定了該線程每次循環后休眠的時間
     * @true: 指定該線程是 joinable 的,必須由其他線程終止該線程
     */
    pthread = new internal::SrsThread(n, this, interval_us, true);
}

2.5.4.3 SrsThread 構造函數

SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, 
        int64_t interval_us, bool joinable)
{
    _name = name;
    /* 父類 ISrsThreadHandler 指針 handler 指向子類 SrsReusableThread 對象的首地址  */
    handler = thread_handler;
    /* 每次循環后休眠的時間 */
    cycle_interval_us = interval_us;
    
    tid = NULL;
    loop = false;
    really_terminated = true;
    _cid = -1;
    _joinable = joinable;
    disposed = false;
    
    // in start(), the thread cycle method maybe stop and remove the thread itself,
    // and the thread start() is waiting for the _cid, and segment fault then.
    // @see https://github.com/ossrs/srs/issues/110
    // thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
    can_run = false;
}

該 SrsThread 的構造函數中僅初始化了該類的一些成員變量,該類沒有父類。

2.5.5 SrsTcpListener::listen

int SrsTcpListener::listen()
{
    int ret = ERROR_SUCCESS;
    
    /* 創建一個 tcp socket 套接字 */
    if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        ret = ERROR_SOCKET_CREATE;
        srs_error("create linux socket error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
    
    /* 設置該 tcp 套接字的屬性為地址可復用 */
    int reuse_socket = 1;
    if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
        ret = ERROR_SOCKET_SETREUSE;
        srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
    
    /* 將該端口綁定在具體的 ip 地址上 */
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());
    if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
        ret = ERROR_SOCKET_BIND;
        srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    /* 監聽該端口 */
    if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
        ret = ERROR_SOCKET_LISTEN;
        srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    /* 構造一個 _st_netfd_t 的結構體,同時設置 _fd 為非阻塞,以便 ST 庫使用 */
    if ((_stfd = st_netfd_open_socket(_fd)) == NULL) {
        ret = ERROR_ST_OPEN_SOCKET;
        srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", 
                  ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
        srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), 
                  port, ret);
        return ret;
    }
    srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
    
    return ret;
}

2.5.6 st_netfd_open_socket

_st_netfd_t *st_netfd_open_socket(int osfd)
{
  return _st_netfd_new(osfd, 1, 1);
}

該函數中,又調用了 _st_netfd_new 函數,主要是創建並初始化一個 _st_netfd_t 結構體,同時設置 osfd 套接字為非阻塞,代碼如下:

static _st_netfd_t *_st_netfd_new(int osfd, int nonblock, int is_socket)
{
  _st_netfd_t *fd;
  int flags = 1;

  if ((*_st_eventsys->fd_new)(osfd) < 0)
    return NULL;

  if (_st_netfd_freelist) {
    fd = _st_netfd_freelist;
    _st_netfd_freelist = _st_netfd_freelist->next;
  } else {
    fd = calloc(1, sizeof(_st_netfd_t));
    if (!fd)
      return NULL;
  }

  fd->osfd = osfd;
  fd->inuse = 1;
  fd->next = NULL;

  if (nonblock) {
    /* Use just one system call */
    if (is_socket && ioctl(osfd, FIONBIO, &flags) != -1)
      return fd;
    /* Do it the Posix way */
    if ((flags = fcntl(osfd, F_GETFL, 0)) < 0 ||
	fcntl(osfd, F_SETFL, flags | O_NONBLOCK) < 0) {
      st_netfd_free(fd);
      return NULL;
    }
  }

  return fd;
}

2.5.7 SrsReusableThread::start

int SrsReusableThread::start()
{
    /* 調用 internal::SrsThread->start() 函數 */
    return pthread->start();
}

2.5.8 SrsThread::start

int SrsThread::start()
{
    int ret = ERROR_SUCCESS;
    
    /* 檢測該線程 id 是否為正值,即已經創建並運行了 */
    if (tid) {
        srs_info("thread %s already running.", _name);
        return ret;
    }
    
    /* 調用 st_thread_create 函數創建一個線程,線程函數為 thread_fun,
     * 對於 "tcp" 線程,可知 _joinable 為 1, 同時將該線程添加到 run 隊列中  */
    if ((tid = st_thread_create(thread_fun, this, (_joinable ? 1 : 0), 0)) == NULL) {
        ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
    }
    
    disposed = false;
    // we set to loop to true for thread to run.
    loop = true;
    
    // wait for cid to ready, for parent thread to get the cid.
    while (_cid < 0) {
        /* 這里會更改當前線程的狀態為 SLEEPING,並將其添加到 sleep 隊列中,
         * 然后切換上下文環境,當 run 隊列中可調度運行的線程時,則調度它們 */
        st_usleep(10 * 1000);
    }
    
    // now, cycle thread can run.
    can_run = true;
    
    return ret;
}

2.5.9 st_usleep

int st_usleep(st_utime_t usecs)
{
    _st_thread_t *me = _ST_CURRENT_THREAD();
    
    if (me->flags & _ST_FL_INTERRUPT) {
        me->flags &= ~_ST_FL_INTERRUPT;
        errno = EINTR;
        return -1;
    }
    
    if (usecs != ST_UTIME_NO_TIMEOUT) {
        /* 設置當前線程的狀態並將其添加到 sleep 隊列中 */
        me->state = _ST_ST_SLEEPING;
        _ST_ADD_SLEEPQ(me, usecs);
    } else 
        me->state = _ST_ST_SUSPENDED;
    
    _ST_SWITCH_CONTEXT(me);
    
    if (me->flags & _ST_FL_INTERRUPT) {
        me->flags &= ~_ST_FL_INTERRUPT;
        errno = EINTR;
        return -1;
    }
    
    return 0;
}

該函數主要執行流程如下:

  1. 設置當前線程的 state 為 _ST_ST_SLEEPING,並將其添加到 sleep 隊列中;
  2. 調用 _ST_SWITCH_CONTEXT,該宏又執行以下操作:
    • 先調用 setjmp(這里調用的是 md.S 中的匯編代碼)保存當前線程的上下文環境,以便當前線程的休眠時間超時時可以 longjmp 到這里,喚醒當前線程,繼續往下執行。
    • 第一次調用 setjmp 返回 0,因此接着調用 _st_vp_schedule 函數,在該函數中會檢測到若 run 隊列中有可調度運行的線程的,則會將其取出來,然后 longjmp 到該線程中執行;否則切換到 idle 線程中。

2.5.10 _ST_SWITCH_CONTEXT

/*
 * Switch away from the current thread context by saving its state and
 * calling the thread scheduler
 */
#define _ST_SWITCH_CONTEXT(_thread)       \
    ST_BEGIN_MACRO                        \
    ST_SWITCH_OUT_CB(_thread);            \
    if (!MD_SETJMP((_thread)->context)) { \
      _st_vp_schedule();                  \
    }                                     \
    ST_DEBUG_ITERATE_THREADS();           \
    ST_SWITCH_IN_CB(_thread);             \
    ST_END_MACRO

這里保存上下文環境,然后調度其他線程運行。

注意:這里 setjmp 和 longjmp 一般會使用 md.S 中的匯編代碼。

2.5.11 _st_vp_schedule

void _st_vp_schedule(void)
{
    _st_thread_t *thread;
    
    /* 若 run 隊列中有可調度運行的線程,則將其取出來,同時從 run 隊列移除它 */
    if (_ST_RUNQ.next != &_ST_RUNQ) {
        /* Pull thread off of the run queue */
        thread = _ST_THREAD_PTR(_ST_RUNQ.next);
        _ST_DEL_RUNQ(thread);
    } else {
        /* 否則調度到 idle 線程中 */
        /* If there are no threads to run, switch to the idle thread */
        thread = _st_this_vp.idle_thread;
    }
    ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
    
    /* 這里調用 longjmp 切換到待調度運行的線程上下文環境 */
    /* Resume the thread */
    thread->state = _ST_ST_RUNNING;
    _ST_RESTORE_CONTEXT(thread);
}

注,idle 線程做的操作主要就是:

  1. epoll_wait 激活監聽到 I/O 事件的線程,將其添加到 run 隊列中,等待調度;
  2. 檢測 sleep 隊列中的超時線程,若有線程的超時時間到達了,則調度該線程;

2.5.12 _ST_RESTORE_CONTEXT

/*
 * Restore a thread context that was saved by _ST_SWITCH_CONTEXT or
 * initialized by _ST_INIT_CONTEXT
 */
#define _ST_RESTORE_CONTEXT(_thread)   \
    ST_BEGIN_MACRO                     \
    _ST_SET_CURRENT_THREAD(_thread);   \
    MD_LONGJMP((_thread)->context, 1); \
    ST_END_MACRO

這里首先設置標識當前線程的全局變量指向該將要調度運行的線程,然后才 longjmp 到該線程中。

2.6 rtmp tcp 線程: SrsThread::thread_fun

在 SrsThread::start 函數中,當 SrsThread 的成員 _cid 小於 0 時,會循環調度 st_usleep 函數,將當前休眠休眠 10 * 1000 us,直到 _cid 准備好。在 st_usleep 函數中,會將當前線程的控制權讓出去,進而調度其他線程執行。因而有可能會調度到剛創建的 "tcp" 線程,該線程的回調函數為 SrsThread::thread_fun。

void *SrsThread::thread_fun(void *arg)
{
    SrsThread* obj = (SrsThread*)arg;
    srs_assert(obj);
    
    /* 進入線程循環 */
    obj->thread_cycle();
    
    /* 下面是用於 valgrind 檢測內存泄漏和非法內存操作 */
    // for valgrind to detect.
    SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
    if (ctx) {
        ctx->clear_cid();
    }
    
    st_thread_exit(NULL);
    
    return NULL;
}

2.6.1 SrsThread::thread_cycle

void SrsThread::thread_cycle()
{
    int ret = ERROR_SUCCESS;
    
    /* 生成一個該線程的 context id */
    _srs_context->generate_id();
    srs_info("thread %s cycle start", _name);
    
    /* 獲取該線程的 context id */
    _cid = _srs_context->get_id();
    
    srs_assert(handler);
    /* 父類 ISrsThreadHandler 指針 handler 調用子類對象
     * SrsReusableThread 的成員函數 on_thread_start  */
    handler->on_thread_start();
    
    // thread is running now
    really_terminated = false;
    
    // wait for cid to ready, for parent thread to get the cid.
    while (!can_run && loop) {
        /* 這里又將當前線程的控制權切換給其他線程,直到 can_run 和 loop 都為 1 時,
         * 該線程再次調度到時才會繼續往下執行真正的循環 */
        st_usleep(10 * 1000);
    }
    
    /* 當該線程的 can_run 和 loop 都為 1 時,且該線程的休眠時間到達了,才會再次
     * 調度該線程從這里繼續往下執行 */
    
    while (loop) {
        
        if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on before cycle success", _name);
        
        if ((ret = handler->cycle()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) 
            {
                srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
            }
            goto failed;
        }
        srs_info("thread %s cycle success", _name);
        
        if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on end cycle success", _name);
        
    failed:
        if (!loop) {
            break;
        }
        
        // to improve performance, donot sleep when interval is zero.
        // @see: https://github.com/ossrs/srs/issues/237
        /* 每次線程的循環執行完畢時,都將當前線程休眠 cycle_interval_us 微妙,同時調度其他線程執行 */
        if (cycle_interval_us != 0) {
            st_usleep(cycle_interval_us);
        }
    }
    
    // readly terminated now.
    really_terminated = true;
    
    handler->on_thread_stop();
    srs_info("thread %s cycle finished", _name);
}

2.6.2 SrsReusableThread::on_thread_start

該函數位於 srs_app_thread.cpp 中:

void SrsReusableThread::on_thread_start()
{
    handler->on_thread_start();
}

這里接着調用 ISrsReusableThreadHandler::on_thread_start() 函數,該函數為空。

3. 總結

配置文件中 rtmp 端口建立監聽過程:

  1. 根據監聽端口的個數,為每個待監聽的端口都構建一個 SrsStreamListener 類,然后調用 SrsStreamListener::listen 函數;
  2. 在 SrsStreamListener::listen 函數中又構造了 SrsTcpListener 類,在該類的構造函數中,創建了一個可重復使用的線程 "tcp";
  3. 接着調用 SrsTcpListener::listen 開始綁定 ip 地址和監聽端口;
  4. 最后調用 st_create_thread 創建一個線程.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM