一、協議分層
RTMP包是以Message的結構封裝的,結構如下所示:
1)Message Type ID在1-7的消息用於協議控制,這些消息一般是RTMP協議自身管理要使用的消息,用戶一般情況下無需操作其中的數據。
Message Type ID為8,9的消息分別用於傳輸音頻和視頻數據。Message Type ID為15-20的消息用於發送AMF編碼的命令,負責用戶與服務器之間的交互,比如播放,暫停等等。
2)StreamID是音視頻流的唯一ID, 一路流如果既有音頻包又有視頻包,那么這路流音頻包的StreamID和他視頻包的StreamID相同。
一個Message大小不一,音頻視頻的Message往往差異較大,為了充分利用網絡,需要將一個大的Message中的Body部分拆分到一個或者多個Chunk中
Chunk結構:
在拆分到多個Chunk中的時候,第一個Chunk攜帶完整的Message Header信息
因為一個流當中可以傳輸多個Chunk,那么多個Chunk怎么標記同屬於一個Message的呢?
是通過Chunk Stream ID區分的,同一個Chunk Stream ID必然屬於同一個Message
SRS中的Chunk接收和拼接成Message的代碼可以證明:
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// chunk stream basic header.
char fmt = 0;
int cid = 0;
if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read basic header failed. ret=%d", ret);
}
return ret;
}
srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
// the cid must not negative.
srs_assert(cid >= 0);
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;
// use chunk stream cache to get the chunk info.
// @see https://github.com/ossrs/srs/issues/249
if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
// chunk stream cache hit.
srs_verbose("cs-cache hit, cid=%d", cid);
// already init, use it direclty
chunk = cs_cache[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
} else {
// chunk stream cache miss, use map.
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
// set the perfer cid of chunk,
// which will copy to the message received.
chunk->header.perfer_cid = cid;
srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
}
紅色的是以CID為key的一個Map,每次的第一個Chunk過來的時候,都緩存起來,下一個Chunk來了之后進行追加。
因為TCP的有序,所以同一個Message中不同的Chunk會先后抵達。