首先,使用 obs 推流時符合如下流程(參考自 SRS issue:Haivision Makito X can't publish to SRS):
FFMPEG:
C/S: Handshake
C: ConnectApp() tcUrl=xxx
S: Ack Size 2,500,000
S: Set Peer Bandwidth 2,500,000
S: Set Chunk Size 60,000
C: Set Chunk Size 60,000
S: ConnectApp() _result
S: onBWDone()
C: releaseStream+FCPublish(s0)
C: createStream()
S: releaseStream _result
C: _checkbw()
S: FCPublish() _result
S: createStream() _result
C: publish(s0)
S: onFCPublish()
S: onStatus()
下面的分析是繼服務器發送 onBWDone 后,進入 while 循環開始執行 stream_service_cycle。
1. SrsRtmpConn::stream_service_cycle
int SrsRtmpConn::stream_service_cycle()
{
int ret = ERROR_SUCCESS;
/* the rtmp client type: play/publish/unknown */
SrsRtmpConnType type;
/* 首先鑒別客戶端請求的類型,是play/publish 或其他,還有播放/推流的流名稱 */
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration))
!= ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("identify client failed. ret=%d", ret);
}
return ret;
}
req->strip();
srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f",
srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
/* 只有當配置文件中使能了 security 配置項,才會真正進入到該 check 函數進行
* 一系列的檢測 */
/* allow all if security disabled. */
// secutity check
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
srs_error("security check failed. ret=%d", ret);
return ret;
}
srs_info("security check ok");
/* SRS 不允許請求的流名稱為空 */
// Never allow the empty stream name, for HLS may write to a file with empty name.
// @see https://github.com/ossrs/srs/issues/834:
// SRS2 crashed for TS encoder assert failed
if (req->stream.empty()) {
ret = ERROR_RTMP_STREAM_NAME_EMPTY;
srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
return ret;
}
/* 設置服務器 send/recv 的超時時間 */
// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
/* 首先根據 vhost/app/stream 構造一個 stream_url,然后根據該 stream_url 在 SrsSource::pool
* 中查找是否存在一個 stream_url 對應的 SrsSource,若能找到,則直接返回該 SrsSource,否則,
* 新構造一個 SrsSource,並將其按 stream_url 放到 SrsSource::pool map 容器中 */
// find a source to serve.
SrsSource* source = NULL;
if ((ret = SrsSorce::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(source != NULL);
/* 構造統計類,將統計當前的 vhost、stream 等信息 */
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS)
{
srs_error("stat client failed. ret=%d", ret);
return ret;
}
/* 若 vhost 中沒有配置 mode,則返回 false */
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
/* 默認開始 gop_cache */
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
source->source_id(), source->source_id());
/* 根據 enabled_cache 設置是否啟動 gop_cache,為 true,則啟動 */
source->set_cache(enabled_cache);
/* 根據鑒別到的客戶端的類型:play 或者 publish,開始進行相應的處理 */
/* The type of client, play or publish. */
client_type = type;
switch (type) {
case SrsRtmpConnPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str());
// response connection start play
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to play stream failed. ret=%d", ret);
return ret;
}
if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
srs_error("http hook on_play failed. ret=%d", ret);
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str());
ret = playing(source);
http_hooks_on_stop();
return ret;
}
/* 由前面知,若 obs 推流的話為該類型 */
case SrsRtmpConnFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
/* 該函數主要是接收並響應一系列消息:
* C: FCPublish
* S: FCPublish response
* C: createStream
* S: createStream response
* C: publish
* S: publish response onFCPublish(NetStream.Publish.Start)
* S: publish response onStatus(NetStream.Publish.Start) */
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
/* 服務器響應客戶端的publish消息后,即開始進入接收客戶端推流的
* metadata、video、audio等數據的處理 */
return publishing(source);
}
case SrsRtmpConnHaivisionPublish: {
srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
srs_verbose("flash start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("flash start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
default: {
ret = ERROR_SYSTEM_CLIENT_INVALID;
srs_info("invalid client type=%d. ret=%d", type, ret);
return ret;
}
}
return ret;
}
2. SrsRtmpServer::identify_client
該函數是對客戶端請求進行鑒定,以便做出相應的處理。
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type,
string& stream_name, double& duration)
{
type = SrsRtmpConnUnknown;
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
/* 接收一個完整的消息 */
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("recv identify client message failed. ret=%d", ret);
}
return ret;
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
if (h.is_ackledgement() || h.is_set_chunk_size() ||
h.is_windonw_ackledgenemt_size() || h.is_user_control_message()) {
continue;
}
/* 若不為 amf 類型的消息,則忽略該消息,繼續接收下一個消息 */
if (!h.is_amf0_commnad() && !h.is_amf3_command()) {
srs_trace("identify ignore message except "
"AMF0/AMF3 command message. type=%#x", h.message_type);
continue;
}
SrsPacket* pkt = NULL;
/* 對接收到的 amf 命令消息進行解碼,解碼后的數據保存在 pkt 指向的子類中 */
if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("identify decode message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsPacket, pkt);
/* 下面是通過 dynamic_cast 動態轉換嘗試將 pkt 轉為指定的類型,
* 若不為 NULL,則表明接收到的消息即為所要的消息 */
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
srs_info("identify client by create stream, play or flash publish.");
return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt),
stream_id, type, stream_name, duration);
}
/* 當接收到的是 releaseStream/FCPublish/FCUnpublish 這三個中的一個時,
* 構造的類都為 SrsFMLEStartPacket */
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
srs_info("identify client by releaseStream, fmle publish");
/* 這里即可確定 client 的類型為 publish */
return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt),
type, stream_name);
}
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
srs_info("level0 identify client by play.");
return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type,
stream_name, duration);
}
/* call msg,
* support response null first,
* @see https://github.com/ossrs/srs/issues/106
* TODO: FIXME: response in right way, or forward in edge mode. */
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
if (call) {
SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
res->command_object = SrsAmf0Any::null();
res->response = SrsAmf0Any::null();
if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) &&
!srs_is_client_gracefully_close(ret)) {
srs_warn("response call failed. ret=%d", ret);
}
return ret;
}
/* For encoder of Haivision, it always send a _checkbe call message.
* @remark the next message is createStream, so we continue to identify it.
* @see https://github.com/ossrs/srs/issues/844 */
if (call->command_name == "_checkbw") {
srs_info("Havision encoder identified.");
continue;
}
continue;
}
srs_trace("ignore AMF0/AMF3 command message.");
}
return ret;
}
2.1 SrsProtocol::recv_message
/**
 * Receive one entire, non-empty RTMP message.
 * Loops over recv_interlaced_message() until a complete message with a
 * payload arrives, and lets on_recv_message() process it before returning.
 * @param pmsg output, the received message; set to NULL on entry and left
 *       NULL when an error is returned. The pointer is handed to the caller
 *       on success.
 * @return ERROR_SUCCESS, or the error of the failed receive/hook.
 */
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;

    int ret = ERROR_SUCCESS;

    while (true) {
        SrsCommonMessage* msg = NULL;

        // Read one (possibly partial) message from the socket.
        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");

        // Only a chunk of the message arrived so far, keep reading.
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }

        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }

        // Per the original notes: control messages such as set chunk size
        // update the protocol context here, while audio/video/amf messages
        // are left untouched.
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }

        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64,
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length,
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }

    return ret;
}
2.1.1 SrsProtocol::recv_interlaced_message
/**
 * Read a single chunk and merge it into the message being assembled on its
 * chunk stream.
 * @param pmsg output, written only when the chunk completed a message; it is
 *       left untouched when the message is still partial or on error.
 * @return ERROR_SUCCESS for both the partial and the complete case, or the
 *       error of the failed read.
 */
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
    // Step 1: the chunk basic header gives the format and chunk stream id.
    char fmt = 0;
    int cid = 0;
    int ret = read_basic_header(fmt, cid);
    if (ret != ERROR_SUCCESS) {
        bool quiet = (ret == ERROR_SOCKET_TIMEOUT) || srs_is_client_gracefully_close(ret);
        if (!quiet) {
            srs_error("read basic header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);

    // The cid must never be negative.
    srs_assert(cid >= 0);

    // Step 2: locate the chunk stream that accumulates the chunks of one
    // message until the message is complete. Ids below
    // SRS_PERF_CHUNK_STREAM_CACHE index the cs_cache array directly (the
    // original notes say it is pre-allocated by the constructor); larger ids
    // live in the chunk_streams map and are created on first use.
    // @see https://github.com/ossrs/srs/issues/249
    SrsChunkStream* cs = NULL;
    if (cid >= SRS_PERF_CHUNK_STREAM_CACHE) {
        // cache miss, use the map.
        if (chunk_streams.find(cid) != chunk_streams.end()) {
            cs = chunk_streams[cid];
            srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                cs->fmt, cs->cid, (cs->msg? cs->msg->size : 0),
                cs->header.message_type, cs->header.payload_length,
                cs->header.timestamp, cs->header.stream_id);
        } else {
            cs = new SrsChunkStream(cid);
            chunk_streams[cid] = cs;
            // The preferred cid is copied to the message received on it.
            cs->header.perfer_cid = cid;
            srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
        }
    } else {
        // cache hit, the entry can be used directly.
        srs_verbose("cs-cache hit, cid=%d", cid);
        cs = cs_cache[cid];
        srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
            "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            cs->fmt, cs->cid, (cs->msg? cs->msg->size : 0),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
    }

    // Step 3: the chunk message header, whose layout depends on fmt.
    ret = read_message_header(cs, fmt);
    if (ret != ERROR_SUCCESS) {
        bool quiet = (ret == ERROR_SOCKET_TIMEOUT) || srs_is_client_gracefully_close(ret);
        if (!quiet) {
            srs_error("read message header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read message header success. "
        "fmt=%d, ext_time=%d, size=%d, "
        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
        fmt, cs->extended_timestamp, (cs->msg? cs->msg->size : 0),
        cs->header.message_type, cs->header.payload_length,
        cs->header.timestamp, cs->header.stream_id);

    // Step 4: the chunk payload; `entire` stays NULL while the message is
    // still incomplete.
    SrsCommonMessage* entire = NULL;
    ret = read_message_payload(cs, &entire);
    if (ret != ERROR_SUCCESS) {
        bool quiet = (ret == ERROR_SOCKET_TIMEOUT) || srs_is_client_gracefully_close(ret);
        if (!quiet) {
            srs_error("read message payload failed. ret=%d", ret);
        }
        return ret;
    }

    if (entire) {
        *pmsg = entire;
        srs_info("get entire message success. size=%d, "
            "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            entire->size,
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
        return ret;
    }

    // Not an entire RTMP message yet, the caller tries the next chunk.
    srs_verbose("get partial message success. size=%d, "
        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
        (cs->msg? cs->msg->size : 0),
        cs->header.message_type, cs->header.payload_length,
        cs->header.timestamp, cs->header.stream_id);
    return ret;
}
2.2 SrsProtocol::decode_message
/**
 * Decode the payload of a received message into a packet object.
 * @param msg the message to decode; must be non-NULL with a non-empty payload.
 * @param ppacket output, the decoded packet; NULL unless decoding succeeded.
 * @return ERROR_SUCCESS, or the error of the stream setup or of the decoder.
 */
int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    *ppacket = NULL;

    srs_assert(msg != NULL);
    srs_assert(msg->payload != NULL);
    srs_assert(msg->size > 0);

    // Wrap the payload in a decode stream (the original note states this is
    // fast and copies no memory).
    SrsStream stream;
    int ret = stream.initialize(msg->payload, msg->size);
    if (ret != ERROR_SUCCESS) {
        srs_error("initialize stream failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("decode stream initialized success");

    // Decode into a local first so the output is only set on success.
    SrsPacket* decoded = NULL;
    ret = do_decode_message(msg->header, &stream, &decoded);
    if (ret != ERROR_SUCCESS) {
        srs_freep(decoded);
        return ret;
    }

    *ppacket = decoded;
    return ret;
}
2.2.1 SrsProtocol::do_decode_message
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
SrsPacket** packet)
{
int ret = ERROR_SUCCESS;
SrsPacket* packet = NULL;
// decode specified packet type
if (header.is_amf0_command() || header.is_amf3_command()
|| header.is_amf0_data() || header.is_amf3_data()) {
srs_verbose("start to decode AMF0/AMF3 command message.");
// skip 1bytes to decode the amf3 command.
if (header.is_amf3_command) && stream->require(1)) {
srs_verbose("skip 1bytes to decode AMF3 command");
stream->skip(1);
}
// amf0 command message.
// need to read the command name.
std::string command;
/* 讀取消息的命令名 */
if ((ret = srs_amfo_read_string(stream, command)) != ERROR_SUCCESS) {
srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
// result/error packet
if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
double transactionId = 0.0;
if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId);
// reset stream, for header read completed.
stream->skip(-1 * stream->pos());
if (header.is_amf3_command()) {
stream->skip(1);
}
// find the call name
if (requests.find(transactionId) == requests.end()) {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
return ret;
}
std::string request_name = requests[transactionId];
srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 response command(%s message).",
request_name.c_str());
*ppacket = packet = new SrsConnectAppResPacket();
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 response command(%s message).",
request_name.c_str());
*ppacket = packet = new SrsCreateStreamResPacket(0, 0);
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
|| request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
|| request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 response command(%s message).",
request_name.c_str());
*ppacket = packet = new SrsFMLEStartResPacket(0);
return packet->decode(stream);
} else {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. "
"request_name=%s, transactionId=%.2f, ret=%d",
request_name.c_str(), transactionId, ret);
return ret;
}
}
// reset to zero(amf3 to 1) to restart decode.
stream->skip(-1 * stream->pos());
if (header.is_amf3_command()) {
stream->skip(1);
}
/* 根據消息的命令名來構造對應的類,然后進行解碼 */
// decode command object.
if (command == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
*ppacket = packet = new SrsConnectAppPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 command(createStream message).");
*ppacket = packet = new SrsCreateStreamPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_PLAY) {
srs_info("decode the AMF0/AMF3 command(paly message).");
*ppacket = packet = new SrsPlayPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PAUSE) {
srs_info("decode the AMF0/AMF3 command(pause message).");
*ppacket = packet = new SrsPausePacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
*ppacket = packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
*ppacket = packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(publish message).");
*ppacket = packet = new SrsPublishPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 command(unpublish message).");
*ppacket = packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == SRS_CONSTS_RTMP_SET_DATAFRAME ||
command == SRS_CONSTS_RTMP_ON_METADATA) {
srs_info("decode the AMF0/AMF3 data(onMetaData message).");
*ppacket = packet = new SrsOnMetaDataPacket();
return packet->decode(stream);
} else if(command == SRS_BW_CHECK_FINISHED
|| command == SRS_BW_CHECK_PLAYING
|| command == SRS_BW_CHECK_PUBLISHING
|| command == SRS_BW_CHECK_STARTING_PLAY
|| command == SRS_BW_CHECK_STARTING_PUBLISH
|| command == SRS_BW_CHECK_START_PLAY
|| command == SRS_BW_CHECK_START_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PLAY
|| command == SRS_BW_CHECK_STOP_PLAY
|| command == SRS_BW_CHECK_STOP_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PUBLISH
|| command == SRS_BW_CHECK_FINAL)
{
srs_info("decode the AMF0/AMF3 band width check message.");
*ppacket = packet = new SrsBandwidthPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
srs_info("decode the AMF0/AMF3 closeStream message.");
*ppacket = packet = new SrsCloseStreamPacket();
return packet->decode(stream);
} else if (header.is_amf0_command() || header.is_amf3_command()) {
srs_info("decode the AMF0/AMF3 call message.");
*ppacket = packet = new SrsCallPacket();
return packet->decode(stream);
}
// default packet to drop message.
srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
*ppacket = packet = new SrsPacket();
return ret;
} else if (header.is_user_control_message()) {
srs_verbose("start to decode user control message.");
*ppacket = packet = new SrsUserControlPacket();
return packet->decode(stream);
} else if(header.is_window_ackledgement_size()) {
srs_verbose("start to decode set ack window size message.");
*ppacket = packet = new SrsSetWindowAckSizePacket();
return packet->decode(stream);
} else if(header.is_set_chunk_size()) {
srs_verbose("start to decode set chunk size message.");
*ppacket = packet = new SrsSetChunkSizePacket();
return packet->decode(stream);
} else {
if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) {
srs_trace("drop unknown message, type=%d", header.message_type);
}
}
return ret;
}
由開始的流程知,服務器發送 onBWDone 后,接下來會接收到客戶端的 releaseStream 命令。對於 releaseStream/PublishStream/FCPublish/FCUnpublish 等都是使用 SrsFMLEStartPacket 類來構造的。
2.2.2 構造 SrsFMLEStartPacket 類
/**
 * Request packet of the FMLE start-publish family
 * (ReleaseStream/PublishStream/FCPublish/FCUnpublish).
 * A fresh packet is named releaseStream, with transaction id 0 and an AMF0
 * null command object.
 */
