SRS: SrsRtmpServer::connect_app in Detail


1. connect('live')

2. SrsRtmpServer::connect_app

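Located in srs_rtmp_stack.cpp. In conn, the SRS thread that handles an RTMP connection, this function is called as soon as the handshake with the client completes. It receives the client's first RTMP message, connect, as shown in the figure above.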

int SrsRtmpServer::connect_app(SrsRequest* req)
{
    int ret = ERROR_SUCCESS;
    
    SrsCommonMessage* msg = NULL;
    SrsConnectAppPacket* pkt = NULL;
    /* Naming SrsConnectAppPacket tells expect_message which RTMP message to receive
     * and parse. On success, pkt points to a SrsConnectAppPacket that holds the
     * parsed data. */
    if ((ret = expect_message<SrsConnectAppPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
        srs_error("expect connect app message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsCommonMessage, msg);
    SrsAutoFree(SrsConnectAppPacket, pkt);
    srs_info("get connect app message");
    
    SrsAmf0Any* prop = NULL;
    
    if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {
        ret = ERROR_RTMP_REQ_CONNECT;
        srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);
        return ret;
    }
    req->tcUrl = prop->to_str();
    
    if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {
        req->pageUrl = prop->to_str();
    }
    
    if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {
        req->swfUrl = prop->to_str();
    }
    
    if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {
        req->objectEncoding = prop->to_number();
    }
    
    if (pkt->args) {
        srs_freep(req->args);
        req->args = pkt->args->copy()->to_object();
        srs_info("copy edge traverse to origin auth args.");
    }
    
    srs_info("get connect app message params success.");

    /* Parse tcUrl; the parsed values are stored in schema, host, vhost and so on */
    srs_discovery_tc_url(req->tcUrl, 
        req->schema, req->host, req->vhost, req->app, req->port, 
        req->param);
    /* Strip irrelevant characters such as spaces */
    req->strip();
    
    return ret;
}
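
To make the tcUrl step concrete, here is a minimal sketch of how a URL such as rtmp://host:port/app splits into the fields that connect_app fills in. It is not srs_discovery_tc_url: the names TcUrlParts and parse_tc_url are hypothetical, vhost and query parameters are ignored, and the fallback to port 1935 (the standard RTMP port) is an assumption of this sketch.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type; SRS stores these fields on SrsRequest instead.
struct TcUrlParts {
    std::string schema;  // e.g. "rtmp"
    std::string host;    // e.g. "192.168.1.10"
    std::string port;    // falls back to "1935" when the URL has no port
    std::string app;     // e.g. "live"
};

// Illustrative parser for "schema://host[:port]/app".
// Returns false when the "://" separator or the host is missing.
inline bool parse_tc_url(const std::string& tc_url, TcUrlParts& out)
{
    const std::string sep = "://";
    std::string::size_type pos = tc_url.find(sep);
    if (pos == std::string::npos) {
        return false;
    }
    out.schema = tc_url.substr(0, pos);

    // Everything after "://" is "host[:port]" optionally followed by "/app".
    std::string rest = tc_url.substr(pos + sep.size());
    std::string::size_type slash = rest.find('/');
    std::string authority = rest.substr(0, slash);
    out.app = (slash == std::string::npos) ? std::string() : rest.substr(slash + 1);

    std::string::size_type colon = authority.find(':');
    out.host = authority.substr(0, colon);
    out.port = (colon == std::string::npos) ? std::string("1935")
                                            : authority.substr(colon + 1);
    return !out.host.empty();
}
```

For rtmp://192.168.1.10:1935/live this yields schema "rtmp", host "192.168.1.10", port "1935" and app "live", which correspond to the app name seen in connect('live').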

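2.1 SrsRtmpServer::expect_message

Located in srs_rtmp_stack.hpp.
This is a template function defined in the SrsRtmpServer class. It names the message the caller expects to receive; any other message that arrives is dropped until the expected one is obtained.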

/**
 * expect a specified message, drop others until got specified one.
 * @pmsg, user must free it. NULL if not success.
 * @ppacket, user must free it, which decode from payload of message. NULL if not success.
 * @remark, only when success, user can use and must free the pmsg and ppacket.
 * for example:
 *      SrsCommonMessage* msg = NULL;
 *      SrsConnectAppResPacket* pkt = NULL;
 *      if ((ret = server->expect_message<SrsConnectAppResPacket>(&msg, &pkt))
 *          != ERROR_SUCCESS) {
 *          return ret;
 *      }
 *      // use then free msg and pkt
 *      srs_freep(msg);
 *      srs_freep(pkt);
 * user should never recv message and convert it, use this method instead.
 * if need to set timeout, use set timeout of SrsProtocol.
 */
template<class T>
int expect_message(SrsCommonMessage** pmsg, T** ppacket)
{
    return protocol->expect_message<T>(pmsg, ppacket);
}

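This in turn calls the template function of the same name, expect_message, defined in the SrsProtocol class.

Before analyzing SrsProtocol::expect_message, first look at how protocol, the SrsProtocol member of the SrsRtmpServer class, is constructed.

2.1.1 The SrsProtocol constructor

Located in srs_rtmp_stack.hpp.

The SrsProtocol class provides the RTMP message protocol service: it receives RTMP messages from the RTMP chunk stream and sends RTMP messages over the RTMP chunk stream.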

/**
 * @io: the pointer passed in at construction is a SrsStSocket; ISrsProtocolReaderWriter
 * is the base class of SrsStSocket.
 */
SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
{
    /* Buffers the RTMP message data received from io and serves it to the stream;
     * 128K is allocated by default */
    in_buffer = new SrsFastBuffer();
    skt = io;
    
    /* Initialize the chunk size, 128 here */
    in_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
    out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
    
    /* out_iovs: cache for multiple messages send. */
    nb_out_iovs = SRS_CONSTS_IOVS_MAX;
    out_iovs = (iovec*)malloc(sizeof(iovec) * nb_out_iovs);
    // each chunk consumers at least 2 iovs
    srs_assert(nb_out_iovs >= 2);
    
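    /* Flag: whether the user has been warned to grow the c0c3 header cache */
    warned_c0c3_cache_dry = false;
    /* Flag: whether to respond automatically when a message is received */
    auto_response_when_recv = true;
    /* Whether to print debug info */
    show_debug_info = true;
    /* The buffer size is set by the peer */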
    in_buffer_length = 0;
    
    cs_cache = NULL;
    if (SRS_PERF_CHUNK_STREAM_CACHE > 0) {
        /* Allocate an array of SRS_PERF_CHUNK_STREAM_CACHE SrsChunkStream pointers for the chunk stream cache */
        cs_cache = new SrsChunkStream*[SRS_PERF_CHUNK_STREAM_CACHE];
    }
    /* Incoming RTMP chunk streams may be interleaved, so SrsChunkStream is used to cache them */
    for (int cid = 0; cid < SRS_PERF_CHUNK_STREAM_CACHE; cid++) {
        SrsChunkStream* cs = new SrsChunkStream(cid);
        // set the perfer cid of chunk,
        // which will copy to the message received.
        cs->header.perfer_cid = cid;
        
        cs_cache[cid] = cs;
    }
}

2.1.2 The SrsFastBuffer constructor

The SrsFastBuffer class provides a byte cache for the protocol: data received from the socket is stored in it and then decoded into RTMP messages.

Usage:

ISrsBufferReader* r = ......;
SrsFastBuffer* fb = ......;
fb->grow(r, 1024);
char* header = fb->read_slice(100);
char* payload = fb->read_payload(924);

The constructor code is as follows:

// the default recv buffer size, 128KB.
#define SRS_DEFAULT_RECV_BUFFER_SIZE 131072

/**
 * SrsFastBuffer:
 *
 * the buffer provides bytes cache for protocol. generally,
 * protocol recv data from socket, put into buffer, decode to RTMP message.
 * Usage: 
 *     ISrsBufferReader* r = ...;
 *     SrsFastBuffer* fb = ......;
 *     fb->grow(r, 1024);
 *     char* header = fb->read_slice(100);
 *     char* payload = fb->read_payload(924);
 */
SrsFastBuffer::SrsFastBuffer()
{
#ifdef SRS_PERF_MERGED_READ
    merged_read = false;
    _handler = NULL;
#endif
    
    /* Set the default receive buffer size, 128K */
    nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE;
    buffer = (char*)malloc(nb_buffer);
    p = end = buffer;
}

2.1.3 The SrsChunkStream constructor

Incoming RTMP chunk streams may arrive interleaved rather than in order, so the SrsChunkStream class is used to cache the received RTMP chunk streams.

/**
 * incoming chunk stream maybe interlaced,
 * use the chunk stream to cache the input RTMP chunk streams.
 */
SrsChunkStream::SrsChunkStream(int _cid)
{
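    /* The chunk type in the basic header, i.e. fmt */
    fmt = 0;
    /* The chunk stream that this SrsChunkStream belongs to */
    cid = _cid;
    /* Flag: whether the chunk message header carries an extended timestamp */
    extended_timestamp = false;
    /* The partial or complete message read so far */
    msg = NULL;
    /* Count of decoded messages, used to tell whether this chunk stream is fresh */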
    msg_count = 0;
}
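
Because chunks from different chunk streams can interleave, a partially received message has to be tracked per chunk stream ID until its last chunk arrives. The following is a minimal sketch of that idea only. Reassembler and feed are hypothetical names, payloads are modelled as std::string, and none of this is SRS code.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical per-cid state: the bytes collected so far for one message.
struct PartialMessage {
    std::string payload;
};

class Reassembler {
public:
    // Feed the payload of one chunk that arrived on chunk stream `cid`.
    // `total_length` is the full message length announced in the header.
    // Returns true and moves the message into `out` once it is complete.
    bool feed(int cid, std::size_t total_length,
              const std::string& chunk_payload, std::string& out)
    {
        PartialMessage& m = partial_[cid];  // created on first use
        m.payload += chunk_payload;
        if (m.payload.size() < total_length) {
            return false;  // still partial, wait for the next chunk on this cid
        }
        out.swap(m.payload);
        partial_.erase(cid);  // a later chunk on this cid starts a new message
        return true;
    }

private:
    std::map<int, PartialMessage> partial_;
};
```

Feeding "he" on cid 3, then a complete two-byte message on cid 4, then "llo" on cid 3 completes the cid 4 message first and the cid 3 message second, even though the cid 3 message started earlier.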

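2.2 SrsProtocol::expect_message

This is a template function: the template argument names the packet class the caller expects, and the function receives RTMP messages until one of that class arrives. For example, when T is SrsConnectAppPacket (so the second parameter ppacket is a SrsConnectAppPacket**), the call expects the connect command message sent by the client.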

/**
 * expect a specified message, drop others until got specified one.
 * @pmsg, user must free it. NULL if not success.
 * @ppacket, user must free it, which decode from payload of message. NULL if not success.
 * @remark, only when success, user can use and must free the pmsg and ppacket.
 * for example:
 *     SrsCommonMessage* msg = NULL;
 *     SrsConnectAppResPacket* pkt = NULL;
 *     if ((ret = protocol->expect_message<SrsConnectAppResPacket>(&msg, &pkt))
 *         != ERROR_SUCCESS) {
 *         return ret;
 *     }
 *     // use then free msg and pkt
 *     srs_freep(msg);
 *     srs_freep(pkt);
 * user should never recv message and convert it, use this method instead.
 * if need to set timeout, use set timeout of SrsProtocol.
 */
template<class T>
int expect_message(SrsCommonMessage** pmsg, T** ppacket)
{
    *pmsg = NULL;
    *ppacket = NULL;
    
    int ret = ERROR_SUCCESS;
    
    while (true) {
        SrsCommonMessage* msg = NULL;
        /* Start receiving the RTMP message sent by the client */
        if ((ret = recv_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv message failed. ret=%d", ret);
            }
            return ret;
        }
        srs_verbose("recv message success.");
        
        SrsPacket* packet = NULL;
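        /* decode_message first reads the message name, then constructs the matching
         * class (for example SrsConnectAppPacket), returns it through packet, and
         * calls that class's decode function to parse the data. The returned packet
         * therefore actually points to a subclass such as SrsConnectAppPacket. */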
        if ((ret = decode_message(msg, &packet)) != ERROR_SUCCESS) {
            srs_error("decode message failed. ret=%d", ret);
            srs_freep(msg);
            srs_freep(packet);
            return ret;
        }
        
        /**
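         * dynamic_cast:
         *     Safely converts a base-class pointer or reference into a derived-class
         * pointer or reference, so that the derived class's non-virtual functions can
         * be called through it. Virtual functions need no conversion: a call through
         * the base-class pointer or reference reaches the derived override at run time.
         * @remark dynamic_cast can downcast a pointer or reference only when the source
         * class is polymorphic (has at least one virtual function); otherwise the
         * compiler reports an error.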
         */
        T* pkt = dynamic_cast<T*>(packet);
        if (!pkt) {
            srs_info("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).", 
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            srs_freep(packet);
            continue;
        }
        
        *pmsg = msg;
        *ppacket = pkt;
        break;
    }
    
    return ret;
}
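
The receive, decode and filter loop above can be reduced to a small standalone sketch. The packet classes and the queue that stands in for the socket are hypothetical; only the dynamic_cast filtering mirrors what expect_message does.

```cpp
#include <cassert>
#include <deque>

// Toy packet hierarchy. The virtual destructor makes Packet polymorphic,
// which dynamic_cast requires, and lets us delete through a base pointer.
struct Packet { virtual ~Packet() {} };
struct ConnectPacket : Packet {};
struct PingPacket : Packet {};

// Pop packets from `incoming` until one of type T appears; packets of any
// other type are deleted and counted in `dropped`.
// Returns nullptr when the queue runs dry (the real code would instead keep
// reading from the socket until a timeout or error).
template<class T>
T* expect_packet(std::deque<Packet*>& incoming, int& dropped)
{
    while (!incoming.empty()) {
        Packet* packet = incoming.front();
        incoming.pop_front();

        T* typed = dynamic_cast<T*>(packet);
        if (typed) {
            return typed;  // caller owns the packet and must delete it
        }
        delete packet;     // not the expected type: drop it
        ++dropped;
    }
    return nullptr;
}
```

With a PingPacket queued ahead of a ConnectPacket, expect_packet<ConnectPacket> drops the ping and returns the connect packet, just as the real function skips unrelated messages while waiting for connect.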

2.2.1 SrsProtocol::recv_message

Located in srs_rtmp_stack.cpp.

/**
 * recv a RTMP message, which is bytes oriented.
 * user can use decode_message to get the decoded RTMP packet.
 * @param pmsg, set the received message,
 *      always NULL if errno,
 *      NULL for unknown packet but return success.
 *      never NULL if decode success.
 * @remark, drop message when msg is empty or payload length is empty.
 */
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;
    
    int ret = ERROR_SUCCESS;
    
    while (true) {
        SrsCommonMessage* msg = NULL;
        
        /* Each call reads a single chunk. msg stays NULL until a chunk completes a
         * message, at which point the complete message is returned through msg */
        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");
        
        /* No complete message yet, so keep reading */
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }
        
        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }
        
        /* Reaching this point means a complete message has been obtained */
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }
        
        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64, 
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length, 
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }
    
    return ret;
}

2.2.2 SrsProtocol::recv_interlaced_message

Located in srs_rtmp_stack.cpp.

/**
 * how many chunk stream to cache, [0, N].
 * to improve about 10% performance when chunk size small, and 5% for large chunk.
 * @see https://github.com/ossrs/srs/issues/249: cache the chunk headers info to 
 *      improve performance
 * @remark 0 to disable the chunk stream cache.
 */
#define SRS_PERF_CHUNK_STREAM_CACHE 16


/**
 * recv bytes oriented RTMP message from protocol stack.
 * return error if error occur and never set the pmsg,
 * return success and pmsg set to NULL if no entire message got,
 * return success and pmsg set to entire message if got one.
 */
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
    int ret = ERROR_SUCCESS;
    
    // chunk stream basic header.
    char fmt = 0;
    int cid = 0;
    /* Read the basic header of the RTMP chunk stream */
    if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read basic header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
    
    // the cid must not negative.
    srs_assert(cid >= 0);
    
    // get the cached chunk stream.
    SrsChunkStream* chunk = NULL;
    
    // use chunk stream cache to get the chunk info.
    // @see https://github.com/ossrs/srs/issues/249: cache the chunk headers info 
    //      to improve performance 
    if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
        // chunk stream cache hit.
        srs_verbose("cs-cache hit, cid=%d", cid);
        // already init, use it directly
        /* Take the SrsChunkStream at index cid from the cs_cache array, which was
         * preallocated with SRS_PERF_CHUNK_STREAM_CACHE entries */
        chunk = cs_cache[cid];
        srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                    "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
            chunk->header.message_type, chunk->header.payload_length,
            chunk->header.timestamp, chunk->header.stream_id);
    } else {
        // chunk stream cache miss, use map.
        /* First check whether the map container chunk_streams already holds a SrsChunkStream for this cid */
        if (chunk_streams.find(cid) == chunk_streams.end()) {
            /* Not found: construct a new SrsChunkStream for this cid and store it in chunk_streams */
            chunk = chunk_streams[cid] = new SrsChunkStream(cid);
            // set the perfer cid of chunk,
            // which will copy to the message received.
            chunk->header.perfer_cid = cid;
            srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
        } else {
            chunk = chunk_streams[cid];
            srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
        }
    }
    
    // chunk stream message header
    if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read message header success. "
            "fmt=%d, ext_time=%d, size=%d, message(type=%d, size=%d, "
            "time=%"PRId64", sid=%d)", 
            fmt, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), 
            chunk->header.message_type, chunk->header.payload_length, 
            chunk->header.timestamp, chunk->header.stream_id);
    
    // read msg payload from chunk stream.
    SrsCommonMessage* msg = NULL;
    if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message payload failed. ret=%d", ret);
        }
        return ret;
    }
    
    // not got an entire RTMP message, try next chunk.
    if (!msg) {
        srs_verbose("get partial message success. size=%d, "
                    "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
        return ret;
    }
    
    /* 得到完整的 message */
    *pmsg = msg;
    srs_info("get entire message success. size=%d, message(type=%d, "
             "size=%d, time=%"PRId64", sid=%d)",
            (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
            chunk->header.message_type, chunk->header.payload_length,
            chunk->header.timestamp, chunk->header.stream_id);
    
    return ret;
}
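
The two-tier lookup at the start of recv_interlaced_message (a fixed array for small cids, a map for the rest) can be isolated as follows. This is a sketch under assumptions: ChunkState and ChunkStateTable are hypothetical stand-ins for SrsChunkStream and the cs_cache/chunk_streams pair, and the cache size of 16 simply mirrors SRS_PERF_CHUNK_STREAM_CACHE.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>

// Hypothetical stand-in for SrsChunkStream: just remembers its cid.
struct ChunkState {
    int cid;
    explicit ChunkState(int id) : cid(id) {}
};

class ChunkStateTable {
public:
    static const int kCacheSize = 16;  // mirrors SRS_PERF_CHUNK_STREAM_CACHE

    ChunkStateTable() {
        // Preallocate one state per low cid so the hot path is an array index.
        for (int cid = 0; cid < kCacheSize; cid++) {
            cache_[cid] = new ChunkState(cid);
        }
    }
    ~ChunkStateTable() {
        for (int cid = 0; cid < kCacheSize; cid++) {
            delete cache_[cid];
        }
        std::map<int, ChunkState*>::iterator it;
        for (it = overflow_.begin(); it != overflow_.end(); ++it) {
            delete it->second;
        }
    }

    // Return the state for `cid`, creating a map entry on first use
    // when the cid falls outside the preallocated array.
    ChunkState* get(int cid) {
        assert(cid >= 0);
        if (cid < kCacheSize) {
            return cache_[cid];  // cache hit
        }
        std::map<int, ChunkState*>::iterator it = overflow_.find(cid);
        if (it == overflow_.end()) {
            it = overflow_.insert(std::make_pair(cid, new ChunkState(cid))).first;
        }
        return it->second;
    }

    std::size_t overflow_size() const { return overflow_.size(); }

private:
    ChunkStateTable(const ChunkStateTable&) = delete;
    ChunkStateTable& operator=(const ChunkStateTable&) = delete;

    ChunkState* cache_[kCacheSize];
    std::map<int, ChunkState*> overflow_;
};
```

The design trades a small fixed allocation for skipping the map lookup on the chunk stream IDs that clients use most often, which is the performance motivation stated in the SRS_PERF_CHUNK_STREAM_CACHE comment.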

2.2.3 SrsProtocol::read_basic_header

Reads the basic header of the RTMP chunk stream.

/**
 * Chunk Basic Header
 * The Chunk Basic Header encodes the chunk stream ID and the chunk
 * type(represented by fmt field in the figure below). Chunk type
 * determines the format of the encoded message header. Chunk Basic
 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
 * ID.
 *
 * The bits 0-5 (least significant) in the chunk basic header represent
 * the chunk stream ID.
 *
 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this 
 * field.
 *    0 1 2 3 4 5 6 7
 *   +-+-+-+-+-+-+-+-+
 *   |fmt|   cs id   |
 *   +-+-+-+-+-+-+-+-+
 *   Figure 6 Chunk basic header 1
 * 
 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
 * field. ID is computed as (the second byte + 64).
 *   0                   1
 *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|    0      | cs id - 64    |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 7 Chunk basic header 2
 * 
 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
 * this field. ID is computed as ((the third byte)*256 + the second byte
 * + 64).
 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|     1     |         cs id - 64            |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 8 Chunk basic header 3
 *
 * cs id: 6 bits
 * fmt: 2 bits
 * cs id - 64: 8 or 16 bits
 *
 * Chunk stream IDs with values 64-319 could be represented by both 2-byte
 * version and 3-byte version of this field.
 */
int SrsProtocol::read_basic_header(char& fmt, int& cid)
{
    int ret = ERROR_SUCCESS;
    
    /* The caller needs 1 byte from the SrsFastBuffer. If the buffer already holds at
     * least 1 byte, grow returns immediately. Otherwise it reads from the socket
     * into the SrsFastBuffer (each read actually requests as many bytes as the
     * buffer currently has free) */
    if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", 1, ret);
        }
        return ret;
    }
    
    /* Read 1 byte from the SrsFastBuffer */
    fmt = in_buffer->read_1byte();
    cid = fmt & 0x3f;
    fmt = (fmt >> 6) & 0x03;
    
    // 2-63, 1B chunk header
    if (cid > 1) {
        srs_verbose("basic header parsed. fmt=%d, cid=%d", fmt, cid);
        return ret;
    }
    
    // 64-319, 2B chunk header
    if (cid == 0) {
        /* A cid of 0 means the basic header is 2 bytes long */
        if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", 
                          1, ret);
            }
            return ret;
        }
        
        cid = 64;
        cid += (u_int8_t)in_buffer->read_1byte();
        srs_verbose("2bytes basic header parsed. fmt=%d, cid=%d", fmt, cid);
    // 64-65599, 3B chunk header
    } else if (cid == 1) {
        if ((ret = in_buffer->grow(skt, 2)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", 
                          2, ret);
            }
            return ret;
        }
        
        cid = 64;
        cid += (u_int8_t)in_buffer->read_1byte();
        cid += ((u_int8_t)in_buffer->read_1byte()) * 256;
        srs_verbose("3bytes basic header parsed. fmt=%d, cid=%d", fmt, cid);
    } else {
        srs_error("invalid path, impossible basic header.");
        srs_assert(false);
    }
    
    return ret;
}
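
The same decoding can be exercised without a socket by parsing from a byte array. The sketch below follows the three header layouts described in the comment above; parse_basic_header is a hypothetical helper, not part of SRS.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Decode a chunk basic header from `data` (`size` bytes available).
// Returns the number of bytes consumed (1, 2 or 3) and fills fmt and cid,
// or returns 0 when `data` is too short; fmt and cid are then unspecified.
inline std::size_t parse_basic_header(const std::uint8_t* data, std::size_t size,
                                      int& fmt, int& cid)
{
    if (size < 1) {
        return 0;
    }
    fmt = (data[0] >> 6) & 0x03;  // top 2 bits: chunk type
    cid = data[0] & 0x3f;         // low 6 bits: chunk stream id or marker

    if (cid > 1) {
        return 1;                 // ids 2-63 fit in the first byte
    }
    if (cid == 0) {
        if (size < 2) {
            return 0;
        }
        cid = 64 + data[1];       // 2-byte form: ids 64-319
        return 2;
    }
    // cid == 1 marks the 3-byte form: ids 64-65599
    if (size < 3) {
        return 0;
    }
    cid = 64 + data[1] + data[2] * 256;
    return 3;
}
```

For example, the single byte 0xC4 decodes to fmt=3, cid=4, and the bytes 0x40 0x0A decode to fmt=1, cid=74.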

2.2.4 SrsFastBuffer::grow

From the earlier calls, the reader argument passed in is a SrsStSocket. Its inheritance chain (derived class -> base class) is: SrsStSocket -> ISrsProtocolReaderWriter -> ISrsProtocolReader and ISrsProtocolWriter.

/**
 * grow buffer to the required size, loop to read from skt to fill.
 * @param reader, read more bytes from reader to fill the buffer to required size.
 * @param required_size, loop to fill to ensure buffer size to required.
 * @return an int error code, error if required_size negative.
 * @remark, we actually maybe read more than required_size, maybe 4K for example.
 */
int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
{
    int ret = ERROR_SUCCESS;
    
    // already got required size of bytes.
    if (end - p >= required_size) {
        return ret;
    }
    
    // must be positive.
    srs_assert(required_size > 0);
    
    /**
     * p: ptr to the current read position.
     * end: ptr to the content end.
     * buffer: ptr to the buffer,buffer <= p <= end <= buffer + nb_buffer
     * nb_buffer: the size of buffer
     */
    
    /* Compute the free space currently left in the buffer */
    // the free space of buffer,
    //     buffer = consumed_bytes + exists_bytes + free_space.
    int nb_free_space = (int)(buffer + nb_buffer - end);
    
    // the bytes already in buffer
    int nb_exists_bytes = (int)(end - p);
    srs_assert(nb_exists_bytes >= 0);
    
    /* When the free space is insufficient, make room in the buffer */
    // resize the space when no left space.
    if (nb_free_space < required_size - nb_exists_bytes) {
        srs_verbose("move fast buffer %d bytes", nb_exists_bytes);
        
        /* When the buffer holds no unread data, reset it */
        // reset or move to get more space.
        if (!nb_exists_bytes) {
            // reset when buffer is empty.
            p = end = buffer;
            srs_verbose("all consumed, reset fast buffer");
            
        /* Otherwise, if unread data remains, move it to the front of the buffer */
        } else if (nb_exists_bytes < nb_buffer && p > buffer) {
            // move the left bytes to start of buffer.
            // @remark Only move memory when space is enough, or failed at next check.
            // @see https://github.com/ossrs/srs/issues/848
            buffer = (char*)memmove(buffer, p, nb_exists_bytes);
            p = buffer;
            end = p + nb_exists_bytes;
        }
        
        /* After making room, check whether the free space can hold the requested data; if not, return an error */
        // check whether enough free space in buffer.
        nb_free_space = (int) (buffer + nb_buffer - end);
        if (nb_free_space < required_size - nb_exists_bytes) {
            ret = ERROR_READER_BUFFER_OVERFLOW;
            srs_error("buffer overflow, required=%d, max=%d, left=%d, ret=%d", 
                required_size, nb_buffer, nb_free_space, ret);
            return ret;
        }
    }
    
    /* The free space is sufficient, so start reading data */
    // buffer is ok, read required size of bytes.
    while (end - p < required_size) {
        ssize_t nread;
        /* Call the read function of the subclass SrsStSocket to read data */
        if ((ret = reader->read(end, nb_free_space, &nread)) != ERROR_SUCCESS) {
            return ret;
        }
    
        /* Not analyzed here */
#ifdef SRS_PERF_MERGED_READ
        /**
         * to improve read performance, merge some packets then read,
         * when it on and read small bytes, we sleep to wait more data.
         * that is, we merge some data to read together.
         * @see https://github.com/ossrs/srs/issues/241
         */
        if (merged_read && _handler) {
            _handler->on_read(nread);
        }
#endif
        
        // we just move the ptr to next.
        srs_assert((int)nread > 0);
        end += nread;
        nb_free_space -= nread;
    }
    
    return ret;
}
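
The space management in grow (reset when empty, otherwise slide the unread bytes to the front, then re-check) is easier to see without the socket loop. The sketch below models only that part, using indices instead of raw pointers. MiniBuffer and its members are hypothetical names, and reserve_for stands in for the first half of grow.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

class MiniBuffer {
public:
    explicit MiniBuffer(std::size_t capacity)
        : storage_(capacity), read_pos_(0), end_pos_(0) {
        assert(capacity > 0);
    }

    std::size_t unread() const { return end_pos_ - read_pos_; }
    std::size_t free_space() const { return storage_.size() - end_pos_; }

    // Make room so that `required` unread bytes can fit in the buffer.
    // Returns false when even a compacted buffer is too small (overflow).
    bool reserve_for(std::size_t required) {
        if (unread() >= required) {
            return true;  // already buffered, nothing to do
        }
        if (free_space() < required - unread()) {
            if (unread() == 0) {
                read_pos_ = end_pos_ = 0;  // everything consumed: reset
            } else if (read_pos_ > 0) {
                // Slide the unread bytes to the start of the buffer.
                std::memmove(storage_.data(), storage_.data() + read_pos_, unread());
                end_pos_ = unread();
                read_pos_ = 0;
            }
        }
        return free_space() >= required - unread();
    }

    // Append bytes as if they had just been read from the socket.
    void append(const char* data, std::size_t n) {
        assert(n <= free_space());
        std::memcpy(storage_.data() + end_pos_, data, n);
        end_pos_ += n;
    }

    // Mark `n` unread bytes as consumed by the parser.
    void consume(std::size_t n) {
        assert(n <= unread());
        read_pos_ += n;
    }

private:
    std::vector<char> storage_;
    std::size_t read_pos_;  // corresponds to p in SrsFastBuffer
    std::size_t end_pos_;   // corresponds to end in SrsFastBuffer
};
```

With an 8-byte buffer that holds 2 unread bytes at offset 4, asking for 6 bytes triggers the compaction branch and succeeds, while asking for 9 bytes fails because the request exceeds the capacity, which is the case that grow reports as ERROR_READER_BUFFER_OVERFLOW.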

2.2.5 SrsStSocket::read

int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
    int ret = ERROR_SUCCESS;
    
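    /* Inside a while loop, st_read first tries to read up to size bytes and returns
     * directly if data is available. Otherwise it calls st_netfd_poll to add the
     * current thread to the IO queue, waiting on POLLIN, until data can be read */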
    ssize_t nb_read = st_read(stfd, buf, size, recv_timeout);
    if (nread) {
        *nread = nb_read;
    }
    
    // On success a non-negative integer indicating the number of bytes actually 
    // read is returned (a value of 0 means the network connection is closed or 
    // end of file is reached). Otherwise, a value of -1 is returned and errno 
    // is set to indicate the error.
    if (nb_read <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_read < 0 && errno == ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }
        
        if (nb_read == 0) {
            errno = ECONNRESET;
        }
        
        return ERROR_SOCKET_READ;
    }
    
    recv_bytes += nb_read;
    
    return ret;
}

2.2.6 SrsProtocol::read_message_header
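
As a warm-up for the function below, here is a minimal sketch of decoding a type-0 (11-byte) chunk message header from raw bytes. The helper names are hypothetical. Unlike the SRS code, which swaps bytes through a char pointer and so relies on a little-endian host, this sketch uses shifts and works on any host.

```cpp
#include <cassert>
#include <cstdint>

struct MessageHeader {
    std::uint32_t timestamp;       // 3 bytes on the wire, big-endian
    std::uint32_t payload_length;  // 3 bytes on the wire, big-endian
    std::uint8_t  message_type;    // 1 byte
    std::uint32_t stream_id;       // 4 bytes on the wire, little-endian
};

// Read a 24-bit big-endian integer.
inline std::uint32_t read_be24(const std::uint8_t* p) {
    return (std::uint32_t(p[0]) << 16) | (std::uint32_t(p[1]) << 8) | std::uint32_t(p[2]);
}

// Read a 32-bit little-endian integer.
inline std::uint32_t read_le32(const std::uint8_t* p) {
    return std::uint32_t(p[0]) | (std::uint32_t(p[1]) << 8) |
           (std::uint32_t(p[2]) << 16) | (std::uint32_t(p[3]) << 24);
}

// Size in bytes of the chunk message header for fmt 0, 1, 2, 3.
inline int message_header_size(int fmt) {
    static const int sizes[] = {11, 7, 3, 0};
    assert(fmt >= 0 && fmt <= 3);
    return sizes[fmt];
}

// A 3-byte timestamp field of 0xffffff signals a 4-byte extended timestamp.
inline bool needs_extended_timestamp(std::uint32_t timestamp_field) {
    return timestamp_field >= 0xffffff;
}

// Parse the 11 bytes that follow the basic header of a type-0 chunk.
inline MessageHeader parse_type0_header(const std::uint8_t* p) {
    MessageHeader h;
    h.timestamp = read_be24(p);
    h.payload_length = read_be24(p + 3);
    h.message_type = p[6];
    h.stream_id = read_le32(p + 7);
    return h;
}
```

Applied to the bytes 00 00 1a 00 00 9d 08 01 00 00 00 from the FMLE example quoted in the function's comments, this yields timestamp 26, payload length 157, message type 8 (audio) and stream id 1.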

/**
 * Chunks of Type 0 are 11 bytes long. This type MUST be used at the
 * start of a chunk stream, and whenever the stream timestamp goes
 * backward (e.g., because of a backward seek).
 */
#define RTMP_FMT_TYPE0              0
/**
 * Chunks of Type 1 are 7 bytes long. The message stream ID is not 
 * included; this chunk takes the same stream ID as the preceding chunk.
 * Streams with variable-sized messages (for example, many video
 * formats) SHOULD use this format for the first chunk of each new 
 * message after the first.
 */
#define RTMP_FMT_TYPE1              1

/**
 * parse the message header.
 *    3bytes: timestamp delta, fmt=0,1,2
 *    3bytes: payload length,  fmt=0,1
 *    1byte:  message type,    fmt=0,1
 *    4bytes: stream id,       fmt=0
 * where:
 *    fmt=0, 0x0X
 *    fmt=1, 0x4X
 *    fmt=2, 0x8X
 *    fmt=3, 0xCX
 * 
 * read the chunk message header(timestamp, payload_length, message_type, stream_id) 
 * from chunk stream and save to SrsChunkStream.
 */
int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt)
{
    int ret = ERROR_SUCCESS;
    
    /**
     * we should not assert anything about fmt, for the first packet.
     * (when first packet, the chunk->msg is NULL).
     * the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
     * the previous packet is:
     *     04                // fmt=0, cid=4
     *     00 00 1a          // timestamp=26
     *     00 00 9d          // payload_length=157
     *     08                // message_type=8(audio)
     *     01 00 00 00       // stream_id=1
     * the current packet maybe:
     *     c4                // fmt=3, cid=4
     * it's ok, for the packet is audio, and timestamp delta is 26.
     * the current packet must be parsed as:
     *     fmt=0, cid=4
     *     timestamp=26+26=52
     *     payload_length=157
     *     message_type=8(audio)
     *     stream_id=1
     * so we must update the timestamp even fmt=3 for first packet.
     */
     // fresh packet used to update the timestamp even fmt=3 for first packet.
     // fresh packet always means the chunk is the first one of message.
    bool is_first_chunk_of_msg = !chunk->msg;
    
    /* If the chunk stream is fresh (no message has been decoded on it yet), the fmt of the current chunk must be 0 */
    // but, we can ensure that when a chunk stream is fresh,
    // the fmt must be 0, a new stream.
    if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) {
        // for librtmp, if ping, it will send a fresh stream with fmt=1,
        // 0x42             where: fmt=1, cid=2, protocol control user-control message
        // 0x00 0x00 0x00   where: timestamp=0
        // 0x00 0x00 0x06   where: payload_length=6
        // 0x04             where: message_type=4(protocol control user-control message)
        // 0x00 0x06            where: event Ping(0x06)
        // 0x00 0x00 0x0d 0x0f  where: event data 4bytes ping timestamp.
        // @see: https://github.com/ossrs/srs/issues/98
        if (chunk->cid == RTMP_CID_ProtocolControl && fmt == RTMP_FMT_TYPE1) {
            srs_warn("accept cid=2, fmt=1 to make librtmp happy.");
        } else {
            // must be a RTMP protocol level error.
            ret = ERROR_RTMP_CHUNK_START;
            srs_error("chunk stream is fresh, fmt must be %d, "
                      "actual is %d. cid=%d, ret=%d", 
                      RTMP_FMT_TYPE0, fmt, chunk->cid, ret);
            return ret;
        }
    }
    
    // when exists cache msg, means got an partial message,
    // the fmt must not be type0 which means new message.
    if (chunk->msg && fmt == RTMP_FMT_TYPE0) {
        ret = ERROR_RTMP_CHUNK_START;
        srs_error("chunk stream exists, "
            "fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
        return ret;
    }
    
    // create msg when new chunk stream start
    if (!chunk->msg) {
        /* When this chunk starts a new message (no partial message is cached), construct a new SrsCommonMessage */
        chunk->msg = new SrsCommonMessage();
        srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
    }
    
    // read message header from socket to buffer.
    static char mh_sizes[] = {11, 7, 3, 0};
    int mh_size = mh_sizes[(int)fmt];
    srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
    
    if (mh_size > 0 && (ret = in_buffer->grow(skt, mh_size)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read %dbytes message header failed. ret=%d", mh_size, ret);
        }
        
        return ret;
    }
    
    /**
     * parse the message header.
     *   3bytes: timestamp delta,    fmt=0,1,2
     *   3bytes: payload length,     fmt=0,1
     *   1bytes: message type,       fmt=0,1
     *   4bytes: stream id,          fmt=0
     * where:
     *   fmt=0, 0x0X
     *   fmt=1, 0x4X
     *   fmt=2, 0x8X
     *   fmt=3, 0xCX
     */
    // see also: ngx_rtmp_recv
    if (fmt <= RTMP_FMT_TYPE2) {
        char* p = in_buffer->read_slice(mh_size);
        
        char* pp = (char*)&chunk->header.timestamp_delta;
        pp[2] = *p++;
        pp[1] = *p++;
        pp[0] = *p++;
        pp[3] = 0;
        
        // fmt: 0
        // timestamp: 3 bytes
        // IF the timestamp is greater than or equal to 16777215
        // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
        // 'extended timestamp header' MUST be present. Otherwise, this
        // value SHOULD be the entire timestamp.
        // 
        // fmt: 1 or 2
        // timestamp delta: 3 bytes
        // If the delta is greater than or equal to 16777215 (hexadecimal
        // 0x00ffffff), this value MUST be 16777215, and the 'extended
        // timestamp header' MUST be present. Otherwise, this value SHOULD be
        // the entire delta.
        chunk->extended_timestamp = (chunk->header.timestamp_delta >= 
                                     RTMP_EXTENDED_TIMESTAMP);
        if (!chunk->extended_timestamp) {
            /*
             * Extended timestamp: 0 or 4 bytes
             * This field MUST be sent when the normal timestamp is set to
             * 0xffffff, it MUST NOT be sent if the normal timestamp is set to
             * anything else. So for values less than 0xffffff the normal 
             * timestamp field SHOULD be used in which case the extended timestamp
             * MUST NOT be present. For values greater than or equal to 0xffffff
             * the normal timestamp field MUST NOT be used and MUST be set to
             * 0xffffff and the extended timestamp MUST be sent.
             */
            if (fmt == RTMP_FMT_TYPE0) {
                /*
                 * Type 0
                 * For a type-0 chunk, the absolute timestamp of the message is sent
                 * here.
                 */
                chunk->header.timestamp = chunk->header.timestamp_delta;
            } else {
                /*
                 * Type 1
                 * Type 2
                 * For a type-1 or type-2 chunk, the difference between the previous
                 * chunk's timestamp and the current chunk's timestamp is sent here.
                 */
                chunk->header.timestamp += chunk->header.timestamp_delta;
            }
        }
        
        if (fmt <= RTMP_FMT_TYPE1) {
            int32_t payload_length = 0;
            pp = (char*)&payload_length;
            pp[2] = *p++;
            pp[1] = *p++;
            pp[0] = *p++;
            pp[3] = 0;
            
            /* For a message that already exists in the chunk cache, the size must
             * not change. Always compare against the actual msg size: with fmt
             * type 1 (stream_id unchanged) the cached payload length may change
             * for a new message, but it must not change in continuation chunks.
             */
            if (!is_first_chunk_of_msg && chunk->header.payload_length != payload_length) {
                ret = ERROR_RTMP_PACKET_SIZE;
                srs_error("msg exists in chunk cache, "
                    "size=%d cannot change to %d, ret=%d", 
                    chunk->header.payload_length, payload_length, ret);
                return ret;
            }
            
            chunk->header.payload_length = payload_length;
            chunk->header.message_type = *p++;
            
            if (fmt == RTMP_FMT_TYPE0) {
                /* the message stream id is stored in little-endian order */
                pp = (char*)&chunk->header.stream_id;
                pp[0] = *p++;
                pp[1] = *p++;
                pp[2] = *p++;
                pp[3] = *p++;
                srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, "
                            "time=%"PRId64", payload=%d, type=%d, sid=%d", 
                    fmt, mh_size, chunk->extended_timestamp, chunk->header.
                    timestamp, chunk->header.payload_length, 
                    chunk->header.message_type, chunk->header.stream_id);
                    
            /* fmt = 1 */
            } else {
                srs_verbose("header read completed. fmt=%d, mh_size=%d, "
                            "ext_time=%d, time=%"PRId64", payload=%d, type=%d", 
                    fmt, mh_size, chunk->extended_timestamp, 
                    chunk->header.timestamp, chunk->header.payload_length, 
                    chunk->header.message_type);
            }
            
        /* fmt = 2 */
        } else {
            srs_verbose("header read completed. fmt=%d, "
                        "mh_size=%d, ext_time=%d, time=%"PRId64"", 
                fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
        }
        
    /* otherwise fmt = 3 */
    } else {
        // update the timestamp even if fmt=3, for the first chunk of a message
        if (is_first_chunk_of_msg && !chunk->extended_timestamp) {
            chunk->header.timestamp += chunk->header.timestamp_delta;
        }
        srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d", 
            fmt, mh_size, chunk->extended_timestamp);
    }
    
    // read extended-timestamp
    if (chunk->extended_timestamp) {
        mh_size += 4;
        srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", 
                    fmt, chunk->extended_timestamp, mh_size);
        if ((ret = in_buffer->grow(skt, 4)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read %dbytes message header failed. required_size=%d, ret=%d", 
                          mh_size, 4, ret);
            }
            return ret;
        }
        // the ptr to the slice maybe invalid when grow()
        // reset the p to get 4bytes slice.
        char* p = in_buffer->read_slice(4);
        
        u_int32_t timestamp = 0x00;
        char* pp = (char*)&timestamp;
        pp[3] = *p++;
        pp[2] = *p++;
        pp[1] = *p++;
        pp[0] = *p++;
        
        // always use 31bits timestamp, for some server may use 32bits extended timestamp.
        // @see https://github.com/ossrs/srs/issues/111
        timestamp &= 0x7fffffff;
        
        /**
         * RTMP specification and ffmpeg/librtmp is false,
         * but, adobe changed the specification, so flash/FMLE/FMS always true.
         * default to true to support flash/FMLE/FMS.
         *
         * ffmpeg/librtmp may not send this field, so the value must be detected.
         * @see also: http://blog.csdn.net/win_lin/article/details/13363699
         * compare to the chunk timestamp, which is set by chunk message header
         * type 0, 1 or 2.
         *
         * @remark, nginx send the extended-timestamp in sequence-header,
         * and timestamp delta in continue C1 chunks, and so compatible with ffmpeg,
         * that is, there is no continue chunks and extended-timestamp in nginx-rtmp.
         *
         * @remark, srs always sends the extended-timestamp, to keep it simple
         * and compatible with adobe products.
         */
        u_int32_t chunk_timestamp = (u_int32_t) chunk->header.timestamp;
        
        /**
         * if chunk_timestamp<=0, the chunk previous packet has no extended-timestamp,
         * always use the extended timestamp.
         */
        /**
         * about the is_first_chunk_of_msg.
         * @remark, for the first chunk of message, always use the extended timestamp.
         */
        if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) 
        {
            mh_size -= 4;
            in_buffer->skip(-4);
            srs_info("no 4bytes extended timestamp in the continued chunk");
        } else {
            chunk->header.timestamp = timestamp;
        }
        srs_verbose("header read ext_time completed. time=%"PRId64"", 
                    chunk->header.timestamp);
    }
    
    /**
     * the extended-timestamp must be unsigned-int,
     *     24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
     *     32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
     * because the rtmp protocol says the 32bits timestamp is about "50 days":
     *     3. Byte Order, Alignment, and Time Format
     *         Because timestamp are generally only 32 bits long, they will roll
     *         over after fewer than 50 days.
     *
     * but, its sample says the timestamp is 31bits:
     *     An application could assume, for example, that all
     *     adjacent timestamps are within 2^31 milliseconds of each other, so
     *     10000 comes after 4000000000, while 3000000000 comes before
     *     4000000000.
     * and flv specification says timestamp is 31bits:
     *     Extension of the Timestamp field to form a SI32 value. This
     *     field represents the upper 8 bits, while the previous
     *     Timestamp field represent the lower 24 bits of the time in
     *     milliseconds.
     * in a word, 31bits timestamp is ok.
     * convert extended timestamp to 31bits.
     */
    chunk->header.timestamp &= 0x7fffffff;
    
    /* valid message, the payload_length is 24bits,
     * so it should never be negative. */
    srs_assert(chunk->header.payload_length >= 0);
    
    /* copy header to msg */
    chunk->msg->header = chunk->header;
    
    /* increase the msg count, the chunk stream can accept fmt=1/2/3 message now. */
    chunk->msg_count++;
    
    return ret;
}
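
The byte shuffling in read_message_header can be hard to follow, so here is a minimal standalone sketch of the same field decoding. The helper names (read_be24, read_be32, read_le32, has_extended_timestamp, to_31bits) are hypothetical and are not part of SRS; they only illustrate the byte order and the 0xffffff threshold described in the comments above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: 3-byte big-endian integer, the layout used by the
// timestamp (or timestamp delta) and payload length header fields.
inline uint32_t read_be24(const uint8_t* p) {
    assert(p != nullptr);
    return (uint32_t(p[0]) << 16) | (uint32_t(p[1]) << 8) | uint32_t(p[2]);
}

// Hypothetical helper: 4-byte big-endian integer, the layout of the
// extended timestamp that may follow the message header.
inline uint32_t read_be32(const uint8_t* p) {
    assert(p != nullptr);
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Hypothetical helper: 4-byte little-endian integer, the layout of the
// message stream id in a fmt=0 header.
inline uint32_t read_le32(const uint8_t* p) {
    assert(p != nullptr);
    return uint32_t(p[0]) | (uint32_t(p[1]) << 8) |
           (uint32_t(p[2]) << 16) | (uint32_t(p[3]) << 24);
}

// A 24-bit field of 0xffffff or more signals that the 4-byte extended
// timestamp is present.
inline bool has_extended_timestamp(uint32_t field24) {
    return field24 >= 0xffffffu;
}

// Keep only 31 bits of the timestamp, as the code above does.
inline uint32_t to_31bits(uint32_t timestamp) {
    return timestamp & 0x7fffffffu;
}
```

Unlike the listing above, which writes bytes into the integer through a char pointer and therefore relies on a little-endian host, this sketch uses shifts and should behave the same on any host byte order.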

2.2.7 SrsProtocol::read_message_payload

Reads the payload data of a chunk.

int SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg)
{
    int ret = ERROR_SUCCESS;
    
    // empty message
    if (chunk->header.payload_length <= 0) {
        srs_trace("get an empty RTMP "
                "message(type=%d, size=%d, time=%"PRId64", sid=%d)", 
                chunk->header.message_type, chunk->header.payload_length, 
                chunk->header.timestamp, chunk->header.stream_id);
        
        *pmsg = chunk->msg;
        chunk->msg = NULL;
                
        return ret;
    }
    srs_assert(chunk->header.payload_length > 0);
    
    // the chunk payload size.
    int payload_size = chunk->header.payload_length - chunk->msg->size;
    payload_size = srs_min(payload_size, in_chunk_size);
    srs_verbose("chunk payload size is %d, message_size=%d, "
                "received_size=%d, in_chunk_size=%d", 
                payload_size, chunk->header.payload_length, 
                chunk->msg->size, in_chunk_size);
    
    // create msg payload if not initialized
    if (!chunk->msg->payload) {
        /* this call allocates chunk->header.payload_length bytes for the payload */
        chunk->msg->create_payload(chunk->header.payload_length);
    }
    
    // read payload to buffer
    if ((ret = in_buffer->grow(skt, payload_size)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read payload failed. required_size=%d, ret=%d", 
                       payload_size, ret);
        }
        return ret;
    }
    memcpy(chunk->msg->payload + chunk->msg->size, 
           in_buffer->read_slice(payload_size), payload_size);
    chunk->msg->size += payload_size;
    
    srs_verbose("chunk payload read completed. payload_size=%d", payload_size);
    
    // got entire RTMP message?
    if (chunk->header.payload_length == chunk->msg->size) {
        *pmsg = chunk->msg;
        chunk->msg = NULL;
        srs_verbose("get entire RTMP message(type=%d, size=%d, "
                    "time=%"PRId64", sid=%d)", 
                    chunk->header.message_type, chunk->header.payload_length, 
                    chunk->header.timestamp, chunk->header.stream_id);
    
        return ret;
    }
    
    srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", "
                "sid=%d), partial size=%d", 
            chunk->header.message_type, chunk->header.payload_length, 
            chunk->header.timestamp, chunk->header.stream_id,
            chunk->msg->size);
            
    return ret;
}
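
The reassembly rule used by read_message_payload (each chunk carries the remaining bytes of the message, capped by the incoming chunk size) can be sketched in isolation as follows. PartialMessage, next_chunk_payload_size and append_chunk are hypothetical names introduced for illustration; they are not SRS types.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a message being reassembled from chunks.
struct PartialMessage {
    std::size_t payload_length;  // total size announced in the message header
    std::vector<char> payload;   // bytes received so far
};

// Size of the next chunk payload: the bytes still missing, capped by the
// chunk size negotiated for the incoming direction.
inline std::size_t next_chunk_payload_size(const PartialMessage& m,
                                           std::size_t chunk_size) {
    assert(m.payload.size() <= m.payload_length);
    return std::min(m.payload_length - m.payload.size(), chunk_size);
}

// Append one chunk payload; returns true once the whole message has arrived.
inline bool append_chunk(PartialMessage& m, const char* data, std::size_t n) {
    assert(m.payload.size() + n <= m.payload_length);
    m.payload.insert(m.payload.end(), data, data + n);
    return m.payload.size() == m.payload_length;
}
```

With a chunk size of 128, a 300-byte message would arrive as payloads of 128, 128 and 44 bytes, and only the last call to append_chunk would report completion.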

2.2.8 SrsProtocol::on_recv_message

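Once a complete message has been received, the protocol first checks whether an Acknowledgement must be sent to the peer. In the code shown in 2.2.9, it is sent once the bytes received since the last acknowledgement reach at least half of the window size; otherwise nothing is sent. Then, if the message type is SetChunkSize (1), UserControlMessage (4), or WindowAcknowledgementSize (5), decode_message is called to decode the message and it is handled accordingly. Messages of any other type are returned directly without further processing.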

int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(msg != NULL);
    
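    /* Check whether the bytes received since the last acknowledgement (or since
     * startup) have reached the threshold derived from the window size. If so,
     * an Acknowledgement must be sent to the peer; otherwise nothing is sent. */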
    // try to response acknowledgement
    if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) {
        return ret;
    }
    
    SrsPacket* packet = NULL;
    switch (msg->header.message_type) {
        case RTMP_MSG_SetChunkSize:
        case RTMP_MSG_UserControlMessage:
        case RTMP_MSG_WindowAcknowledgementSize:
            if ((ret = decode_message(msg, &packet)) != ERROR_SUCCESS) {
                srs_error("decode packet from message payload failed. ret=%d", ret);
                return ret;
            }
            srs_verbose("decode packet from message payload success.");
            break;
        case RTMP_MSG_VideoMessage:
        case RTMP_MSG_AudioMessage:
            print_debug_info();
        default: 
            /* other messages, such as AMF0 Command messages, are returned as-is */
            return ret;
    }
    
    srs_assert(packet);
    
    // always free the packet.
    SrsAutoFree(SrsPacket, packet);
    
    switch (msg->header.message_type) {
        case RTMP_MSG_WindowAcknowledgementSize: {
            SrsSetWindowAckSizePacket* pkt = 
                    dynamic_cast<SrsSetWindowAckSizePacket*>(packet);
            srs_assert(pkt != NULL);
            
            if (pkt->ackowledgement_window_size > 0) {
                in_ack_size.window = (uint32_t)pkt->ackowledgement_window_size;
                // @remark, we ignore this message, for user noneed to care.
                // but it's important for dev, for client/server will block if required 
                // ack msg not arrived.
                srs_info("set ack window size to %d", pkt->ackowledgement_window_size);
            } else {
                srs_warn("ignored. set ack window size is %d", 
                         pkt->ackowledgement_window_size);
            }
            break;
        }
        case RTMP_MSG_SetChunkSize: {
            SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet);
            srs_assert(pkt != NULL);

            // for some server, the actual chunk size can greater than the max 
            // value(65536), so we just warning the invalid chunk size, and actually 
            // use it is ok.
            // @see: https://github.com/ossrs/srs/issues/160
            if (pkt->chunk_size < SRS_CONSTS_RTMP_MIN_CHUNK_SIZE 
                || pkt->chunk_size > SRS_CONSTS_RTMP_MAX_CHUNK_SIZE) 
            {
                srs_warn("accept chunk=%d, should in [%d, %d], please see #160",
                         pkt->chunk_size, SRS_CONSTS_RTMP_MIN_CHUNK_SIZE, 
                         SRS_CONSTS_RTMP_MAX_CHUNK_SIZE);
            }

            // @see: https://github.com/ossrs/srs/issues/541
            if (pkt->chunk_size < SRS_CONSTS_RTMP_MIN_CHUNK_SIZE) {
                ret = ERROR_RTMP_CHUNK_SIZE;
                srs_error("chunk size should be %d+, value=%d. ret=%d",
                    SRS_CONSTS_RTMP_MIN_CHUNK_SIZE, pkt->chunk_size, ret);
                return ret;
            }
            
            in_chunk_size = pkt->chunk_size;
            srs_info("in.chunk=%d", pkt->chunk_size);

            break;
        }
        case RTMP_MSG_UserControlMessage: {
            SrsUserControlPacket* pkt = dynamic_cast<SrsUserControlPacket*>(packet);
            srs_assert(pkt != NULL);
            
            if (pkt->event_type == SrcPCUCSetBufferLength) {
                in_buffer_length = pkt->extra_data;
                srs_info("buffer=%d, in.ack=%d, out.ack=%d, in.chunk=%d, out.chunk=%d", 
                         pkt->extra_data, in_ack_size.window, 
                         out_ack_size.window, in_chunk_size, out_chunk_size);
            }
            if (pkt->event_type == SrcPCUCPingRequest) {
                if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) {
                    return ret;
                }
            }
            break;
        }
        default:
            break;
    }
    
    return ret;
}

2.2.9 SrsProtocol::response_acknowledgement_message

Checks whether an Acknowledgement message should be sent to the peer.

int SrsProtocol::response_acknowledgement_message()
{
    int ret = ERROR_SUCCESS;
    
    /* if the window size is less than or equal to 0, that is, no window size
     * has been set yet, return directly */
    if (in_ack_size.window <= 0) {
        return ret;
    }
    
    // ignore when delta bytes not exceed half of window(ack size).
    uint32_t delta = (uint32_t)(skt->get_recv_bytes() - in_ack_size.nb_recv_bytes);
    if (delta < in_ack_size.window / 2) {
        return ret;
    }
    in_ack_size.nb_recv_bytes = skt->get_recv_bytes();
    
    // when the sequence number overflow, reset it.
    uint32_t sequence_number = in_ack_size.sequence_number + delta;
    if (sequence_number > 0xf0000000) {
        sequence_number = delta;
    }
    in_ack_size.sequence_number = sequence_number;
    
    SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();
    pkt->sequence_number = sequence_number;
    
    // cache the message and use flush to send.
    if (!auto_response_when_recv) {
        manual_response_queue.push_back(pkt);
        return ret;
    }
    
    // use underlayer api to send, donot flush again.
    if ((ret = do_send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
        srs_error("send acknowledgement failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("send acknowledgement success.");
    
    return ret;
}
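
The decision logic of response_acknowledgement_message can be separated from the socket and packet code. The sketch below mirrors the thresholds in the listing above (half of the window, sequence number reset above 0xf0000000); AckWindow and should_ack are hypothetical names, not SRS identifiers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the in_ack_size state used above.
struct AckWindow {
    uint32_t window = 0;           // 0 means the peer has not set a window yet
    int64_t nb_recv_bytes = 0;     // total bytes received when the last ack was sent
    uint32_t sequence_number = 0;  // value carried by the last ack
};

// Returns true and fills *seq when an Acknowledgement should be sent for the
// given total number of received bytes; otherwise leaves the state untouched.
inline bool should_ack(AckWindow& w, int64_t total_recv_bytes, uint32_t* seq) {
    assert(seq != nullptr);
    if (w.window == 0) {
        return false;
    }
    // Ignore until the bytes since the last ack reach half of the window.
    uint32_t delta = (uint32_t)(total_recv_bytes - w.nb_recv_bytes);
    if (delta < w.window / 2) {
        return false;
    }
    w.nb_recv_bytes = total_recv_bytes;
    // When the sequence number gets close to overflow, restart from delta.
    uint32_t next = w.sequence_number + delta;
    if (next > 0xf0000000u) {
        next = delta;
    }
    w.sequence_number = next;
    *seq = next;
    return true;
}
```

For a window of 1000 bytes, this sketch stays silent at 499 received bytes and acknowledges at 500; the next acknowledgement is then due once another 500 bytes or more have arrived.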

2.2.10 SrsProtocol::decode_message

In SrsProtocol::expect_message, after recv_message has received one complete RTMP message, decode_message is called next to decode that message.

/**
 * decode bytes oriented RTMP message to RTMP packet,
 * @param ppacket, output decoded packet,
 *      always NULL if error, never NULL if success.
 * @return error when unknown packet, error when decode failed.
 */
int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    *ppacket = NULL;
    
    int ret = ERROR_SUCCESS;
    
    srs_assert(msg != NULL);
    srs_assert(msg->payload != NULL);
    srs_assert(msg->size > 0);
    
    SrsStream stream;
    
    // initialize the decode stream for all message,
    // it's ok for the initialize if fast and without memory copy.
    if ((ret = stream.initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
        srs_error("initialize stream failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("decode stream initialized success");
    
    // decode the packet.
    SrsPacket* packet = NULL;
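    /* Based on the name of the message that was read, this function constructs the
     * matching packet class, such as SrsConnectAppResPacket, and then calls that
     * class's decode() to decode the message; the returned packet points to the
     * constructed object. */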
    if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
        srs_freep(packet);
        return ret;
    }
    
    // set to output ppacket only when success.
    *ppacket = packet;
    
    return ret;
}

2.2.11 SrsProtocol::do_decode_message

int SrsProtocol::do_decode_message(SrsMessageHeader& header, 
    SrsStream* stream, SrsPacket** ppacket)
{
    int ret = ERROR_SUCCESS;
    
    SrsPacket* packet = NULL;
    
    // decode specified packet type
    /* AMF command messages: type 20 for AMF0, type 17 for AMF3 */
    if (header.is_amf0_command() || header.is_amf3_command() || 
        header.is_amf0_data() || header.is_amf3_data()) {
        srs_verbose("start to decode AMF0/AMF3 command message.");
        
        // skip 1byte to decode the amf3 command.
        if (header.is_amf3_command() && stream->require(1)) {
            srs_verbose("skip 1bytes to decode AMF3 command");
            stream->skip(1);
        }
        
        // amf0 command message.
        // need to read the command name.
        std::string command;
        /* read the name of this command message */
        if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
            srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
        
        // result/error packet
        if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
            double transactionId = 0.0;
            if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
                srs_error("decode AMF0/AMF3 transactionId failed. ret=%d", ret);
                return ret;
            }
            srs_verbose("AMF0/AMF3 command id, transactionId=%.2f", transactionId);
            
            // reset stream, for header read completed.
            stream->skip(-1 * stream->pos());
            if (header.is_amf3_command()) {
                stream->skip(1);
            }
            
            // find the call name
            if (requests.find(transactionId) == requests.end()) {
                ret = ERROR_RTMP_NO_REQUEST;
                srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
                return ret;
            }
            
            std::string request_name = requests[transactionId];
            srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
            
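            if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsConnectAppResPacket();
                return packet->decode(stream);
            } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsCreateStreamResPacket(0, 0);
                return packet->decode(stream);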
            } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
                || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
                || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsFMLEStartResPacket(0);
                return packet->decode(stream);
            }  else {
                ret = ERROR_RTMP_NO_REQUEST;
                srs_error("decode AMF0/AMF3 request failed. "
                    "request_name=%s, transactionId=%.2f, ret=%d", 
                    request_name.c_str(), transactionId, ret);
                return ret;
            }
        }
        
        /* move the stream read pointer back to the start of the message payload */
        // reset to zero(amf3 to 1) to restart decode.
        stream->skip(-1 * stream->pos());
        if (header.is_amf3_command()) {
            stream->skip(1);
        }
        
        // decode command object.
        if (command == RTMP_AMF0_COMMAND_CONNECT) {
            srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
            /* the first message received from the client is normally connect,
             * so an SrsConnectAppPacket is constructed here */
            *ppacket = packet = new SrsConnectAppPacket();
            /* call SrsConnectAppPacket::decode to parse the payload of this message */
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
            srs_info("decode the AMF0/AMF3 command(createStream message).");
            *ppacket = packet = new SrsCreateStreamPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_PLAY) {
            srs_info("decode the AMF0/AMF3 command(play message).");
            *ppacket = packet = new SrsPlayPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_PAUSE) {
            srs_info("decode the AMF0/AMF3 command(pause message).");
            *ppacket = packet = new SrsPausePacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
            srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
            srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_PUBLISH) {
            srs_info("decode the AMF0/AMF3 command(publish message).");
            *ppacket = packet = new SrsPublishPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_UNPUBLISH) {
            srs_info("decode the AMF0/AMF3 command(unpublish message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if (command == SRS_CONSTS_RTMP_SET_DATAFRAME || 
                   command == SRS_CONSTS_RTMP_ON_METADATA) {
            srs_info("decode the AMF0/AMF3 data(onMetaData message).");
            *ppacket = packet = new SrsOnMetaDataPacket();
            return packet->decode(stream);
        } else if (command == SRS_BW_CHECK_FINISHED
            || command == SRS_BW_CHECK_PLAYING
            || command == SRS_BW_CHECK_PUBLISHING
            || command == SRS_BW_CHECK_STARTING_PLAY
            || command == SRS_BW_CHECK_STARTING_PUBLISH
            || command == SRS_BW_CHECK_START_PLAY
            || command == SRS_BW_CHECK_START_PUBLISH
            || command == SRS_BW_CHECK_STOPPED_PLAY
            || command == SRS_BW_CHECK_STOP_PLAY
            || command == SRS_BW_CHECK_STOP_PUBLISH
            || command == SRS_BW_CHECK_STOPPED_PUBLISH
            || command == SRS_BW_CHECK_FINAL)
        {
            srs_info("decode the AMF0/AMF3 band width check message.");
            *ppacket = packet = new SrsBandwidthPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
            srs_info("decode the AMF0/AMF3 closeStream message.");
            *ppacket = packet = new SrsCloseStreamPacket();
            return packet->decode(stream);
        } else if (header.is_amf0_command() || header.is_amf3_command()) {
            srs_info("decode the AMF0/AMF3 call message.");
            *ppacket = packet = new SrsCallPacket();
            return packet->decode(stream);
        }
        
        // default packet to drop message.
        srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
        *ppacket = packet = new SrsPacket();
        return ret;
    
    /* user control message: type 4 */
    } else if (header.is_user_control_message()) {
        srs_verbose("start to decode user control message.");
        *ppacket = packet = new SrsUserControlPacket();
        return packet->decode(stream);
    
    /* window acknowledgement size: type 5 */
    } else if(header.is_window_ackledgement_size()) {
        srs_verbose("start to decode set ack window size message.");
        *ppacket = packet = new SrsSetWindowAckSizePacket();
        return packet->decode(stream);
        
    /* set chunk size: type 1 */
    } else if(header.is_set_chunk_size()) {
        srs_verbose("start to decode set chunk size message.");
        *ppacket = packet = new SrsSetChunkSizePacket();
        return packet->decode(stream);
    } else {
        if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) {
            srs_trace("drop unknown message, type=%d", header.message_type);
        }
    }
    
    return ret;
}
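
The result/error branch above depends on a table that maps each transaction id to the name of the request that was sent with it, so that a later _result or _error can be decoded as the right response type. A minimal sketch of that bookkeeping, assuming a std::map keyed by the transaction id as the requests member appears to be, might look like this. PendingRequests and its methods are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical sketch of the transaction id -> request name table consulted
// when a _result or _error response arrives.
class PendingRequests {
public:
    // Called when a request is sent: remember which command used this id.
    void remember(double transaction_id, const std::string& command) {
        requests_[transaction_id] = command;
    }

    // Called when a response arrives: find the command it answers.
    // Returns false when no request with this id was recorded.
    bool lookup(double transaction_id, std::string& command) const {
        std::map<double, std::string>::const_iterator it =
            requests_.find(transaction_id);
        if (it == requests_.end()) {
            return false;
        }
        command = it->second;
        return true;
    }

private:
    std::map<double, std::string> requests_;
};
```

A response whose transaction id was never recorded cannot be matched to a packet class, which corresponds to the ERROR_RTMP_NO_REQUEST path in the listing above.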

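2.2.12 Constructing SrsConnectAppPacket

After the handshake succeeds, the client sends a connect command message to the server, specifying which application instance on the server it wants to connect to.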

#define RTMP_AMF0_COMMAND_CONNECT               "connect"

SrsConnectAppPacket::SrsConnectAppPacket()
{
    /* the name of this command, which is "connect" */
    command_name = RTMP_AMF0_COMMAND_CONNECT;
    /* for connect, this value is 1 */
    transaction_id = 1;
    /**
     * command_object is a pointer to SrsAmf0Object, described as follows:
     * Command information object which has the name-value pairs.
     * @remark: alloc in packet constructor, user can directly use it,
     *       user should never alloc it again which will cause memory leak.
     * @remark, never be NULL.
     */
    /* create an AMF0 empty object instance */
    command_object = SrsAmf0Any::object();
     /* Any optional information
      * @remark, optional, init to and maybe NULL. */ 
    args = NULL;
}

Note: for AMF0 details, see the article "SRS之AMF0的實現" (the AMF0 implementation in SRS).

2.2.13 SrsConnectAppPacket::decode

int SrsConnectAppPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    /* first read the name of this command message */
    if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode connect command_name failed. ret=%d", ret);
        return ret;
    }
    if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CONNECT) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode connect command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }
    
    /* for connect, the transactionId is 1 */
    if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("amf0 decode connect transaction_id failed. ret=%d", ret);
        return ret;
    }
    
    // some clients do not send id=1.0, so we only warn the user on a mismatch.
    if (transaction_id != 1.0) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_warn("amf0 decode connect transaction_id failed. "
                 "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret);
        ret = ERROR_SUCCESS;
    }
    
    /* read the properties contained in this object and store them as
     * key-value pairs in the properties member of command_object */
    if ((ret = command_object->read(stream)) != ERROR_SUCCESS) {
        srs_error("amf0 decode connect command_object failed. ret=%d", ret);
        return ret;
    }
    
    if (!stream->empty()) {
        srs_freep(args);
        
        // see: https://github.com/ossrs/srs/issues/186
        // the args maybe any amf0, for instance, a string. we should drop if not object.
        SrsAmf0Any* any = NULL;
        if ((ret = SrsAmf0Any::discovery(stream, &any)) != ERROR_SUCCESS) {
            srs_error("amf0 find connect args failed. ret=%d", ret);
            return ret;
        }
        srs_assert(any);
        
        // read the instance
        if ((ret = any->read(stream)) != ERROR_SUCCESS) {
            srs_error("amf0 decode connect args failed. ret=%d", ret);
            srs_freep(any);
            return ret;
        }
        
        // drop when not an AMF0 object.
        if (!any->is_object()) {
            srs_warn("drop the args, see: '4.1.1. connect', marker=%#x", any->marker);
            srs_freep(any);
        } else {
            args = any->to_object();
        }
    }
    
    /* reaching this point means the connect message was decoded successfully */
    srs_info("amf0 decode connect packet success");
    
    return ret;
}
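
The first two fields that SrsConnectAppPacket::decode reads are an AMF0 string (marker 0x02, a 2-byte big-endian length, then the bytes) and an AMF0 number (marker 0x00, then an 8-byte big-endian IEEE 754 double). The sketch below decodes just those two types from a raw buffer. Reader, amf0_read_string and amf0_read_number are hypothetical names, and this is not how srs_amf0_read_string or srs_amf0_read_number are implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical minimal read cursor over a message payload.
struct Reader {
    const uint8_t* data;
    std::size_t size;
    std::size_t pos;
};

// AMF0 string: marker 0x02, 2-byte big-endian length, then the bytes.
inline bool amf0_read_string(Reader& r, std::string& out) {
    assert(r.data != nullptr && r.pos <= r.size);
    if (r.size - r.pos < 3 || r.data[r.pos] != 0x02) {
        return false;
    }
    std::size_t len = (std::size_t(r.data[r.pos + 1]) << 8) |
                      std::size_t(r.data[r.pos + 2]);
    if (r.size - r.pos - 3 < len) {
        return false;
    }
    out.assign(reinterpret_cast<const char*>(r.data + r.pos + 3), len);
    r.pos += 3 + len;
    return true;
}

// AMF0 number: marker 0x00, then an 8-byte big-endian double.
// Assumes the host uses IEEE 754 doubles with the same byte order as its
// 64-bit integers, which holds on mainstream platforms.
inline bool amf0_read_number(Reader& r, double& out) {
    assert(r.data != nullptr && r.pos <= r.size);
    if (r.size - r.pos < 9 || r.data[r.pos] != 0x00) {
        return false;
    }
    uint64_t bits = 0;
    for (int i = 0; i < 8; ++i) {
        bits = (bits << 8) | r.data[r.pos + 1 + i];
    }
    std::memcpy(&out, &bits, sizeof(out));
    r.pos += 9;
    return true;
}
```

Fed the start of a connect payload, the two calls in sequence yield the command name and the transaction id; the command object that follows would need a full AMF0 object reader, which is outside the scope of this sketch.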

2.3 srs_discovery_tc_url

/**
 * parse the tcUrl, output the schema, host, vhost, app and port.
 * @param tcUrl, the input tcUrl, for example,
 *     rtmp://192.168.1.10:19350/live?vhost=vhost.ossrs.net
 * @param schema, for example, rtmp
 * @param host, for example, 192.168.1.10
 * @param vhost, for example, vhost.ossrs.net.
 *        vhost default to host, when user not set vhost in query of app.
 * @param app, for example, live
 * @param port, for example, 19350
 *       default to 1935 if not specified.
 * @param param, for example, ?vhost=vhost.ossrs.net
 */
void srs_discovery_tc_url(string tcUrl, string& schema, string& host, 
    string& vhost, string& app, string& port, std::string& param)
{
    size_t pos = std::string::npos;
    std::string url = tcUrl;
    
    if ((pos = url.find("://")) != std::string::npos) {
        schema = url.substr(0, pos);
        url = url.substr(schema.length() + 3);
        srs_info("discovery schema=%s", schema.c_str());
    }
    
    if ((pos = url.find("/")) != std::string::npos) {
        host = url.substr(0, pos);
        url = url.substr(host.length() + 1);
        srs_info("discovery host=%s", host.c_str());
    }
    
    port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    if ((pos = host.find(":")) != std::string::npos) {
        port = host.substr(pos + 1);
        host = host.substr(0, pos);
        srs_info("discovery host=%s, port=%s", host.c_str(), port.c_str());
    }
    
    app = url;
    vhost = host;
    srs_vhost_resolve(vhost, app, param);
}
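
The splitting steps of srs_discovery_tc_url (schema, then host, then optional port) can be tried outside SRS with a small standalone function. TcUrlParts and split_tc_url are hypothetical names, and the default port string "1935" is an assumption standing in for SRS_CONSTS_RTMP_DEFAULT_PORT, following the doc comment above. The vhost resolution step is left out here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical result type for the sketch.
struct TcUrlParts {
    std::string schema;
    std::string host;
    std::string port;
    std::string rest;  // app plus any query, before vhost resolution
};

inline TcUrlParts split_tc_url(const std::string& tc_url) {
    TcUrlParts out;
    out.port = "1935";  // assumed default, per the doc comment above
    std::string url = tc_url;

    // schema: everything before "://"
    std::size_t pos = url.find("://");
    if (pos != std::string::npos) {
        out.schema = url.substr(0, pos);
        url = url.substr(pos + 3);
    }

    // host (possibly with port): everything before the first "/"
    pos = url.find('/');
    if (pos != std::string::npos) {
        out.host = url.substr(0, pos);
        url = url.substr(pos + 1);
    }

    // optional ":port" suffix on the host
    pos = out.host.find(':');
    if (pos != std::string::npos) {
        out.port = out.host.substr(pos + 1);
        out.host = out.host.substr(0, pos);
    }

    out.rest = url;
    return out;
}
```

For the tcUrl from the doc comment, a check such as assert(split_tc_url("rtmp://192.168.1.10:19350/live?vhost=vhost.ossrs.net").port == "19350") should hold, with rest left as "live?vhost=vhost.ossrs.net" for the vhost step to process.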

2.3.1 srs_vhost_resolve

/**
 * resolve the vhost in query string
 * @param vhost, update the vhost if query contains the vhost.
 * @param app, may contains the vhost in query string format:
 *     app?vhost=request_vhost
 *     app...vhost...request_vhost
 * @param param, the query, for example, ?vhost=xxx
 */
void srs_vhost_resolve(string& vhost, string& app, string& param)
{
    // get original param
    size_t pos = 0;
    if ((pos = app.find("?")) != std::string::npos) {
        param = app.substr(pos);
    }
    
    // filter tcUrl
    app = srs_string_replace(app, ",", "?");
    app = srs_string_replace(app, "...", "?");
    app = srs_string_replace(app, "&&", "?");
    app = srs_string_replace(app, "=", "?");
    
    if ((pos = app.find("?")) != std::string::npos) {
        std::string query = app.substr(pos + 1);
        app = app.substr(0, pos);
        
        if ((pos = query.find("vhost?")) != std::string::npos) {
            query = query.substr(pos + 6);
            if (!query.empty()) {
                vhost = query;
            }
            if ((pos = vhost.find("?")) != std::string::npos) {
                vhost = vhost.substr(0, pos);
            }
        }
    }
    
    /* others */
}
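
The separator normalization in srs_vhost_resolve is easier to see with concrete inputs. The sketch below reimplements the same steps on top of a hypothetical replace_all helper, which stands in for srs_string_replace and may differ from it in edge cases; resolve_vhost is likewise a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: replace every occurrence of `from` with `to`.
inline std::string replace_all(std::string s, const std::string& from,
                               const std::string& to) {
    assert(!from.empty());
    std::size_t pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size();
    }
    return s;
}

// Sketch of the vhost resolution steps: keep the original query in param,
// normalize all separators to "?", then pick the value after "vhost?".
inline void resolve_vhost(std::string& vhost, std::string& app,
                          std::string& param) {
    std::size_t pos = app.find('?');
    if (pos != std::string::npos) {
        param = app.substr(pos);
    }

    const char* separators[] = {",", "...", "&&", "="};
    for (const char* sep : separators) {
        app = replace_all(app, sep, "?");
    }

    pos = app.find('?');
    if (pos == std::string::npos) {
        return;
    }
    std::string query = app.substr(pos + 1);
    app = app.substr(0, pos);

    pos = query.find("vhost?");
    if (pos == std::string::npos) {
        return;
    }
    query = query.substr(pos + 6);
    if (!query.empty()) {
        vhost = query;
    }
    pos = vhost.find('?');
    if (pos != std::string::npos) {
        vhost = vhost.substr(0, pos);
    }
}
```

Both "live?vhost=vhost.ossrs.net" and the dotted form "live...vhost...vhost.ossrs.net" reduce to app "live" with the vhost taken from the query, while a plain "live" leaves the vhost equal to the host.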

That concludes the walkthrough of connect_app.

