簡介
創建Source
rtmp推流的時候就會創建SrsLiveSource和SrsRtcSource;
srs_error_t SrsRtmpConn::stream_service_cycle() { srs_error_t err = srs_success; SrsRequest* req = info->req; if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) { return srs_error_wrap(err, "rtmp: identify client"); } srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param); req->strip(); srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms", srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration)); // discovery vhost, resolve the vhost from config SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost); if (parsed_vhost) { req->vhost = parsed_vhost->arg0(); } if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) { return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s", req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str()); } // check vhost, allow default vhost. if ((err = check_vhost(true)) != srs_success) { return srs_error_wrap(err, "check vhost"); } srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s", req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null")); // do token traverse before serve it. // @see https://github.com/ossrs/srs/pull/239 if (true) { info->edge = _srs_config->get_vhost_is_edge(req->vhost); bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); if (info->edge && edge_traverse) { if ((err = check_edge_token_traverse_auth()) != srs_success) { return srs_error_wrap(err, "rtmp: check token traverse"); } } } // security check if ((err = security->check(info->type, ip, req)) != srs_success) { return srs_error_wrap(err, "rtmp: security check"); } // Never allow the empty stream name, for HLS may write to a file with empty name. // @see https://github.com/ossrs/srs/issues/834 if (req->stream.empty()) { return srs_error_new(ERROR_RTMP_STREAM_NAME_EMPTY, "rtmp: empty stream"); } // client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); // find a source to serve. SrsLiveSource* source = NULL; if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } srs_assert(source != NULL); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) { return srs_error_wrap(err, "rtmp: stat client"); } bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str()); source->set_cache(enabled_cache); switch (info->type) { case SrsRtmpConnPlay: { // response connection start play if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start play"); } if ((err = http_hooks_on_play()) != srs_success) { return srs_error_wrap(err, "rtmp: callback on play"); } err = playing(source); http_hooks_on_stop(); return err; } case SrsRtmpConnFMLEPublish: { if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start FMLE publish"); } return publishing(source); } case SrsRtmpConnHaivisionPublish: { if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start HAIVISION publish"); } return publishing(source); } case SrsRtmpConnFlashPublish: { if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start FLASH publish"); } return publishing(source); } default: { return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type); } } return err; }
這里RTMP的業務處理中心,比較重要,推流和播放都在這里處理,同時SrsLiveSource也是在這里創建的。
數據接受
我們在source的音頻回調這里打一下斷點,看一下調用棧,就可以知道調用順序
(gdb) bt #0 SrsLiveSource::on_audio (this=0xfcd7a0, shared_audio=0x1067b80) at src/app/srs_app_source.cpp:2140 #1 0x00000000004f2241 in SrsRtmpConn::process_publish_message (this=0xfcd890, source=0xfcd7a0, msg=0x1067b80) at src/app/srs_app_rtmp_conn.cpp:1055 #2 0x00000000004f20ef in SrsRtmpConn::handle_publish_message (this=0xfcd890, source=0xfcd7a0, msg=0x1067b80) at src/app/srs_app_rtmp_conn.cpp:1034 #3 0x00000000005a0d06 in SrsPublishRecvThread::consume (this=0x10060a0, msg=0x1067b80) at src/app/srs_app_recv_thread.cpp:376 #4 0x000000000059fe7e in SrsRecvThread::do_cycle (this=0x10060c0) at src/app/srs_app_recv_thread.cpp:133 #5 0x000000000059fcea in SrsRecvThread::cycle (this=0x10060c0) at src/app/srs_app_recv_thread.cpp:102 #6 0x000000000051fe11 in SrsFastCoroutine::cycle (this=0x10679c0) at src/app/srs_app_st.cpp:253 #7 0x000000000051fe94 in SrsFastCoroutine::pfn (arg=0x10679c0) at src/app/srs_app_st.cpp:268 #8 0x00000000006346e8 in _st_thread_main () at sched.c:363 #9 0x0000000000634f5b in st_thread_create (start=0x1005daf, arg=0x10679a0, joinable=0, stk_size=16801184) at sched.c:694
SrsRecvThread::do_cycle()
srs_error_t SrsRecvThread::do_cycle() { srs_error_t err = srs_success; while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "recv thread"); } // When the pumper is interrupted, wait then retry. if (pumper->interrupted()) { srs_usleep(timeout); continue; } SrsCommonMessage* msg = NULL; // Process the received message. if ((err = rtmp->recv_message(&msg)) == srs_success) { err = pumper->consume(msg); } if (err != srs_success) { // Interrupt the receive thread for any error. trd->interrupt(); // Notify the pumper to quit for error. pumper->interrupt(err); return srs_error_wrap(err, "recv thread"); } } return err; }
SRS有個專門的協程處理數據接受,主要關注兩個rtmp->recv_message(&msg)和pumper->consume(msg)
rtmp->recv_message(&msg):用來接受數據,最終調用SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
pumper->consume(msg):用來處理數據
SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)