SrsFMLEStartPacket::SrsFMLEStartPacket()
{
    // The transaction id is what the response is matched with.
    transaction_id = 0;

    // The command name defaults to releaseStream.
    command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM;

    // Holds the command info when there is any, otherwise the null type.
    // @remark never a NULL pointer, always an AMF0 null instance.
    command_object = SrsAmf0Any::null();
}
2.2.3 SrsFMLEStartPacket::decode
開始解析 releaseStream 消息的負載。
/**
 * Decode the payload of a releaseStream/FCPublish/FCUnpublish command:
 * command name, transaction id, a null command object and the stream name.
 * @param stream the decode stream positioned at the command name.
 * @return ERROR_SUCCESS, the error of the failed read, or
 *       ERROR_RTMP_AMF0_DECODE when the command name is not one of the three.
 */
int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    // Field 1: the command name.
    int ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }

    // Only the three FMLE start commands are accepted by this packet.
    bool is_fmle_start = command_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
        || command_name == RTMP_AMF0_COMMAND_FC_PUBLISH
        || command_name == RTMP_AMF0_COMMAND_UNPUBLISH;
    if (command_name.empty() || !is_fmle_start) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    // Field 2: the transaction id.
    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }

    // Field 3: the command object, which must be null.
    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }

    // Field 4: the stream name.
    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode FMLE start packet success");
    return ret;
}
recv:releaseStream('live')

