SRS之RTMP連接處理線程conn:接收客戶端推流


SRS之RTMP的TCP線程 分析可知,SRS 接受客戶端的連接后創建了一個線程:conn,用於處理與客戶端的 RTMP 連接。

本文的分析是基於該配置文件的:

listen              1935;
max_connections     1000;
daemon              off;
srs_log_tank        console;
vhost __defaultVhost__ {
}

該配置文件僅使能 rtmp 直播推流功能。

1. 關系圖

2. RTMP 連接處理線程 conn 之主循環

2.1 conn 的線程函數: SrsThread::thread_fun

void* SrsThread::thread_fun(void* arg)
{
    SrsThread* obj = (SrsThread*)arg;
    srs_assert(obj);
    
    /* 這里進入線程的主循環開始處理與客戶端的請求 */
    obj->thread_cycle();
    
    // for valgrind to detect.
    SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
    if (ctx) {
        ctx->clear_cid();
    }
    
    st_thread_exit(NULL);
    
    return NULL;
}

2.2 線程主循環:SrsThread::thread_cycle

void SrsThread::thread_cycle()
{
    int ret = ERROR_SUCCESS;
    
    _srs_context->generate_id();
    srs_info("thread %s cycle start", _name);
    
    _cid = _srs_context->get_id();
    
    srs_assert(handler);
    /* handler 是指向 ISrsThreadHandler 類對象的指針,由圖 1 知,
     * handler 指向子類 SrsOneCycleThread 對象的 this 指針,因此,
     * 這里調用的是 SrsOneCycleThread 的 on_thread_start 函數 */
    handler->on_thread_start();
    
    // thread is running now.
    really_terminated = false;
    
    // wait for cid to ready, for parent thread to get the cid.
    while (!can_run && loop) {
        st_usleep(10 * 1000);
    }
    
    while (loop) {
        /* 調用子類 SrsOneCycleThread 的 on_before_cycle 函數 */
        if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on before cycle success", _name);
        
        /* 這里真正開始處理與客戶端的 RTMP 通信 */
        if ((ret = handler->cycle()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) 
            {
                srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
            }
            goto failed;
        }
        srs_info("thread %s cycle success", _name);
        
        if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on end cycle success", _name);
        
    failed:
        if (!loop) {
            break;
        }
        
        // to improve performance, donot sleep when interval is zero.
        // @see: https://github.com/ossrs/srs/issues/237
        if (cycle_interval_us != 0) {
            st_usleep(cycle_interval_us);
        }
    }
    
    // readly terminated now.
    really_terminated = true;
    
    handler->on_thread_stop();
    srs_info("thread %s cycle finished", _name);
}

2.3 SrsOneCycleThread::on_thread_start

void SrsOneCycleThread::on_thread_start()
{
    /* handler 是 ISrsOneCycleThreadHandler 類的指針,該類的子類為 
     * SrsConnection,但是該類沒有實現 on_thread_start 函數,因此
     * 仍然調用父類的 on_thread_start */
    handler->on_thread_start();
}

void ISrsOneCycleThreadHandler::on_thread_start()
{
}

2.4 SrsOneCycleThread::on_before_cycle

int SrsOneCycleThread::on_before_cycle()
{
    /* 子類 SrsConnection 沒有實現 on_before_cycle 函數,因此
     * 調用父類 ISrsOneCycleThreadHandler 的 on_before_cycle */
    return handler->on_before_cycle();
}

int ISrsOneCycleThreadHandler::on_before_cycle()
{
    return ERROR_SUCCESS;
}

3. RTMP 事件處理循環

3.1 SrsOneCycleThread::cycle

int SrsOneCycleThread::cycle()
{
    /* 調用子類 SrsConnection 實現的 cycle() 函數 */
    int ret = handler->cycle();
    pthread->stop_loop();
    return ret;
}

3.2 SrsConnection::cycle