srs_error_t SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg) { srs_error_t err = srs_success; // empty message if (chunk->header.payload_length <= 0) { srs_trace("get an empty RTMP message(type=%d, size=%d, time=%" PRId64 ", sid=%d)", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); *pmsg = chunk->msg; chunk->msg = NULL; return err; } srs_assert(chunk->header.payload_length > 0); // the chunk payload size. int payload_size = chunk->header.payload_length - chunk->msg->size; payload_size = srs_min(payload_size, in_chunk_size); // create msg payload if not initialized if (!chunk->msg->payload) { chunk->msg->create_payload(chunk->header.payload_length); } // read payload to buffer if ((err = in_buffer->grow(skt, payload_size)) != srs_success) { return srs_error_wrap(err, "read %d bytes payload", payload_size); } memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size); chunk->msg->size += payload_size; // got entire RTMP message? if (chunk->header.payload_length == chunk->msg->size) { *pmsg = chunk->msg; chunk->msg = NULL; return err; } return err; }
這里分別拷貝rtmp的header和body給SrsCommonMessage,到這里我們就算拿到rtmp的數據了。
音頻的話,rtmp的header的TypeID應該為8,即rtmpt.header.typeid == 0x08。
而body是按照flv的格式組織的,相當於flv的音頻tag:
- 第⼀個字節包含了⾳頻數據的參數信息,
- 第⼆個字節開始為⾳頻流數據。
我們抓一個音頻的rtmp包分析一下:
AudioTagHeader格式:
Field | Type | Value | comment |
音頻格式 | uB4 | 10 (0xaf) | 0 = Linear PCM, platform endian 11 = Speex 14 = MP3 8-Khz |
采樣率 | uB2 | 3 (0xaf) | 0 = 5.5kHz 1 = 11kHz |
采樣精度 | uB1 | 1 (0xaf) | 0 = snd8Bit |
音頻聲道 | uB1 | 1 (0xaf) | 0 = sndMono 單聲道
|
處理數據
通過pumper->consume跟蹤,發現process_publish_message()用來處理rtmp的message。
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) { srs_error_t err = srs_success; // for edge, directly proxy message to origin. if (info->edge) { if ((err = source->on_edge_proxy_publish(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: proxy publish"); } return err; } // process audio packet 語音包 typeid = 8 if (msg->header.is_audio()) { if ((err = source->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume audio"); } return err; } // process video packet 視頻包 typeid = 9 if (msg->header.is_video()) { if ((err = source->on_video(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume video"); } return err; } // process aggregate packet 統計消息 typeid = 22 if (msg->header.is_aggregate()) { if ((err = source->on_aggregate(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume aggregate"); } return err; } // process onMetaData 數據消息 typed = 18 | 15 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) { return srs_error_wrap(err, "rtmp: decode message"); } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt); if ((err = source->on_meta_data(msg, metadata)) != srs_success) { return srs_error_wrap(err, "rtmp: consume metadata"); } return err; } return err; } return err; }
這里分別處理語音、視頻等各種消息。