2.3 SrsRtmpServer::identify_fmle_publish_client
當解析接收到的消息為 releaseStream 的時候,會調用該函數。
/**
 * Mark the client as an FMLE publisher and answer its request.
 * @param req the decoded FMLE start packet (releaseStream in the flow
 *       described by this document).
 * @param type output, set to SrsRtmpConnFMLEPublish.
 * @param stream_name output, the stream name carried by the request.
 * @return ERROR_SUCCESS, or the error of sending the response.
 */
int SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req,
    SrsRtmpConnType& type, string& stream_name)
{
    // The client is a publisher, and this is the stream it publishes.
    type = SrsRtmpConnFMLEPublish;
    stream_name = req->stream_name;

    // releaseStream response, echoing the transaction id of the request.
    SrsFMLEStartResPacket* response = new SrsFMLEStartResPacket(req->transaction_id);
    int ret = protocol->send_and_free_packet(response, 0);
    if (ret != ERROR_SUCCESS) {
        srs_error("send releaseStream response message failed. ret=%d", ret);
        return ret;
    }
    srs_info("send releaseStream response message success.");

    return ret;
}
該函數中是對 releaseStream 的響應。 發送的包如下圖:
send: response for releaseStream

2.3.1 構造 SrsFMLEStartResPacket 類
/**
 * Response packet for SrsFMLEStartPacket: a _result command carrying the
 * transaction id of the request, a null command object and undefined args.
 * @param _transaction_id the transaction id of the request being answered.
 */
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
    transaction_id = _transaction_id;

    // The response is named _result.
    command_name = RTMP_AMF0_COMMAND_RESULT;

    // Holds the command info when there is any, otherwise the null type.
    // @remark never a NULL pointer, always an AMF0 null instance.
    command_object = SrsAmf0Any::null();

    // The optional args are set to undefined.
    // @remark never a NULL pointer, always an AMF0 undefined instance.
    args = SrsAmf0Any::undefined();
}
2.3.2 SrsFMLEStartResPacket::encode_packet
構建 releaseStream response 消息的負載.
/**
 * Write the payload of the response: command name, transaction id, an AMF0
 * null (command object) and an AMF0 undefined (args), in that order.
 * @param stream the stream to encode into.
 * @return ERROR_SUCCESS, or the error of the first write that failed.
 */
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    ret = srs_amf0_write_undefined(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    srs_info("encode FMLE start response packet success.");
    return ret;
}
3. SrsSource::fetch_or_create
/**
 * Return the pooled source of the request, creating and pooling a new one
 * when none exists yet.
 * @param r the client request.
 * @param h the event handler for a newly created source.
 * @param pps output, the matching source; never NULL on success and left
 *       untouched when initializing a new source fails.
 * @return ERROR_SUCCESS, or the error of initializing the new source.
 */
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    // Fast path: the pool already holds a source for this stream url.
    SrsSource* pooled = fetch(r);
    if (pooled != NULL) {
        *pps = pooled;
        return ERROR_SUCCESS;
    }

    // The pool is keyed by the stream url built from vhost/app/stream.
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;

    // should always not exists for create a source.
    srs_assert(pool.find(stream_url) == pool.end());

    // Build the new source; it is discarded when it cannot be initialized.
    SrsSource* created = new SrsSource();
    int ret = created->initialize(r, h);
    if (ret != ERROR_SUCCESS) {
        srs_freep(created);
        return ret;
    }

    // Pool the new source and hand it back through pps.
    pool[stream_url] = created;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());

    *pps = created;
    return ret;
}
3.1 SrsSource::fetch
/**
 * Look up the pooled source of the request.
 * When one exists, its stored request is refreshed with the auth info of r.
 * @param r the client request, identifying the stream by vhost/app/stream.
 * @return the pooled source, or NULL when the pool has none for this stream.
 */
SrsSource* SrsSource::fetch(SrsRequest* r)
{
    // The stream identify, vhost/app/stream.
    string stream_url = r->get_stream_url();

    if (pool.find(stream_url) == pool.end()) {
        return NULL;
    }

    SrsSource* pooled = pool[stream_url];

    // we always update the request of resource,
    // for origin auth is on, the token in request maybe invalid,
    // and we only need to update the token of request, it's simple.
    pooled->_req->update_auth(r);

    return pooled;
}
3.2 構造 SrsSource 類
構造一個直播流源。
/**
 * The time jitter algorithm applied to a stream.
 */
enum SrsRtmpJitterAlgorithm
{
    // full: ensure the stream starts at zero and increases monotonically.
    SrsRtmpJitterAlgorithmFULL = 0x01,
    // zero: only ensure the stream starts at zero, ignore timestamp jitter.
    SrsRtmpJitterAlgorithmZERO = 0x02,
    // off: disable the time jitter algorithm, like atc.
    SrsRtmpJitterAlgorithmOFF = 0x03
};
/**
 * Construct a live stream source with its default state and helper objects.
 * The request copy (_req) stays NULL here; it is set by initialize().
 */
SrsSource::SrsSource()
{
    // Deep copy of the client request, not available yet.
    _req = NULL;

    // Time jitter algorithm for the vhost, off until configured.
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;

    // Whether to use the interlaced/mixed algorithm to correct timestamps;
    // disabled initially.
    mix_correct = false;
    mix_queue = new SrsMixQueue();

    // Optional handlers, built only when the feature is compiled in.
#ifdef SRS_AUTO_HLS
    hls = new SrsHls();
#endif
#ifdef SRS_AUTO_DVR
    dvr = new SrsDvr();
#endif
#ifdef SRS_AUTO_TRANSCODE
    encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
    hds = new SrsHds(this);
#endif

    // Nothing cached yet: audio sequence header, video sequence header and
    // metadata.
    cache_sh_audio = NULL;
    cache_sh_video = NULL;
    cache_metadata = NULL;

    // Can publish, true when is not streaming.
    _can_publish = true;

    // The source id:
    //   for publish, it's the publish client id.
    //   for edge, it's the edge ingest id.
    // When the source id changed, for example the edge reconnect,
    // on_source_id_changed() is invoked to let all clients know.
    // _pre_source_id is the previous source id.
    _source_id = -1;
    _pre_source_id = -1;

    // Last die time: when all consumers quit and there is no publisher,
    // the source is removed when it dies.
    die_at = -1;

    // Edge control services.
    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();

    // Gop cache for client fast startup.
    gop_cache = new SrsGopCache();

    // Stream used for aggregate messages.
    aggregate_stream = new SrsStream();

    // Whether the stream is monotonically increasing.
    is_monotonicaly_increase = false;
    last_packet_time = 0;

    _srs_config->subscribe(this);

    // atc: use the absolute msg time without adjusting it when true;
    // otherwise the msg time is adjusted to start from 0 to make flash happy.
    // TODO: FIXME: to support reload atc.
    atc = false;
}
3.3 SrsSource::initialize
/**
 * Bind the source to a request and handler, then initialize its helpers
 * and load the per-vhost settings.
 * @param r the client request; a copy of it is kept in _req.
 * @param h the event handler; must not be NULL.
 * @return ERROR_SUCCESS, or the error of the first helper that failed.
 */
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
    // Must be called once, with a valid handler.
    srs_assert(h);
    srs_assert(!_req);

    int ret = ERROR_SUCCESS;

    handler = h;
    // Keep our own copy of the request (a deep copy per the original notes).
    _req = r->copy();
    // Per the original notes: false when the vhost does not configure atc,
    // in which case the msg time is adjusted to start from 0.
    atc = _srs_config->get_atc(_req->vhost);

#ifdef SRS_AUTO_HLS
    ret = hls->initialize(this);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }
#endif

#ifdef SRS_AUTO_DVR
    ret = dvr->initialize(this, _req);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }
#endif

    ret = play_edge->initialize(this, _req);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    ret = publish_edge->initialize(this, _req);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    // Per the original notes: the default queue length is 30 when the vhost
    // does not configure queue_length.
    publish_edge->set_queue_size(_srs_config->get_queue_length(_req->vhost));

    jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
    mix_correct = _srs_config->get_mix_correct(_req->vhost);

    return ret;
}
3.4 SrsStatistic::on_client
/**
* when got a client to publish/play stream,
* @param id, the client srs id.
* @param req, the client request object.
* @param conn, the physical abstract connection object.
* @param type, the type of connection.
*/
int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn,
SrsRtmpConnType type)
{
int ret = ERROR_SUCCESS;
SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req);
// create client if not exists
SrsStatisticClient* client = NULL;
if (clients.find(id) == clients.end()) {
client = new SrsStatisticClient();
client->id = id;
client->stream = stream;
clients[id] = client;
} else {
client = clients[id];
}
// got client
client->conn = conn;
client->req = req;
client->type = type;
stream->nb_clients++;
vhost->nb_clients++;
return ret;
}
3.4.1 SrsStatistic::create_vhost
/**
 * Return the statistic entry of the vhost of the request, creating and
 * indexing it first when it does not exist.
 * @param req the client request, whose vhost name is the lookup key.
 * @return the existing or newly created vhost entry.
 */
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
{
    SrsStatisticVhost* vhost = NULL;

    // rvhosts is a fast index for vhost:
    //   key: vhost url, value: vhost object.
    // create vhost if not exists.
    if (rvhosts.find(req->vhost) == rvhosts.end()) {
        vhost = new SrsStatisticVhost();
        vhost->vhost = req->vhost;
        rvhosts[req->vhost] = vhost;
        // vhosts - key: vhost id, value: vhost object.
        vhosts[vhost->id] = vhost;
        return vhost;
    }

    vhost = rvhosts[req->vhost];

    return vhost;
}
3.4.2 SrsStatistic::create_stream
/**
 * Return the statistic entry of the stream of the request, creating and
 * indexing it (by url and by id) first when it does not exist.
 * @param vhost the vhost entry a newly created stream is attached to.
 * @param req the client request, whose stream url is the lookup key.
 * @return the existing or newly created stream entry.
 */
SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsRequest* req)
{
    std::string url = req->get_stream_url();

    // Already known: hand back the indexed entry.
    if (rstreams.find(url) != rstreams.end()) {
        return rstreams[url];
    }

    // create stream if not exists.
    SrsStatisticStream* created = new SrsStatisticStream();
    created->vhost = vhost;
    created->stream = req->stream;
    created->app = req->app;
    created->url = url;
    rstreams[url] = created;
    streams[created->id] = created;

    return created;
}
4. SrsSource::set_cache
/**
 * Enable or disable the gop cache of this source.
 * @param enabled forwarded unchanged to SrsGopCache::set().
 */
void SrsSource::set_cache(bool enabled)
{
/* SrsGopCache* gop_cache: gop cache for client fast startup. */
gop_cache->set(enabled);
}
4.1 SrsGopCache::set
/**
 * Enable or disable the gop cache.
 * Disabling also clears the packets cached so far.
 * @param enabled the new state of the gop cache.
 */
void SrsGopCache::set(bool enabled)
{
    // If the gop cache is disabled, the client will wait for the next
    // keyframe for h264, and will show a black screen meanwhile.
    enabled_gop_cache = enabled;

    if (enabled) {
        srs_info("enable gop cache");
        return;
    }

    srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
    clear();
}
5. SrsRtmpServer::start_fmle_publish
/**
 * Drive the FMLE-style publish exchange once the client type is publish.
 * In order, this function:
 *      1. receives FCPublish and remembers its transaction id,
 *      2. sends the FCPublish response (_result),
 *      3. receives createStream and remembers its transaction id,
 *      4. sends the createStream response carrying stream_id,
 *      5. receives publish,
 *      6. sends onFCPublish(NetStream.Publish.Start),
 *      7. sends onStatus(NetStream.Publish.Start).
 * @param stream_id the stream id placed in the createStream response and
 *      passed to send_and_free_packet for the onFCPublish/onStatus packets.
 * @return ERROR_SUCCESS when every step succeeds; otherwise the error code
 *      of the first receive or send that failed.
 */
int SrsRtmpServer::start_fmle_publish(int stream_id)
{
    int ret = ERROR_SUCCESS;

    // FCPublish
    double fc_publish_tid = 0;
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsFMLEStartPacket* pkt = NULL;
        /* Expect one of ReleaseStream/FCPublish/FCUnpublish; any other message
         * is dropped until one of them arrives.
         * Per the flow listed at the start of this article, the message
         * received here should be FCPublish. */
        if ((ret = expect_message<SrsFMLEStartPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv FCPublish message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv FCPublish request message success.");

        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsFMLEStartPacket, pkt);
        // Keep the transaction id so the response can echo it.
        fc_publish_tid = pkt->transaction_id;
    }
    // FCPublish response
    if (true) {
        SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            srs_error("send FCPublish response message failed. ret=%d", ret);
            return ret;
        }
        srs_info("send FCPublish response message success.");
    }

    // createStream
    double create_stream_tid = 0;
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsCreateStreamPacket* pkt = NULL;
        if ((ret = expect_message<SrsCreateStreamPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv createStream message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv createStream request message success.");

        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsCreateStreamPacket, pkt);
        // Keep the transaction id so the response can echo it.
        create_stream_tid = pkt->transaction_id;
    }
    // createStream response
    if (true) {
        SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid,
                                                                     stream_id);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            srs_error("send createStream response message failed. ret=%d", ret);
            return ret;
        }
        srs_info("send createStream response message success.");
    }

    // publish
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsPublishPacket* pkt = NULL;
        if ((ret = expect_message<SrsPublishPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv publish message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv publish request message success.");

        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsPublishPacket, pkt);
    }
    // publish response onFCPublish(NetStream.Publish.Start)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();

        // Replace the packet's default command name with onFCPublish.
        pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));

        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d",
                      ret);
            return ret;
        }
        srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
    }
    // publish response onStatus(NetStream.Publish.Start)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();

        pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        // Same StatusDescription key as the onFCPublish packet above.
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
        pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));

        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d",
                      ret);
            return ret;
        }
        srs_info("send onStatus(NetStream.Publish.Start) message success.");
    }

    srs_info("FMLE publish success.");

    return ret;
}
5.1 FCPublish
5.1.1 FCPublish 接收
接收 FCPublish 後的解析如下代碼所示。
SrsProtocol::do_decode_message:
// Abridged listing of SrsProtocol::do_decode_message: only the FCPublish
// branch is shown; each "..." line stands for code omitted from this excerpt.
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
SrsPacket** ppacket)
{
...
/* FCPublish */
// When the command is FCPublish, allocate an SrsFMLEStartPacket, publish it
// to the caller through *ppacket, and return the result of its decode().
else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
*ppacket = packet = new SrsFMLEStartPacket();
return packet->decode(stream);
}
...
}
SrsFMLEStartPacket::decode
/**
 * Decode an FMLE start packet from the stream, reading in order:
 * command name, transaction id, an AMF0 null, and the stream name.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS on success; the failing reader's error code, or
 *      ERROR_RTMP_AMF0_DECODE when the command name is not accepted.
 */
int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    /* Read the command name of the message, e.g. "FCPublish". */
    ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }

    // Only these three command names are accepted for this packet type.
    const bool accepted_name = command_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
        || command_name == RTMP_AMF0_COMMAND_FC_PUBLISH
        || command_name == RTMP_AMF0_COMMAND_UNPUBLISH;
    if (command_name.empty() || !accepted_name) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }

    // The command object is expected to be an AMF0 null.
    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode FMLE start packet success");

    return ret;
}
5.1.2 FCPublish response
FCPublish 的響應用 SrsFMLEStartResPacket 類構造數據。該類的構造如下:
SrsFMLEStartResPacket 構造函數
/**
* response for SrsFMLEStartPacket.
* Builds a "_result" style packet whose command object is an AMF0 null and
* whose args is an AMF0 undefined.
* @param _transaction_id stored as this packet's transaction_id.
*/
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
/* _result */
command_name = RTMP_AMF0_COMMAND_RESULT;
/* transaction id supplied by the caller. */
transaction_id = _transaction_id;
/* command object: an AMF0 null instance. */
command_object = SrsAmf0Any::null();
/* args: an AMF0 undefined instance. */
args = SrsAmf0Any::undefined();
}
SrsFMLEStartResPacket::encode_packet
/**
 * Encode the payload of the FMLE start response into the stream, writing in
 * order: command name, transaction id, an AMF0 null, an AMF0 undefined.
 * subpacket can override to encode the payload to stream.
 * @remark never invoke the super.encode_packet, it always failed.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS on success; otherwise the failing writer's error code.
 */
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    // The command object is written as an AMF0 null.
    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    // The args are written as an AMF0 undefined.
    ret = srs_amf0_write_undefined(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    srs_info("encode FMLE start response packet success.");

    return ret;
}
send: FCPublish response

5.2 createStream
5.2.1 createStream 接收
createStream 消息的代表類為 SrsCreateStreamPacket,該類的構造如下。
SrsCreateStreamPacket 構造函數
/**
* createStream
* The client sends this command to the server to create a logical
* channel for message communication. The publishing of audio, video, and
* metadata is carried out over the stream channel created using the
* createStream command.
*
* The constructor sets the defaults: command name createStream,
* transaction id 2, and an AMF0 null command object.
*/
SrsCreateStreamPacket::SrsCreateStreamPacket()
{
/* createStream */
command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
/**
* Transaction ID of the command; defaults to 2 here.
*/
transaction_id = 2;
/**
* If there exists any command info this is set, else this is set to null type.
* @remark, never be NULL, an AMF0 null instance.
*/
command_object = SrsAmf0Any::null();
}
接收 createStream 後對該消息的解碼如下:
SrsCreateStreamPacket::decode
/**
 * Decode a createStream packet from the stream, reading in order:
 * command name, transaction id, and an AMF0 null command object.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS on success; the failing reader's error code, or
 *      ERROR_RTMP_AMF0_DECODE when the command name is not createStream.
 */
int SrsCreateStreamPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
        return ret;
    }

    // Reject an empty name or any name other than createStream.
    const bool is_create_stream = !command_name.empty()
        && command_name == RTMP_AMF0_COMMAND_CREATE_STREAM;
    if (!is_create_stream) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode createStream command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode createStream packet success");

    return ret;
}
5.2.2 createStream response
createStream 的響應消息是通過 SrsCreateStreamResPacket 類構造的,該類的構造如下:
SrsCreateStreamResPacket 構造函數
/**
* Response for the createStream command: a "_result" packet that carries the
* transaction id being answered and the stream id.
* @param _transaction_id stored as this packet's transaction_id.
* @param _stream_id stored as this packet's stream_id.
*/
SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id,
double _stream_id)
{
/* _result */
command_name = RTMP_AMF0_COMMAND_RESULT;
/**
* ID of the command that response belongs to.
*/
transaction_id = _transaction_id;
/**
* If there exists any command info this is set, else this is set to null type.
* @remark, never be NULL, an AMF0 null instance.
*/
command_object = SrsAmf0Any::null();
/* The return value is either a stream ID or an error information object. */
stream_id = _stream_id;
}
接着對該 createStream response 消息的負載數據進行編碼(即打包)。
SrsCreateStreamResPacket::encode_packet
/**
 * Encode the payload of the createStream response into the stream, writing
 * in order: command name, transaction id, an AMF0 null, and the stream id.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS on success; otherwise the failing writer's error code.
 */