int SrsConnection::cycle()
{
    int ret = ERROR_SUCCESS;
    
    _srs_context->generate_id();
    id = _srs_context->get_id();
    
    /* 獲取客戶端的 ip 地址 */
    ip = srs_get_peer_ip(st_netfd_fileno(stfd));
    
    /* 調用子類的 SrsRtmpConn 實現的 do_cycle 函數開始進行 RTMP 連接請求處理 */
    ret = do_cycle();
    
    // if socket io error, set to closed.
    if (srs_is_client_gracefully_close(ret)) {
        ret = ERROR_SOCKET_CLOSED;
    }
    
    // sucess.
    if (ret == ERROR_SUCCESS) {
        srs_trace("client finished.");
    }
    
    // client close peer.
    if (ret == ERROR_SOCKET_CLOSED) {
        srs_warn("client disconnect peer. ret=%d", ret);
    }
    
    return ERROR_SUCCESS;
}

SrsRtmpConn、SrsRtmpServer 和 SrsProtocol 之間的關系圖

3.3 SrsRtmpConn::do_cycle

位於 srs_app_rtmp_conn.cpp。

// the timeout to send data to client,
// if timeout, close the connection.
#define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL)

// the timeout to wait client data,
// if timeout, close the connection.
#define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL)



// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;
    
    srs_trace("RTMP client ip=%s", ip.c_str());
    
    /* 設置 rtmp 發送/接收數據時的超時時間 */
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
    
    /* 開始進行 RTMP 連接的 handshake 過程 */
    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");
    
    /* handshake 成功后,開始接收並解析客戶端發送的第一個 RTMP 消息:connect,該消息中
     * 指明了客戶端想要連接的 app,解析信息保存在 req 中 */
    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");
    
    // set client ip to request.
    req->ip = ip;
    
    /* 如果在配置中沒有找到與 req->vhost 相同的,則使用默認的 vhost,
     * 即 __defaultVhost__  */
    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }
    
    srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", 
             req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
    
    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) 
    {
        ret = ERROR_RTMP_REQ_TCUTL;
        srs_error("discovery tcUrl failed. "
            "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), 
            req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }
    
    // check vhost
    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check vhost success.");
    
    srs_trace("connect app, "
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if (req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    
    ret = service_cycle();
    
    http_hooks_on_close();
    
    return ret;
}

3.5 rtmp->handshake()

成功 accept 客戶端的連接請求,開始與客戶端進行 handshake 過程。具體可以參考 SRS之RTMP handshake

3.6 rtmp->connect_app

handshake 成功后,開始接收並解析客戶端發送的第一個 RTMP 命令消息:connect。該消息指明了客戶端想要連接服務器的某個 application。具體分析過程如 SRS之SrsRtmpServer::connect_app詳解

3.7 SrsRtmpConn::check_vhost

int SrsRtmpConn::check_vhost()
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(req != NULL);
    
    /* 獲取配置文件中 vhost 配置塊的內容 */
    SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost);
    if (vhost == NULL) {
        ret = ERROR_RTMP_VHOST_NOT_FOUND;
        srs_error("vhost %s not found. ret=%d", req->vhost.c_str(), ret);
        return ret;
    }
    
    /* 若 vhost 配置塊為空,則默認使能 */
    if (!_srs_config->get_vhost_enabled(req->vhost)) {
        ret = ERROR_RTMP_VHOST_NOT_FOUND;
        srs_error("vhost %s disabled. ret=%d", req->vhost.c_str(), ret);
        return ret;
    }
    
    if (req->vhost != vhost->arg0()) {
        srs_trace("vhost change from %s to %s", req->vhost.c_str(), vhost->arg0().c_str());
        req->vhost = vhost->arg0();
    }
    
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer(req->vhost))) != ERROR_SUCCESS) {
        srs_error("check refer failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check refer success.");
    
    if ((ret = http_hooks_on_connect()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

這里暫不具體分析,因為配置文件 vhost 為 defaultVhost,即沒有使能相關功能。

3.8 SrsRtmpConn::service_cycle

當檢測客戶端發送的 connect 有效的時候,開始調用該函數服務 client。具體分析過程見 SrsRtmpConn::service_cycle


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM