SrsConnection類代表一個client的連接,其中封裝了st thread,用於在一個單獨的st thread里處理一個client的服務請求.
SrsConnection在 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)里創建
SrsConnection* conn = NULL; if (type == SrsListenerRtmpStream) { conn = new SrsRtmpConn(this, client_stfd); } else if (type == SrsListenerHttpApi) { #ifdef SRS_AUTO_HTTP_API conn = new SrsHttpApi(this, client_stfd, http_api_mux); #else srs_warn("close http client for server not support http-api"); srs_close_stfd(client_stfd); return ret; #endif } else if (type == SrsListenerHttpStream) { #ifdef SRS_AUTO_HTTP_SERVER conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server); #else srs_warn("close http client for server not support http-server"); srs_close_stfd(client_stfd); return ret; #endif } else { // TODO: FIXME: handler others } srs_assert(conn);
創建完成后調用 conn->start()啟動,此后這個client的請求都在這個st thread 里完成
SrsConection類
/** * the manager for connection.
* SrsServer類實現了這一接口,用來對SrsConnection類進行管理. */ class IConnectionManager { public: IConnectionManager(); virtual ~IConnectionManager(); public: /** * remove the specified connection. */ virtual void remove(SrsConnection* c) = 0; }; /** * the basic connection of SRS, * all connections accept from listener must extends from this base class, * server will add the connection to manager, and delete it when remove. */ class SrsConnection : public virtual ISrsOneCycleThreadHandler, public virtual IKbpsDelta { private: /** * each connection start a green thread, * when thread stop, the connection will be delete by server.
* 業務處理線程 */ SrsOneCycleThread* pthread; /** * the id of connection. */ int id; protected: /** * the manager object to manage the connection. */ IConnectionManager* manager; /** * the underlayer st fd handler. */ st_netfd_t stfd; /** * the ip of client. */ std::string ip; /** * whether the connection is disposed, * when disposed, connection should stop cycle and cleanup itself. */ bool disposed; /** * whether connection is expired, application definition. * when expired, the connection must never be served and quit ASAP. */ bool expired; public: SrsConnection(IConnectionManager* cm, st_netfd_t c); virtual ~SrsConnection(); public: /** * to dipose the connection. */ virtual void dispose(); /** * start the client green thread. * when server get a client from listener, * 1. server will create an concrete connection(for instance, RTMP connection), * 2. then add connection to its connection manager, * 3. start the client thread by invoke this start() * when client cycle thread stop, invoke the on_thread_stop(), which will use server * to remove the client by server->remove(this). */ virtual int start(); // interface ISrsOneCycleThreadHandler public: /** * the thread cycle function, * when serve connection completed, terminate the loop which will terminate the thread, * thread will invoke the on_thread_stop() when it terminated. */ virtual int cycle(); /** * when the thread cycle finished, thread will invoke the on_thread_stop(), * which will remove self from server, server will remove the connection from manager * then delete the connection. */ virtual void on_thread_stop(); public: /** * get the srs id which identify the client. */ virtual int srs_id(); /** * set connection to expired. */ virtual void expire(); protected: /** * for concrete connection to do the cycle.
* 具體connection 的處理函數,參見SrsRtmpConn. */ virtual int do_cycle() = 0; };
int SrsConnection::start() {
// 啟動SrsOneCycleThread類 return pthread->start(); } int SrsConnection::cycle() { int ret = ERROR_SUCCESS; //這是個全局的id生成器,也用來生成st thread的id _srs_context->generate_id(); id = _srs_context->get_id(); // 取得客戶端的ip地址 ip = srs_get_peer_ip(st_netfd_fileno(stfd)); // 調用子類如SrsRtmpConn的處理函數 ret = do_cycle(); // if socket io error, set to closed. if (srs_is_client_gracefully_close(ret)) { ret = ERROR_SOCKET_CLOSED; } // success. if (ret == ERROR_SUCCESS) { srs_trace("client finished."); } // client close peer. if (ret == ERROR_SOCKET_CLOSED) { srs_warn("client disconnect peer. ret=%d", ret); } return ERROR_SUCCESS; } void SrsConnection::on_thread_stop() { // TODO: FIXME: never remove itself, use isolate thread to do cleanup.
// 用在SrsServer類的conns向量數組中刪除此對象
manager->remove(this); } int SrsConnection::srs_id() { return id; } void SrsConnection::expire() { expired = true; }
SrsRtmpConn類
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) : SrsConnection(svr, c) { server = svr; req = new SrsRequest(); res = new SrsResponse(); skt = new SrsStSocket(c);//實現ISrsProtocolReaderWriter接口,即st socket的讀寫函數,以及讀寫超時,發送和接受到的字節數 rtmp = new SrsRtmpServer(skt);// 提供rtmp-command-protocol服務,是一個上層的面向協議,media stream的服務,
// 例如連接vhost/app,播放流,獲取視頻/音頻數據,與之對應的是 SrsRtmpClient.
refer = new SrsRefer(); bandwidth = new SrsBandwidth(); security = new SrsSecurity(); duration = 0; kbps = new SrsKbps(); kbps->set_io(skt, skt); wakable = NULL; mw_sleep = SRS_PERF_MW_SLEEP; mw_enabled = false; realtime = SRS_PERF_MIN_LATENCY_ENABLED; send_min_interval = 0; tcp_nodelay = false; client_type = SrsRtmpConnUnknown; // 在配置改變時,支持reload _srs_config->subscribe(this); }
構造函數初始化成員變量
// TODO: return detail message when error for client. int SrsRtmpConn::do_cycle() { int ret = ERROR_SUCCESS; srs_trace("RTMP client ip=%s", ip.c_str());
// 設置SrsRtmpServer里面的底層數據通訊類SrsProtocol的讀寫超時 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); // 握手 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { srs_error("rtmp handshake failed. ret=%d", ret); return ret; } srs_verbose("rtmp handshake success"); // 獲取連接的app if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { srs_error("rtmp connect vhost/app failed. ret=%d", ret); return ret; } srs_verbose("rtmp connect app success"); // set client ip to request. req->ip = ip; // 假設推流地址 rtmp://192.168.151.151/live/marstv // discovery vhost, resolve the vhost from config
req->vhost 是192.168.151.151
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
// 沒有設置 192.168.151.151 這樣的vhost parsed_vhost是默認的vhost if (parsed_vhost) { req->vhost = parsed_vhost->arg0();// req->vhost 是 __defaultVhost__ } srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str()); if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) { ret = ERROR_RTMP_REQ_TCURL; srs_error("discovery tcUrl failed. " "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d", req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret); return ret; } // check vhost
// 在 check_vhost里面會調用 http_hooks_on_connect if ((ret = check_vhost()) != ERROR_SUCCESS) { srs_error("check vhost failed. ret=%d", ret); return ret; } srs_verbose("check vhost success."); srs_trace("connect app, " "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), (req->args? "(obj)":"null")); // show client identity if(req->args) { std::string srs_version; std::string srs_server_ip; int srs_pid = 0; int srs_id = 0; SrsAmf0Any* prop = NULL; if ((prop = req->args->ensure_property_string("srs_version")) != NULL) { srs_version = prop->to_str(); } if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) { srs_server_ip = prop->to_str(); } if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) { srs_pid = (int)prop->to_number(); } if ((prop = req->args->ensure_property_number("srs_id")) != NULL) { srs_id = (int)prop->to_number(); } srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); if (srs_pid > 0) { srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); } } ret = service_cycle(); http_hooks_on_close(); return ret; }
SrsRtmpConn::do_cycle最后調用 service_cycle函數,
service_cycle有一個循環調用 stream_service_recycle處理 client_type為 SrsRtmpConnPlay(調用playing函數),
SrsRtmpConnFMLEPublish,SrsRtmpConnHaivisionPublish,SrsRtmpConnFlashPublish,
int SrsRtmpConn::service_cycle() { int ret = ERROR_SUCCESS; //發送一個SrsSetWindowAckSizePacket類型的數據包
if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) { srs_error("set window acknowledgement size failed. ret=%d", ret); return ret; } srs_verbose("set window acknowledgement size success");
//
發送一個SrsSetWindowAckSizePacket類型的數據包
if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) { srs_error("set peer bandwidth failed. ret=%d", ret); return ret; } srs_verbose("set peer bandwidth success"); // get the ip which client connected. std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd)); // 檢測虛擬主機的bandcheck指令
// do bandwidth test if connect to the vhost which is for bandwidth check. if (_srs_config->get_bw_check_enabled(req->vhost)) { return bandwidth->bandwidth_check(rtmp, skt, req, local_ip); } // do token traverse before serve it. // @see https://github.com/ossrs/srs/pull/239 if (true) { bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); if (vhost_is_edge && edge_traverse) { if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) { srs_warn("token auth failed, ret=%d", ret); return ret; } } } // set chunk size to larger. // set the chunk size before any larger response greater than 128, // to make OBS happy, @see https://github.com/ossrs/srs/issues/454 int chunk_size = _srs_config->get_chunk_size(req->vhost); if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret); return ret; }
//發送一個SrsSetChunkSizePacket類型的數據包
srs_info("set chunk_size=%d success", chunk_size);
//發送一個SrsConnectAppResPacket類型的數據包
// response the client connect ok. if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) { srs_error("response connect app failed. ret=%d", ret); return ret; } srs_verbose("response connect app success"); //發送一個SrsOnBWDonePacket類型的數據包
if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) { srs_error("on_bw_done failed. ret=%d", ret); return ret; } srs_verbose("on_bw_done success"); while (!disposed) { ret = stream_service_cycle(); // stream service must terminated with error, never success. // when terminated with success, it's user required to stop. if (ret == ERROR_SUCCESS) { continue; } // when not system control error, fatal error, return. if (!srs_is_system_control_error(ret)) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("stream service cycle failed. ret=%d", ret); } return ret; } // for republish, continue service if (ret == ERROR_CONTROL_REPUBLISH) { // set timeout to a larger value, wait for encoder to republish. rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US); rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US); srs_trace("control message(unpublish) accept, retry stream service."); continue; } // for "some" system control error, // logical accept and retry stream service. if (ret == ERROR_CONTROL_RTMP_CLOSE) { // TODO: FIXME: use ping message to anti-death of socket. // @see: https://github.com/ossrs/srs/issues/39 // set timeout to a larger value, for user paused. rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US); srs_trace("control message(close) accept, retry stream service."); continue; } // for other system control message, fatal error. srs_error("control message(%d) reject as error. ret=%d", ret, ret); return ret; } return ret; }
int SrsRtmpConn::stream_service_cycle() { int ret = ERROR_SUCCESS; SrsRtmpConnType type;
//接受client的消息,是play 或 publish
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("identify client failed. ret=%d", ret); } return ret; } req->strip(); srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration); // security check
//根據ip地址對client進行過濾,位於srs_app_security.cpp:39
//先檢查allow規則,在檢查deny規則,deny規則的優先級較高
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) { srs_error("security check failed. ret=%d", ret); return ret; } srs_info("security check ok"); // Never allow the empty stream name, for HLS may write to a file with empty name. // @see https://github.com/ossrs/srs/issues/834 if (req->stream.empty()) { ret = ERROR_RTMP_STREAM_NAME_EMPTY; srs_error("RTMP: Empty stream name not allowed, ret=%d", ret); return ret; } // client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); // find a source to serve. SrsSource* source = NULL; if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) { return ret; } srs_assert(source != NULL); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { srs_error("stat client failed. ret=%d", ret); return ret; } bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id(), source->source_id()); source->set_cache(enabled_cache); client_type = type; switch (type) { case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); // response connection start play if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; } if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) { srs_error("http hook on_play failed. ret=%d", ret); return ret; } srs_info("start to play stream %s success", req->stream.c_str()); ret = playing(source); http_hooks_on_stop(); return ret; } case SrsRtmpConnFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
//rtmp->start_fmle_publish功能
//1. 接收SrsFMLEStartPacket類型的數據包
// 將其中的transaction_id保存下來
// 2. 將transaction_id 作為參數構造SrsFMLEStartResPacket包返回給客戶端
// 3. 接收SrsCreateStreamPacket類型的數據包
// 保存其中的transaction_id
//4. 用接收的transaction_id和服務端的stream_id初始化
// SrsCreateStreamResPacket包返回個publish client
// 5.接收SrsPublishPacket
//6. 回復SrsOnStatusCallPacket 參數
// command_name --> onFCPublish
// StatusCode --> NetStream.Publish.Start
// StatusDescription --> "Started publishing stream."
//7. 在回復一個SrsOnStatusCallPacket參數
// StatusLevel --> StatusLevelStatus
// StatusCode --> StatusCodePublishStart
// StatusDescription --> 同上
// StatusClientId --> RTMP_SIG_CLIENT_ID --> "ASAICiss"
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } case SrsRtmpConnHaivisionPublish: { srs_verbose("Haivision start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; srs_info("invalid client type=%d. ret=%d", type, ret); return ret; } } return ret; }
最后調用publishing函數,代碼如下
int SrsRtmpConn::publishing(SrsSource* source) { int ret = ERROR_SUCCESS; if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { srs_error("check publish_refer failed. ret=%d", ret); return ret; } srs_verbose("check publish_refer success."); if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source, client_type != SrsRtmpConnFlashPublish, vhost_is_edge); srs_info("start to publish stream %s success", req->stream.c_str()); ret = do_publishing(source, &trd); // stop isolate recv thread trd.stop(); } // whatever the acquire publish, always release publish. // when the acquire error in the midlle-way, the publish state changed, // but failed, so we must cleanup it. // @see https://github.com/ossrs/srs/issues/474 // @remark when stream is busy, should never release it. if (ret != ERROR_SYSTEM_STREAM_BUSY) { release_publish(source, vhost_is_edge); } http_hooks_on_unpublish(); return ret; }
主要功能如下:
1. 調用SrsRefer的check函數
2. 調用 http_hooks_on_publish函數
3. 調用 acquire_publish函數
4. 如果3 成功,新建SrsPublishRecvThread實例后,執行do_publishing函數
4. call http_hooks_on_unpublish函數
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) { int ret = ERROR_SUCCESS; SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsAutoFree(SrsPithyPrint, pprint); // start isolate recv thread. if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start isolate recv thread failed. ret=%d", ret); return ret; } // change the isolate recv thread context id, // merge its log to current thread. int receive_thread_cid = trd->get_cid(); trd->set_cid(_srs_context->get_id()); // initialize the publish timeout. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost); // set the sock options. set_sock_options(); if (true) { bool mr = _srs_config->get_mr_enabled(req->vhost); int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d", mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay, receive_thread_cid); } int64_t nb_msgs = 0; while (!disposed) { pprint->elapse(); // when source is set to expired, disconnect it. if (expired) { ret = ERROR_USER_DISCONNECT; srs_error("connection expired. ret=%d", ret); return ret; } // cond wait for timeout. if (nb_msgs == 0) { // when not got msgs, wait for a larger timeout. // @see https://github.com/ossrs/srs/issues/441 trd->wait(publish_1stpkt_timeout); } else { trd->wait(publish_normal_timeout); } // check the thread error code. if ((ret = trd->error_code()) != ERROR_SUCCESS) { if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { srs_error("recv thread failed. ret=%d", ret); } return ret; } // when not got any messages, timeout. if (trd->nb_msgs() <= nb_msgs) { ret = ERROR_SOCKET_TIMEOUT; srs_warn("publish timeout %dms, nb_msgs=%"PRId64", ret=%d", nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret); break; } nb_msgs = trd->nb_msgs(); // reportable if (pprint->can_print()) { kbps->sample(); bool mr = _srs_config->get_mr_enabled(req->vhost); int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout ); } } return ret; }
等待SrsPublishRecvThread結束
接受包的處理流程
SrsRecvThread::cycle()調用SrsRtmpServer的recv_message函數接收類型為SrsCommonMessage的消息
call handler->handle(msg)
也就是SrsPublishRecvThread的handle方法
在轉到SrsRtmpConn的handle_publish_msg,檢查消息頭部,對命令的格式進行檢查
在轉到SrsRtmpConn的process_publish_msg
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge) { int ret = ERROR_SUCCESS; // for edge, directly proxy message to origin. if (vhost_is_edge) { if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) { srs_error("edge publish proxy msg failed. ret=%d", ret); return ret; } return ret; } // process audio packet if (msg->header.is_audio()) { if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) { srs_error("source process audio message failed. ret=%d", ret); return ret; } return ret; } // process video packet if (msg->header.is_video()) { if ((ret = source->on_video(msg)) != ERROR_SUCCESS) { srs_error("source process video message failed. ret=%d", ret); return ret; } return ret; } // process aggregate packet if (msg->header.is_aggregate()) { if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) { srs_error("source process aggregate message failed. ret=%d", ret); return ret; } return ret; } // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt); if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) { srs_error("source process onMetaData message failed. ret=%d", ret); return ret; } srs_info("process onMetaData message success."); return ret; } srs_info("ignore AMF0/AMF3 data message."); return ret; } return ret; }
根據vhost 是否是 edge
數據是 audio video aggregate 調用 SrsSource類的
on_edge_proxy_publish
on_audio
on_video
on_aggregate
on_metadata