int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    // The command object is written as an AMF0 null.
    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    // The stream id is written as an AMF0 number.
    ret = srs_amf0_write_number(stream, stream_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode stream_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode stream_id success.");

    srs_info("encode createStream response packet success.");

    return ret;
}
send: createStream response

5.3 publish
5.3.1 publish 接收
publish 消息用 SrsPublishPacket 類代表。該類的構造如下:
SrsPublishPacket 構造函數
/**
* FMLE/flash publish
* Publish
* The client sends the publish command to publish a named stream to the
* server. Using this name, any client can play this stream and receive
* the published audio, video, and data messages.
*
* The constructor sets the defaults: command name publish, transaction id 0,
* an AMF0 null command object, and publish type "live".
*/
SrsPublishPacket::SrsPublishPacket()
{
/* Name of the command, set to "publish". */
command_name = RTMP_AMF0_COMMAND_PUBLISH;
/* Transaction ID set to 0. */
transaction_id = 0;
/**
* Command information object does not exist. Set to null type.
* @remark, never be NULL, an AMF0 null instance.
*/
command_object = SrsAmf0Any::null();
/**
* Type of publishing. Set to "live", "record", or "append".
* record: The stream is published and the data is recorded to a new file. The file
* is stored on the server in a subdirectory within the directory that
* contains the server application. If the file already exists, it is
* overwritten.
* append: The stream is published and the data is appended to a file. If no file
* is found, it is created.
* live: Live data is published without recording it in a file.
* @remark, SRS only supports live.
* @remark, optional, default to live.
*/
type = "live";
}
recv: publish

該 publish 消息的解析如下代碼所示。
SrsPublishPacket::decode
/**
 * Decode a publish packet from the stream, reading in order: command name,
 * transaction id, an AMF0 null command object, the stream name, and — only
 * when the stream is not yet empty — the publish type.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS on success; the failing reader's error code, or
 *      ERROR_RTMP_AMF0_DECODE when the command name is not publish.
 */
int SrsPublishPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_name failed. ret=%d", ret);
        return ret;
    }

    // Reject an empty name or any name other than publish.
    const bool is_publish = !command_name.empty()
        && command_name == RTMP_AMF0_COMMAND_PUBLISH;
    if (!is_publish) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode publish command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_object failed. ret=%d", ret);
        return ret;
    }

    /* Read the name of the stream being published. */
    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish stream_name failed. ret=%d", ret);
        return ret;
    }

    /* Read the publish type (SRS only supports live); it is read only when
     * the stream still has data left. */
    if (!stream->empty()) {
        ret = srs_amf0_read_string(stream, type);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 decode publish type failed. ret=%d", ret);
            return ret;
        }
    }

    srs_info("amf0 decode publish packet success");

    return ret;
}
5.3.2 publish response onFCPublish(NetStream.Publish.Start)
該 publish 的響應 onFCPublish 使用 SrsOnStatusCallPacket 構造,該類的構造函數如下。
SrsOnStatusCallPacket 構造函數
/**
* Status call packet. The constructor sets the defaults: command name
* onStatus, transaction id 0, AMF0 null args, and an AMF0 object as data.
*/
SrsOnStatusCallPacket::SrsOnStatusCallPacket()
{
/* Name of command. Set to "onStatus" */
command_name = RTMP_AMF0_COMMAND_ON_STATUS;
/* Transaction ID set to 0. */
transaction_id = 0;
/**
* Command information does not exist. Set to null type.
* @remark, never be NULL, an AMF0 null instance.
*/
args = SrsAmf0Any::null();
/**
* Name-value pairs that describe the response from the server.
* 'code', 'level', 'description' are names of a few among such information.
* @remark, never be NULL, an AMF0 object instance.
*/
data = SrsAmf0Any::object();
}
注:publish 的響應消息 onFCPublish 的消息名為 onFCPublish。該消息的抓包如下:
send: onFCPublish

該 onFCPublish 消息負載數據的編碼如下。
SrsOnStatusCallPacket::encode_packet
/**
 * Encode the payload of the status call packet into the stream, writing in
 * order: command name, transaction id, an AMF0 null, and the data object.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS on success; otherwise the failing writer's error code.
 */
int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    // The args are written as an AMF0 null.
    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    // The data object writes itself into the stream.
    ret = data->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode data failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode data success.");

    srs_info("encode onStatus(Call) packet success.");

    return ret;
}
5.3.3 publish response onStatus(NetStream.Publish.Start)
該響應消息同樣使用 SrsOnStatusCallPacket 類構造,該消息的名稱即為 onStatus。抓包如下圖:
send: onStatus

6. SrsRtmpConn::publishing
當服務器成功響應 obs 發送的 publish 消息後,即進入 SrsRtmpConn::publishing 函數,開始處理 obs 推送的媒體數據。具體分析見 SRS之SrsRtmpConn::publishing詳解。
