1. Overview
1.1 How the live stream works
OBS pushes a live stream to nginx. The nginx-rtmp ngx_rtmp_live_module forwards that stream to the application live,
and VLC then connects to live and plays the stream.
1.2 nginx.conf
# number of worker processes to spawn
worker_processes 1;
error_log stderr debug;
daemon off;
master_process off;
events {
worker_connections 1024;
}
rtmp {
server {
listen 1935; # RTMP listening port
chunk_size 4096; # chunk size used for data transfer
application live { # live-stream configuration
live on;
}
# OBS pushes the stream to this push application, which in turn publishes it to the live application
application push {
live on;
push rtmp://192.168.1.82:1935/live; # relay the stream to the live application above
}
}
}
1.3 OBS push settings
- Click "+", choose a media source, and confirm; then configure the media source as shown in the figure below:

- Click "Settings", select "Stream", and set the push URL as shown in the figure below; after confirming, streaming can begin:

1.4 Playing the live stream with VLC

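2. Source analysis: application push
We start by analyzing the whole flow of OBS pushing an RTMP stream to the nginx server.
2.1 Listening for connections
After nginx starts, it sleeps in epoll_wait inside the event-processing function (ngx_epoll_process_events when epoll is used), waiting for client connections: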
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
...
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
/* when nginx first runs, timer is -1, i.e. wait indefinitely for a client connection */
events = epoll_wait(ep, event_list, (int) nevents, timer);
...
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
/* the read event being monitored */
rev = c->read;
/* the event flags returned by epoll_wait */
revents = event_list[i].events;
...
/* the monitored descriptor is readable; on the listening socket this means a new connection has arrived */
if ((revents & EPOLLIN) && rev->active) {
...
rev->ready = 1;
/* if load balancing is enabled (events are being posted), first add the event to
* a deferred queue: ngx_posted_accept_events for accept events */
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
/* otherwise call the read event's handler directly; for a new connection
* this is ngx_event_accept */
rev->handler(rev);
}
}
...
}
return NGX_OK;
}
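The two masking lines in the loop above (`& 1` and `& ~1`) recover a one-bit flag that was packed into the low bit of the connection pointer stored in the epoll data. The following is a minimal sketch of that pointer-tagging idea only; `conn_t`, `tag_ptr`, `ptr_instance`, and `ptr_conn` are hypothetical names for illustration and are not part of nginx.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a connection object. Any type with an
 * alignment of at least 2 leaves the lowest pointer bit free. */
typedef struct {
    int fd;
} conn_t;

/* Pack a one-bit flag into the lowest bit of an aligned pointer. */
static void *
tag_ptr(conn_t *c, unsigned instance)
{
    return (void *) ((uintptr_t) c | (uintptr_t) (instance & 1));
}

/* Read the flag back: mirrors "(uintptr_t) c & 1" in the listing. */
static unsigned
ptr_instance(void *p)
{
    return (unsigned) ((uintptr_t) p & 1);
}

/* Clear the flag to get the real pointer: mirrors "& (uintptr_t) ~1". */
static conn_t *
ptr_conn(void *p)
{
    return (conn_t *) ((uintptr_t) p & ~(uintptr_t) 1);
}
```

Storing the flag in the pointer avoids a second field in the per-event data, at the cost of requiring the masking step before every dereference.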
ngx_event_accept mainly accepts the client connection and then calls the handler registered for that listening port:
void
ngx_event_accept(ngx_event_t *ev)
{
...
do {
...
s = accept(lc->fd, &sa.sockaddr, &socklen);
...
/* call the handler for this listening port; for the rtmp module it is always ngx_rtmp_init_connection */
ls->handler(c);
...
} while (ev->available);
}
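ngx_rtmp_init_connection performs a series of initialization steps and then starts the RTMP handshake with the client.
From the handshake up to the first RTMP packet received after it succeeds, the steps are illustrated with figures only, without further source analysis.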
2.2 handshake
2.2.1 hs_stage: SERVER_RECV_CHALLENGE(1)
In this handshake stage the server waits to receive C0 and C1 from the client.
receive: Handshake C0+C1, Figure (1)

After receiving C0 and C1 from the client, the server enters the NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2) stage,
in which it sends S0 and S1.
2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) and SERVER_SEND_RESPONSE(3)
The SERVER_SEND_CHALLENGE stage is where the server sends S0 and S1 to the client. In practice, right after sending S0 and
S1 the server enters SERVER_SEND_RESPONSE(3) and immediately sends S2, so the capture looks like this:
send: Handshake S0+S1+S2, Figure (2)

2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)
In this stage the server waits to receive C2 from the client.
receive: Handshake C2, Figure (3)

At this point the RTMP handshake between server and client is complete, and normal message exchange begins.
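As a compact recap of the server-side stages above, the sketch below maps each stage to the number of bytes moved in it. The sizes follow the RTMP handshake layout (a 1-byte C0/S0 version plus 1536-byte C1/S1/C2/S2 packets), which also matches the 1537 and 1536 byte counts visible in the debug log later in this article. The enum names are shortened for illustration and `hs_stage_bytes` is a hypothetical helper, not an nginx-rtmp function.

```c
#include <assert.h>
#include <stddef.h>

/* Stage numbers follow the hs_stage values quoted in the text;
 * the names are shortened for this sketch. */
enum hs_stage {
    SERVER_RECV_CHALLENGE = 1,   /* wait for C0 + C1 */
    SERVER_SEND_CHALLENGE = 2,   /* send S0 + S1 */
    SERVER_SEND_RESPONSE  = 3,   /* send S2 */
    SERVER_RECV_RESPONSE  = 4    /* wait for C2 */
};

#define HS_VERSION_SIZE 1        /* C0 / S0: protocol version byte */
#define HS_PACKET_SIZE  1536     /* C1, S1, C2, S2 */

/* Number of bytes received or sent in a given server-side stage;
 * returns 0 for a stage this sketch does not know. */
static size_t
hs_stage_bytes(enum hs_stage stage)
{
    switch (stage) {
    case SERVER_RECV_CHALLENGE:
    case SERVER_SEND_CHALLENGE:
        return HS_VERSION_SIZE + HS_PACKET_SIZE;
    case SERVER_SEND_RESPONSE:
    case SERVER_RECV_RESPONSE:
        return HS_PACKET_SIZE;
    }
    return 0;
}
```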
As the code below shows, once C2 is received the server enters ngx_rtmp_cycle, the loop that processes client requests:
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
ngx_rtmp_free_handshake_buffers(s);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: done");
if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
NULL, NULL) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
ngx_rtmp_cycle(s);
}
ngx_rtmp_cycle resets the read and write event handlers of the current RTMP connection; when data from the client is detected,
ngx_rtmp_recv is called to process it.
void
ngx_rtmp_cycle(ngx_rtmp_session_t *s)
{
ngx_connection_t *c;
c = s->connection;
c->read->handler = ngx_rtmp_recv;
c->write->handler = ngx_rtmp_send;
s->ping_evt.data = c;
s->ping_evt.log = c->log;
s->ping_evt.handler = ngx_rtmp_ping;
ngx_rtmp_reset_ping(s);
ngx_rtmp_recv(c->read);
}
ngx_rtmp_recv loops receiving RTMP packet data from the client. Once a complete RTMP message has been received, it calls the handler
that matches the RTMP message type. For example, type 20 is an AMF0 command message, which is handled by
ngx_rtmp_amf_message_handler.
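The dispatch-by-message-type step described above can be pictured as a table of function pointers indexed by the type id. This is only a simplified sketch under that assumption: the table, `register_handlers`, `dispatch_message`, and `amf_command_handler` are hypothetical names, and the real module keeps richer per-type handler lists.

```c
#include <assert.h>
#include <stddef.h>

#define MSG_AUDIO     8      /* RTMP audio message */
#define MSG_AMF0_CMD  20     /* RTMP AMF0 command message */
#define MSG_TYPE_MAX  32     /* table size chosen for this sketch */

typedef int (*msg_handler_pt)(const char *payload);

static msg_handler_pt handlers[MSG_TYPE_MAX];
static int amf_calls;        /* counts invocations, for demonstration */

/* Hypothetical handler standing in for the AMF command parser. */
static int
amf_command_handler(const char *payload)
{
    (void) payload;
    amf_calls++;
    return 0;
}

static void
register_handlers(void)
{
    handlers[MSG_AMF0_CMD] = amf_command_handler;
}

/* Returns 1 if a handler ran, 0 if the type has no handler
 * (the message is ignored), -1 if the handler failed. */
static int
dispatch_message(unsigned type, const char *payload)
{
    if (type >= MSG_TYPE_MAX || handlers[type] == NULL) {
        return 0;
    }
    return handlers[type](payload) == 0 ? 1 : -1;
}
```

A message whose type has no registered handler simply falls through, which is the same outcome the text later describes for the releaseStream command at the command level.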
2.3 connect('push')
After the handshake succeeds, the first RTMP packet received from the client is a connect to the application push{}
block under rtmp{} in nginx.conf, as shown below:
receive: connect('push'), Figure (4)

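The figure shows that the message type is 20, i.e. an AMF0 Command, so ngx_rtmp_amf_message_handler is called to parse the message,
and the preset callback ngx_rtmp_cmd_connect_init is invoked for its connect command. ngx_rtmp_cmd_connect_init
parses the rest of the connect message and then walks the connect function chain built around ngx_rtmp_connect. That chain holds the
operations each rtmp module wants to perform on connect (note: only some rtmp modules register a callback for connect, and even
when a callback is registered, the module must be enabled in the configuration file before its connect handling really runs). For the connect
command in this setup, therefore, the only handler that does real work is ngx_rtmp_cmd_connect, registered by ngx_rtmp_cmd_module.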
2.3.1 ngx_rtmp_cmd_connect
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"rtmp cmd: connect");
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_core_app_conf_t **cacfp;
ngx_uint_t n;
ngx_rtmp_header_t h;
u_char *p;
static double trans;
static double capabilities = NGX_RTMP_CAPABILITIES;
static double object_encoding = 0;
/* the following is the AMF response the server will return for the client's connect command */
static ngx_rtmp_amf_elt_t out_obj[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("fmsVer"),
NGX_RTMP_FMS_VERSION, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("capabilities"),
&capabilities, 0 },
};
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetConnection.Connect.Success", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Connection succeeded.", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("objectEncoding"),
&object_encoding, 0 }
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_obj, sizeof(out_obj) },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
if (s->connected) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: duplicate connection");
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
trans = v->trans;
/* fill session parameters */
s->connected = 1;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
#define NGX_RTMP_SET_STRPAR(name) \
s->name.len = ngx_strlen(v->name); \
s->name.data = ngx_palloc(s->connection->pool, s->name.len); \
ngx_memcpy(s->name.data, v->name, s->name.len)
NGX_RTMP_SET_STRPAR(app);
NGX_RTMP_SET_STRPAR(args);
NGX_RTMP_SET_STRPAR(flashver);
NGX_RTMP_SET_STRPAR(swf_url);
NGX_RTMP_SET_STRPAR(tc_url);
NGX_RTMP_SET_STRPAR(page_url);
#undef NGX_RTMP_SET_STRPAR
p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
if (p) {
s->app.len = (p - s->app.data);
}
s->acodecs = (uint32_t) v->acodecs;
s->vcodecs = (uint32_t) v->vcodecs;
/* look up the application configuration the client is connecting to */
/* find application & set app_conf */
cacfp = cscf->applications.elts;
for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
if ((*cacfp)->name.len == s->app.len &&
ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
{
/* found app! */
s->app_conf = (*cacfp)->app_conf;
break;
}
}
if (s->app_conf == NULL) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: application not found: '%V'", &s->app);
return NGX_ERROR;
}
object_encoding = v->object_encoding;
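/* Send the Window Acknowledgement Size (ack_size) to the client. This message tells
* the peer the acknowledgement window size: after sending a window's worth of data,
* the sender waits for an acknowledgement from the peer (and stops sending until it
* arrives). The receiver must send an acknowledgement each time it has received a
* window's worth of data since the session started or since its last acknowledgement */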
return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
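/* Send Set Peer Bandwidth. This message tells the peer its output bandwidth limit; the
* receiver uses it to limit its own output bandwidth, i.e. the amount of sent but
* unacknowledged data. If the window size differs from the last one it sent, the
* receiver should reply with a Window Acknowledgement Size message */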
ngx_rtmp_send_bandwidth(s, cscf->ack_window,
NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
/* Send Set Chunk Size, which notifies the peer of the new maximum chunk size. */
ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
!= NGX_OK ? NGX_ERROR : NGX_OK;
}
send: ack_size, Figure (5)

send: peer bandwidth, Figure (6)

send: chunk_size, Figure (7)

send: _result('NetConnection.Connect.Success'), Figure (8)
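The application lookup inside ngx_rtmp_cmd_connect does two things: it cuts the requested app name at the first '?' (dropping any query arguments), then compares it by length and content against each configured application. The sketch below illustrates just that matching rule with plain C strings; `find_app` and its array-of-names input are hypothetical simplifications of the module's ngx_str_t based structures.

```c
#include <assert.h>
#include <string.h>

/* Return the index of the configured application that matches the
 * requested name, ignoring everything from the first '?' onward,
 * or -1 when no application matches. */
static int
find_app(const char *const *apps, size_t napps, const char *requested)
{
    size_t n;
    size_t len = strcspn(requested, "?");   /* length up to '?' or end */

    for (n = 0; n < napps; n++) {
        /* both length and bytes must match, so "pus" != "push" */
        if (strlen(apps[n]) == len
            && strncmp(apps[n], requested, len) == 0)
        {
            return (int) n;
        }
    }

    return -1;
}
```

With the configuration from section 1.2, a connect to "push" (or "push?key=1") would select the push application, while an unknown name produces the "application not found" error path.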

2.4 releaseStream('test')
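After the server responds to the connect command, the client sends a releaseStream command message. No rtmp module in
nginx-rtmp registers a callback for this command, so it is not processed and the server simply waits for the next message.
receive: releaseStream('test'), Figure (9)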

2.5 createStream('')
Next the server receives a createStream command message from the client.
receive: createStream(''), Figure (10)

As in the earlier analysis, this results in a call to ngx_rtmp_cmd_create_stream_init.
2.5.1 ngx_rtmp_cmd_create_stream_init
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_create_stream_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, sizeof(v.trans) },
};
/* parse the createStream command message to obtain v.trans; Figure (10) shows it is 4 */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");
return ngx_rtmp_create_stream(s, &v);
}
This function then walks the function chain built around ngx_rtmp_create_stream. The function reached here is
ngx_rtmp_cmd_create_stream.
2.5.2 ngx_rtmp_cmd_create_stream
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");
/* support one message stream per connection */
static double stream;
static double trans;
ngx_rtmp_header_t h;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&stream, sizeof(stream) },
};
trans = v->trans;
stream = NGX_RTMP_MSID;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
NGX_DONE : NGX_ERROR;
}
This function mainly sends the server's response to createStream.
send: _result()

2.6 publish('test')
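Next the client sends publish to the server to publish a named stream. Other clients can use this stream name to play the stream and
receive the published audio, video, and other data messages.
receive: publish('test'), Figure (11)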

The figure shows that the publish type is 'live', meaning the server does not save the published stream to a file.
2.6.1 ngx_rtmp_cmd_publish_init
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_publish_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.name, sizeof(v.name) },
{ NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.type, sizeof(v.type) },
};
ngx_memzero(&v, sizeof(v));
/* extract the values listed in in_elts from the publish command message */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_rtmp_cmd_fill_args(v.name, v.args);
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"publish: name='%s' args='%s' type=%s silent=%d",
v.name, v.args, v.type, v.silent);
return ngx_rtmp_publish(s, &v);
}
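This function then walks the function chain built around ngx_rtmp_publish. From the nginx-rtmp source and the nginx.conf above, the two
functions that matter are ngx_rtmp_relay_publish and ngx_rtmp_live_publish.
Because of the ordering of the rtmp modules, ngx_rtmp_relay_publish is called first.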
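The "function chain" used for publish can be sketched as follows: each module saves the current head of the chain in its own next pointer and installs its handler as the new head, so the module installed last runs first and hands over through its next pointer. All names here (`publish_chain`, `relay_next`, `live_next`, `install_modules`, `trace`) are hypothetical; the sketch only mirrors the `next_publish(s, v)` calls visible in the listings that follow.

```c
#include <assert.h>
#include <string.h>

typedef int (*publish_pt)(const char *name);

static char trace[64];               /* records the call order */

/* Terminal handler at the end of the chain. */
static int
core_publish(const char *name)
{
    (void) name;
    strcat(trace, "core;");
    return 0;
}

static publish_pt publish_chain = core_publish;   /* head of the chain */

static publish_pt live_next;
static int
live_publish(const char *name)
{
    strcat(trace, "live;");
    return live_next(name);          /* continue down the chain */
}

static publish_pt relay_next;
static int
relay_publish(const char *name)
{
    strcat(trace, "relay;");
    return relay_next(name);
}

/* Each module remembers the previous head, then becomes the head. */
static void
install_modules(void)
{
    live_next = publish_chain;
    publish_chain = live_publish;

    relay_next = publish_chain;
    publish_chain = relay_publish;
}
```

Calling `publish_chain("test")` after `install_modules()` visits the relay handler, then the live handler, then the terminal handler, matching the order described in the text.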
2.6.2 ngx_rtmp_relay_publish
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_target_t *target, **t;
ngx_str_t name;
size_t n;
ngx_rtmp_relay_ctx_t *ctx;
if (s->auto_pushed) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && s->relay) {
goto next;
}
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL || racf->pushes.nelts == 0) {
goto next;
}
/* v->name holds the name of the stream to publish, extracted from the client's publish command message */
name.len = ngx_strlen(v->name);
name.data = v->name;
/* take the first element of the pushes array and iterate over the array */
t = racf->pushes.elts;
for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
target = *t;
/* if the configuration file names a specific stream to push, check whether that name matches
* the stream name in the publish just received */
if (target->name.len && (name.len != target->name.len ||
ngx_memcmp(name.data, target->name.data, name.len)))
{
continue;
}
if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
continue;
}
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"relay: push failed name='%V' app='%V' "
"playpath='%V' url='%V'",
&name, &target->app, &target->play_path,
&target->url.url);
if (!ctx->push_evt.timer_set) {
ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
}
}
next:
return next_publish(s, v);
}
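The filter at the top of the loop in ngx_rtmp_relay_publish decides whether a push target applies to the stream being published: a target with no name applies to every stream, and a named target applies only when lengths and bytes both match. A minimal sketch of that rule, using plain C strings and a hypothetical helper name `target_matches`:

```c
#include <assert.h>
#include <string.h>

/* Returns 1 when the push target should relay this stream.
 * An empty target name means "relay every stream". */
static int
target_matches(const char *target_name, const char *stream)
{
    size_t tlen = strlen(target_name);
    size_t slen = strlen(stream);

    if (tlen == 0) {
        return 1;
    }

    return tlen == slen && memcmp(target_name, stream, slen) == 0;
}
```

The push directive in section 1.2 sets no stream name, so under this rule it would apply to the stream 'test' published by OBS.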
2.6.3 ngx_rtmp_relay_push
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"relay: create push name='%V' app='%V' playpath='%V' url='%V'",
name, &target->app, &target->play_path, &target->url.url);
return ngx_rtmp_relay_create(s, name, target,
ngx_rtmp_relay_create_local_ctx,
ngx_rtmp_relay_create_remote_ctx);
}
2.6.4 ngx_rtmp_relay_create
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target,
ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx;
ngx_uint_t hash;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL) {
return NGX_ERROR;
}
/* this call creates a new connection to the address given in the push url, i.e. it treats that
* address as the upstream server and initiates a connection to it */
play_ctx = create_play_ctx(s, name, target);
if (play_ctx == NULL) {
return NGX_ERROR;
}
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data,
name->len))
{
break;
}
}
if (*cctx) {
play_ctx->publish = (*cctx)->publish;
play_ctx->next = (*cctx)->play;
(*cctx)->play = play_ctx;
return NGX_OK;
}
/* create a local ngx_rtmp_relay_ctx_t */
publish_ctx = create_publish_ctx(s, name, target);
if (publish_ctx == NULL) {
ngx_rtmp_finalize_session(play_ctx->session);
return NGX_ERROR;
}
publish_ctx->publish = publish_ctx;
publish_ctx->play = play_ctx;
play_ctx->publish = publish_ctx;
*cctx = publish_ctx;
return NGX_OK;
}
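ngx_rtmp_relay_create locates the context for a stream name by hashing the name into a bucket and then walking that bucket's chain with a pointer-to-pointer, so the same variable serves both as "found entry" and as "place to insert". The sketch below shows that idiom in isolation. `ctx_t`, `NBUCKETS`, `hash_key`, and `find_slot` are hypothetical, and the multiply-by-31 hash is an assumption made for the example rather than a statement about ngx_hash_key.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct ctx_s ctx_t;

struct ctx_s {
    const char *name;
    ctx_t      *next;     /* next entry in the same bucket */
};

#define NBUCKETS 8

static ctx_t *buckets[NBUCKETS];

/* Simple string hash, assumed for this sketch. */
static unsigned
hash_key(const char *s)
{
    unsigned h = 0;

    while (*s) {
        h = h * 31 + (unsigned char) *s++;
    }
    return h;
}

/* Returns the slot that holds the entry named "name", or the empty
 * slot at the end of the bucket chain where it could be linked in. */
static ctx_t **
find_slot(const char *name)
{
    ctx_t **pp = &buckets[hash_key(name) % NBUCKETS];

    for (; *pp; pp = &(*pp)->next) {
        if (strcmp((*pp)->name, name) == 0) {
            break;
        }
    }
    return pp;
}
```

If `*find_slot(name)` is NULL the stream is new and a context can be stored through the returned slot; otherwise the existing entry is reused, which corresponds to the `if (*cctx)` branch in the listing.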
2.6.4.1 ngx_rtmp_relay_create_remote_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_conf_ctx_t cctx;
cctx.app_conf = s->app_conf;
cctx.srv_conf = s->srv_conf;
cctx.main_conf = s->main_conf;
return ngx_rtmp_relay_create_connection(&cctx, name, target);
}
2.6.4.2 ngx_rtmp_relay_create_connection
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *rctx;
ngx_rtmp_addr_conf_t *addr_conf;
ngx_rtmp_conf_ctx_t *addr_ctx;
ngx_rtmp_session_t *rs;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
ngx_addr_t *addr;
ngx_pool_t *pool;
ngx_int_t rc;
ngx_str_t v, *uri;
u_char *first, *last, *p;
racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: create remote context");
pool = NULL;
/* create a memory pool */
pool = ngx_create_pool(4096, racf->log);
if (pool == NULL) {
return NULL;
}
/* allocate the ngx_rtmp_relay_ctx_t structure from the pool */
rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
if (rctx == NULL) {
goto clear;
}
/* copy the published stream name into the name member of the new ngx_rtmp_relay_ctx_t */
if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
goto clear;
}
/* copy the push address configured in the configuration file, i.e. the url, into the
* url member of the new ngx_rtmp_relay_ctx_t */
if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
goto clear;
}
/* target->tag points to the ngx_rtmp_relay_module structure */
rctx->tag = target->tag;
/* target->data points to the ngx_rtmp_relay_ctx_t structure that this data belongs to */
rctx->data = target->data;
#define NGX_RTMP_RELAY_STR_COPY(to, from) \
if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { \
goto clear; \
}
/* copy the following target values into the matching members of the new ngx_rtmp_relay_ctx_t */
NGX_RTMP_RELAY_STR_COPY(app, app);
NGX_RTMP_RELAY_STR_COPY(tc_url, tc_url);
NGX_RTMP_RELAY_STR_COPY(page_url, page_url);
NGX_RTMP_RELAY_STR_COPY(swf_url, swf_url);
NGX_RTMP_RELAY_STR_COPY(flash_ver, flash_ver);
NGX_RTMP_RELAY_STR_COPY(play_path, play_path);
rctx->live = target->live;
rctx->start = target->start;
rctx->stop = target->stop;
#undef NGX_RTMP_RELAY_STR_COPY
/* if app (or play_path) is not yet known */
if (rctx->app.len == 0 || rctx->play_path.len == 0) {
/* extract app from the push address; taking "push rtmp://192.168.1.82:1935/live;"
* as the example, the extracted "live" is assigned to rctx->app */
/* parse uri */
uri = &target->url.uri;
first = uri->data;
last = uri->data + uri->len;
if (first != last && *first == '/') {
++first;
}
if (first != last) {
/* deduce app */
p = ngx_strlchr(first, last, '/');
if (p == NULL) {
p = last;
}
if (rctx->app.len == 0 && first != p) {
/* here v.data points to "live" */
v.data = first;
v.len = p - first;
/* assign "live" to rctx->app */
if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
goto clear;
}
}
/* deduce play_path */
if (p != last) {
++p;
}
/* if the play path is empty and p is not equal to last (p != last would mean that
* more data follows the "live" string in "push rtmp://192.168.1.82:1935/live;",
* which is not the case here) */
if (rctx->play_path.len == 0 && p != last) {
v.data = p;
v.len = last - p;
if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
!= NGX_OK)
{
goto clear;
}
}
}
}
/* allocate the active-connection structure ngx_peer_connection_t from the pool */
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
if (pc == NULL) {
goto clear;
}
if (target->url.naddrs == 0) {
ngx_log_error(NGX_LOG_ERR, racf->log, 0,
"relay: no address");
goto clear;
}
/* get address */
/* get the server address named in the push url (the push destination), e.g.
* "192.168.1.82:1935" in "push rtmp://192.168.1.82:1935/live;" */
addr = &target->url.addrs[target->counter % target->url.naddrs];
target->counter++;
/* copy log to keep shared log unchanged */
rctx->log = *racf->log;
pc->log = &rctx->log;
/* when keepalive connections to the upstream are used, this method obtains a connection from the pool */
pc->get = ngx_rtmp_relay_get_peer;
/* when keepalive connections to the upstream are used, this method releases a finished connection back to the pool */
pc->free = ngx_rtmp_relay_free_peer;
/* name of the remote server; here it is simply the string "192.168.1.82:1935" */
pc->name = &addr->name;
pc->socklen = addr->socklen;
pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
if (pc->sockaddr == NULL) {
goto clear;
}
/* copy the remote server address stored in addr->sockaddr into pc->sockaddr */
ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);
/* start connecting to the upstream server */
rc = ngx_event_connect_peer(pc);
/* as the ngx_event_connect_peer source shows, the socket is set to non-blocking, so the
* first connect() normally cannot complete at once (EINPROGRESS) and the function returns NGX_AGAIN */
if (rc != NGX_OK && rc != NGX_AGAIN ) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: connection failed");
goto clear;
}
c = pc->connection;
c->pool = pool;
/* the push URL */
c->addr_text = rctx->url;
addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
if (addr_conf == NULL) {
goto clear;
}
addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
if (addr_ctx == NULL) {
goto clear;
}
addr_conf->ctx = addr_ctx;
addr_ctx->main_conf = cctx->main_conf;
addr_ctx->srv_conf = cctx->srv_conf;
ngx_str_set(&addr_conf->addr_text, "ngx-relay");
/* initialize a session for this active connection */
rs = ngx_rtmp_init_session(c, addr_conf);
if (rs == NULL) {
/* no need to destroy pool */
return NULL;
}
rs->app_conf = cctx->app_conf;
/* set this flag to 1 */
rs->relay = 1;
rctx->session = rs;
ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
ngx_str_set(&rs->flashver, "ngx-local-relay");
#if (NGX_STAT_STUB)
(void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif
/* now acting as a client, start sending the handshake packets, i.e. C0 + C1, to the upstream server */
ngx_rtmp_client_handshake(rs, 1);
return rctx;
clear:
if (pool) {
ngx_destroy_pool(pool);
}
return NULL;
}
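The "parse uri" block in ngx_rtmp_relay_create_connection splits the path part of the push URL into an app and an optional play path. The sketch below reproduces that splitting on a plain C string so the two example cases are easy to follow. `str_t` and `split_uri` are hypothetical names for this illustration, and unlike the original the sketch always overwrites both outputs instead of only filling in values that are still empty.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Length-delimited string view, loosely modelled on ngx_str_t. */
typedef struct {
    const char *data;
    size_t      len;
} str_t;

/* Split "/app/play_path" into its two parts. Either part is left
 * empty (len == 0) when it is absent from the uri. */
static void
split_uri(const char *uri, str_t *app, str_t *play_path)
{
    const char *first = uri;
    const char *last = uri + strlen(uri);
    const char *p;

    app->data = play_path->data = NULL;
    app->len = play_path->len = 0;

    if (first != last && *first == '/') {
        ++first;                          /* skip the leading slash */
    }
    if (first == last) {
        return;                           /* nothing after the slash */
    }

    /* app runs up to the next '/' or to the end of the uri */
    p = memchr(first, '/', (size_t) (last - first));
    if (p == NULL) {
        p = last;
    }
    if (first != p) {
        app->data = first;
        app->len = (size_t) (p - first);
    }

    /* whatever follows that '/' is the play path */
    if (p != last) {
        ++p;
    }
    if (p != last) {
        play_path->data = p;
        play_path->len = (size_t) (last - p);
    }
}
```

For the configuration in this article the uri is "/live", so app becomes "live" and the play path stays empty; a uri such as "/live/test" would additionally yield the play path "test".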
2.6.4.3 ngx_event_connect_peer
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
int rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
in_port_t port;
#endif
ngx_int_t event;
ngx_err_t err;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
/* this get method does no real work here */
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}
type = (pc->type ? pc->type : SOCK_STREAM);
/* create a socket */
s = ngx_socket(pc->sockaddr->sa_family, type, 0);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
(type == SOCK_STREAM) ? "stream" : "dgram", s);
if (s == (ngx_socket_t) -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
/* get a free connection from the connection pool */
c = ngx_get_connection(s, pc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
/* type of this socket, STREAM or DGRAM; here it is STREAM */
c->type = type;
/* if a receive buffer size was set; as seen above, it was not */
if (pc->rcvbuf) {
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(const void *) &pc->rcvbuf, sizeof(int)) == -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_RCVBUF) failed");
goto failed;
}
}
/* set the socket to non-blocking */
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
goto failed;
}
/* local holds the local address to bind to; as seen above, it was not set */
if (pc->local) {
#if (NGX_HAVE_TRANSPARENT_PROXY)
if (pc->transparent) {
if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
goto failed;
}
}
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
port = ngx_inet_get_port(pc->local->sockaddr);
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)
if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
static int bind_address_no_port = 1;
if (bind_address_no_port) {
if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
(const void *) &bind_address_no_port,
sizeof(int)) == -1)
{
err = ngx_socket_errno;
if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
ngx_log_error(NGX_LOG_ALERT, pc->log, err,
"setsockopt(IP_BIND_ADDRESS_NO_PORT) "
"failed, ignored");
} else {
bind_address_no_port = 0;
}
}
}
}
#endif
#if (NGX_LINUX)
if (pc->type == SOCK_DGRAM && port != 0) {
int reuse_addr = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuse_addr, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_REUSEADDR) failed");
goto failed;
}
}
#endif
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
}
if (type == SOCK_STREAM) {
/* set the I/O callbacks of this connection */
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
/* use sendfile */
c->sendfile = 1;
if (pc->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
} else { /* type == SOCK_DGRAM */
c->recv = ngx_udp_recv;
c->send = ngx_send;
c->send_chain = ngx_udp_send_chain;
}
c->log_error = pc->log_error;
/* set up the read and write events of this active connection */
rev = c->read;
wev = c->write;
rev->log = pc->log;
wev->log = pc->log;
pc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
/* add the read and write events of this active connection to the event mechanism (e.g. epoll) */
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"connect to %V, fd:%d #%uA", pc->name, s, c->number);
/* connect to the upstream server; because the socket is non-blocking, the first connect() normally returns -1 (EINPROGRESS) */
rc = connect(s, pc->sockaddr, pc->socklen);
if (rc == -1) {
err = ngx_socket_errno;
if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
/* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
&& err != NGX_EAGAIN
#endif
)
{
if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
/*
* Linux returns EAGAIN instead of ECONNREFUSED
* for unix sockets if listen queue is full
*/
|| err == NGX_EAGAIN
#endif
|| err == NGX_ECONNRESET
|| err == NGX_ENETDOWN
|| err == NGX_ENETUNREACH
|| err == NGX_EHOSTDOWN
|| err == NGX_EHOSTUNREACH)
{
level = NGX_LOG_ERR;
} else {
level = NGX_LOG_CRIT;
}
ngx_log_error(level, c->log, err, "connect() to %V failed",
pc->name);
ngx_close_connection(c);
pc->connection = NULL;
return NGX_DECLINED;
}
}
/* so NGX_AGAIN is returned from here */
if (ngx_add_conn) {
if (rc == -1) {
/* NGX_EINPROGRESS */
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
"connect(): %d", rc);
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_blocking_n " failed");
goto failed;
}
/*
* FreeBSD's aio allows to post an operation on non-connected socket.
* NT does not support it.
*
* TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
*/
rev->ready = 1;
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue */
event = NGX_CLEAR_EVENT;
} else {
/* select, poll, /dev/poll */
event = NGX_LEVEL_EVENT;
}
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
goto failed;
}
if (rc == -1) {
/* NGX_EINPROGRESS */
if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
goto failed;
}
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
failed:
ngx_close_connection(c);
pc->connection = NULL;
return NGX_ERROR;
}
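The core pattern in ngx_event_connect_peer is a non-blocking connect(): the call usually returns -1 with EINPROGRESS, and the result is learned later when the socket becomes writable. The sketch below shows that pattern with plain POSIX calls, using poll() in place of nginx's event loop. `nonblocking_connect` and `loopback_listener` are hypothetical helpers for demonstration, and the loopback listener exists only so the example has something to connect to.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Open a listening TCP socket on 127.0.0.1 with a kernel-chosen port.
 * Fills *sin with the bound address; returns the fd or -1. */
static int
loopback_listener(struct sockaddr_in *sin)
{
    socklen_t len = sizeof(*sin);
    int       ls = socket(AF_INET, SOCK_STREAM, 0);

    if (ls == -1) {
        return -1;
    }
    memset(sin, 0, sizeof(*sin));
    sin->sin_family = AF_INET;
    sin->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    sin->sin_port = 0;

    if (bind(ls, (struct sockaddr *) sin, len) == -1
        || listen(ls, 1) == -1
        || getsockname(ls, (struct sockaddr *) sin, &len) == -1)
    {
        close(ls);
        return -1;
    }
    return ls;
}

/* Start a non-blocking connect and wait up to timeout_ms for it to
 * finish. Returns the connected fd, or -1 on failure. */
static int
nonblocking_connect(const struct sockaddr *sa, socklen_t len, int timeout_ms)
{
    int           s, err = 0;
    socklen_t     errlen = sizeof(err);
    struct pollfd pfd;

    s = socket(sa->sa_family, SOCK_STREAM, 0);
    if (s == -1) {
        return -1;
    }
    if (fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK) == -1) {
        goto failed;
    }

    if (connect(s, sa, len) == -1) {
        if (errno != EINPROGRESS) {
            goto failed;                  /* a real connect error */
        }
        /* in progress: writability signals that connect() finished */
        pfd.fd = s;
        pfd.events = POLLOUT;
        if (poll(&pfd, 1, timeout_ms) != 1) {
            goto failed;
        }
        /* SO_ERROR tells whether it finished successfully */
        if (getsockopt(s, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1
            || err != 0)
        {
            goto failed;
        }
    }
    return s;

failed:
    close(s);
    return -1;
}
```

In nginx the wait is not done inline as here: the function returns NGX_AGAIN and the write event registered with epoll fires once the connection is established, which is exactly when the relay sends C0 + C1.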
2.6.4.4 ngx_rtmp_client_handshake
void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
ngx_connection_t *c;
c = s->connection;
/* set the read and write event handlers of this connection */
c->read->handler = ngx_rtmp_handshake_recv;
c->write->handler = ngx_rtmp_handshake_send;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: start client handshake");
/* allocate a buffer for the upcoming handshake, used to store received and outgoing handshake packets */
s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
/* set the current handshake stage, i.e. client send: C0 + C1 */
s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;
/* build the C0 + C1 packet */
if (ngx_rtmp_handshake_create_challenge(s,
ngx_rtmp_client_version,
&ngx_rtmp_client_partial_key) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
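/* the caller passed 1 for async, so the handshake is not sent to the upstream server yet;
* instead the write event is added to the timer and to epoll, and C0 + C1 is sent on a later loop iteration when the write event fires */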
if (async) {
/* add the write event to the timer with a timeout of s->timeout */
ngx_add_timer(c->write, s->timeout);
/* add the write event to the event mechanism (e.g. epoll) */
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
}
return;
}
ngx_rtmp_handshake_send(c->write);
}
2.6.4.5 ngx_rtmp_relay_create_local_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_ctx_t *ctx;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create local context");
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
return NULL;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
}
ctx->session = s;
ctx->push_evt.data = s;
ctx->push_evt.log = s->connection->log;
/* set the handler of the push_evt event */
ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;
if (ctx->publish) {
return NULL;
}
if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
!= NGX_OK)
{
return NULL;
}
return ctx;
}
After ngx_rtmp_relay_create_local_ctx returns, control unwinds back to ngx_rtmp_relay_publish, which then calls the next function
in the chain through next_publish. Here that is ngx_rtmp_live_publish.
2.6.5 ngx_rtmp_live_publish
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL || !lacf->live) {
goto next;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: publish: name='%s' type='%s'",
v->name, v->type);
/* join stream as publisher */
/* build an ngx_rtmp_live_ctx_t structure acting as the publisher */
ngx_rtmp_live_join(s, v->name, 1);
/* this fetches the ngx_rtmp_live_ctx_t structure built above */
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || !ctx->publishing) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent) {
/* send a response to the publish the client sent earlier */
ngx_rtmp_send_status(s, "NetStream.Publish.Start",
"status", "Start publishing");
}
next:
return next_publish(s, v);
}
send: onStatus('NetStream.Publish.Start'), Figure (12)

Control then returns to epoll_wait to wait for monitored events to fire. For the next part of the analysis, first look at this nginx debug output:
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070
ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8
ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0
ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16
ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10
ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected '192.168.1.82'
ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128
ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16
ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect
ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake
ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638
ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3
ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536
ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9
ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10
ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done
ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done
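First, fd = 9 is the STREAM socket created as a client when connecting to the upstream server (192.168.1.82:1935), while fd = 5 is the
STREAM listening socket created when nginx started. The output above therefore shows the following sequence:
- epoll reports that the socket with fd 9 is writable, so the write-event handler of that socket is called. From the source shown earlier this is
ngx_rtmp_handshake_send, which sends the prepared C0 and C1 to the upstream server (192.168.1.82:1935) through the connection's send
function, ngx_unix_send. After sending, the session enters the CLIENT_RECV_CHALLENGE(7) stage,
in which it waits for S0 and S1 from the server.
- epoll reports that the listening socket fd:5 is readable, which means a new connection, so ngx_event_accept is called to accept the client (192.168.1.82:39334).
After accepting, the server uses fd:10 to talk to that client and then starts its side of the handshake.
- From here the handshake between the server (192.168.1.82:1935, fd = 10) and the client (192.168.1.82:39334, fd = 9) proceeds
exactly as analyzed earlier, so it is not described again.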
客戶端發送 C2 后,會進入 NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) 階段,接着會調用該函數 ngx_rtmp_handshake_done:
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
ngx_rtmp_free_handshake_buffers(s);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: done");
if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
NULL, NULL) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
ngx_rtmp_cycle(s);
}
By firing the NGX_RTMP_HANDSHAKE_DONE event, this function in turn reaches ngx_rtmp_relay_handshake_done:
static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: handhshake done");
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !s->relay) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: return");
return NGX_OK;
}
/* mainly sends the connect command to the server */
return ngx_rtmp_relay_send_connect(s);
}
2.7 Client (fd = 9) sends: connect
After a successful handshake, the client (192.168.1.82:39334, fd = 9) sends the connect command to the server.
2.7.1 ngx_rtmp_relay_send_connect
static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: send connect");
static double trans = NGX_RTMP_RELAY_CONNECT_TRANS;
static double acodecs = 3575;
static double vcodecs = 252;
static ngx_rtmp_amf_elt_t out_cmd[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("app"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("tcUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("pageUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("swfUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("flashVer"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audioCodecs"),
&acodecs, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videoCodecs"),
&vcodecs, 0 }
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"connect", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_cmd, sizeof(out_cmd) }
};
ngx_rtmp_core_app_conf_t *cacf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_relay_ctx_t *ctx;
ngx_rtmp_header_t h;
size_t len, url_len;
u_char *p, *url_end;
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (cacf == NULL || ctx == NULL) {
return NGX_ERROR;
}
/* app */
if (ctx->app.len) {
out_cmd[0].data = ctx->app.data;
out_cmd[0].len = ctx->app.len;
} else {
out_cmd[0].data = cacf->name.data;
out_cmd[0].len = cacf->name.len;
}
/* tcUrl */
if (ctx->tc_url.len) {
out_cmd[1].data = ctx->tc_url.data;
out_cmd[1].len = ctx->tc_url.len;
} else {
len = sizeof("rtmp://") - 1 + ctx->url.len +
sizeof("/") - 1 + ctx->app.len;
p = ngx_palloc(s->connection->pool, len);
if (p == NULL) {
return NGX_ERROR;
}
out_cmd[1].data = p;
p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);
url_len = ctx->url.len;
url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, '/');
if (url_end) {
url_len = (size_t) (url_end - ctx->url.data);
}
p = ngx_cpymem(p, ctx->url.data, url_len);
*p++ = '/';
p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
out_cmd[1].len = p - (u_char *)out_cmd[1].data;
}
/* pageUrl */
out_cmd[2].data = ctx->page_url.data;
out_cmd[2].len = ctx->page_url.len;
/* swfUrl */
out_cmd[3].data = ctx->swf_url.data;
out_cmd[3].len = ctx->swf_url.len;
/* flashVer */
if (ctx->flash_ver.len) {
out_cmd[4].data = ctx->flash_ver.data;
out_cmd[4].len = ctx->flash_ver.len;
} else {
out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
out_cmd[4].len = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
}
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
|| ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
|| ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
? NGX_ERROR
: NGX_OK;
}
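After these RTMP packets (chunk size, ack size, and the connect command) have been sent, control returns to epoll_wait to wait for further events.
The analysis below distinguishes one server and two clients:
- Server: 192.168.1.82:1935
- Client: obs, which pushes the stream
- Client: 192.168.1.82:xxxx, the relay client that nginx creates for the push directive
2.8 Server receives from client obs: amf_meta(18)
At this point the server receives an rtmp message of type amf_meta(18) sent by the obs client.
receive: @setDataFrame(meta_data 18), figure (13)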

For "@setDataFrame", only the ngx_rtmp_codec_module module registers a callback, namely ngx_rtmp_codec_meta_data:
2.8.1 ngx_rtmp_codec_meta_data
static ngx_int_t
ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_codec_app_conf_t *cacf;
ngx_rtmp_codec_ctx_t *ctx;
ngx_uint_t skip;
static struct {
double width;
double height;
double duration;
double frame_rate;
double video_data_rate;
double video_codec_id_n;
u_char video_codec_id_s[32];
double audio_data_rate;
double audio_codec_id_n;
u_char audio_codec_id_s[32];
u_char profile[32];
u_char level[32];
} v;
static ngx_rtmp_amf_elt_t in_video_codec_id[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.video_codec_id_n, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.video_codec_id_s, sizeof(v.video_codec_id_s) },
};
static ngx_rtmp_amf_elt_t in_audio_codec_id[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.audio_codec_id_n, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
};
static ngx_rtmp_amf_elt_t in_inf[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_string("width"),
&v.width, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("height"),
&v.height, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("duration"),
&v.duration, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("framerate"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("fps"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videodatarate"),
&v.video_data_rate, 0 },
{ NGX_RTMP_AMF_VARIANT,
ngx_string("videocodecid"),
in_video_codec_id, sizeof(in_video_codec_id) },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audiodatarate"),
&v.audio_data_rate, 0 },
{ NGX_RTMP_AMF_VARIANT,
ngx_string("audiocodecid"),
in_audio_codec_id, sizeof(in_audio_codec_id) },
{ NGX_RTMP_AMF_STRING,
ngx_string("profile"),
&v.profile, sizeof(v.profile) },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
};
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
}
ngx_memzero(&v, sizeof(v));
/* use -1 as a sign of unchanged data;
* 0 is a valid value for uncompressed audio */
v.audio_codec_id_n = -1;
/* FFmpeg sends a string in front of actal metadata; ignore it */
skip = !(in->buf->last > in->buf->pos
&& *in->buf->pos == NGX_RTMP_AMF_STRING);
if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
sizeof(in_elts) / sizeof(in_elts[0]) - skip))
{
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"codec: error parsing data frame");
return NGX_OK;
}
ctx->width = (ngx_uint_t) v.width;
ctx->height = (ngx_uint_t) v.height;
ctx->duration = (ngx_uint_t) v.duration;
ctx->frame_rate = (ngx_uint_t) v.frame_rate;
ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;
ctx->audio_codec_id = (v.audio_codec_id_n == -1
? 0 : v.audio_codec_id_n == 0
? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);
ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
ngx_memcpy(ctx->level, v.level, sizeof(v.level));
ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"codec: data frame: "
"width=%ui height=%ui duration=%ui frame_rate=%ui "
"video=%s (%ui) audio=%s (%ui)",
ctx->width, ctx->height, ctx->duration, ctx->frame_rate,
ngx_rtmp_get_video_codec_name(ctx->video_codec_id),
ctx->video_codec_id,
ngx_rtmp_get_audio_codec_name(ctx->audio_codec_id),
ctx->audio_codec_id);
switch (cacf->meta) {
case NGX_RTMP_CODEC_META_ON: /* meta is initialized to this value */
return ngx_rtmp_codec_reconstruct_meta(s);
case NGX_RTMP_CODEC_META_COPY:
return ngx_rtmp_codec_copy_meta(s, h, in);
}
/* NGX_RTMP_CODEC_META_OFF */
return NGX_OK;
}
This function parses the @setDataFrame payload and then, because cacf->meta is NGX_RTMP_CODEC_META_ON here, calls ngx_rtmp_codec_reconstruct_meta.
2.8.2 ngx_rtmp_codec_reconstruct_meta
static ngx_int_t
ngx_rtmp_codec_reconstruct_meta(ngx_rtmp_session_t *s)
{
ngx_rtmp_codec_ctx_t *ctx;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_int_t rc;
static struct {
double width;
double height;
double duration;
double frame_rate;
double video_data_rate;
double video_codec_id;
double audio_data_rate;
double audio_codec_id;
u_char profile[32];
u_char level[32];
} v;
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("Server"),
"NGINX RTMP (github.com/arut/nginx-rtmp-module)", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("width"),
&v.width, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("height"),
&v.height, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("displayWidth"),
&v.width, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("displayHeight"),
&v.height, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("duration"),
&v.duration, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("framerate"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("fps"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videodatarate"),
&v.video_data_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videocodecid"),
&v.video_codec_id, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audiodatarate"),
&v.audio_data_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audiocodecid"),
&v.audio_codec_id, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("profile"),
&v.profile, sizeof(v.profile) },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onMetaData", 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (ctx == NULL) {
return NGX_OK;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (ctx->meta) {
ngx_rtmp_free_shared_chain(cscf, ctx->meta);
ctx->meta = NULL;
}
v.width = ctx->width;
v.height = ctx->height;
v.duration = ctx->duration;
v.frame_rate = ctx->frame_rate;
v.video_data_rate = ctx->video_data_rate;
v.video_codec_id = ctx->video_codec_id;
v.audio_data_rate = ctx->audio_data_rate;
v.audio_codec_id = ctx->audio_codec_id;
ngx_memcpy(v.profile, ctx->profile, sizeof(ctx->profile));
ngx_memcpy(v.level, ctx->level, sizeof(ctx->level));
rc = ngx_rtmp_append_amf(s, &ctx->meta, NULL, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
if (rc != NGX_OK || ctx->meta == NULL) {
return NGX_ERROR;
}
return ngx_rtmp_codec_prepare_meta(s, 0);
}
2.8.3 ngx_rtmp_codec_prepare_meta
static ngx_int_t
ngx_rtmp_codec_prepare_meta(ngx_rtmp_session_t *s, uint32_t timestamp)
{
ngx_rtmp_header_t h;
ngx_rtmp_codec_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
h.type = NGX_RTMP_MSG_AMF_META;
h.timestamp = timestamp;
/* build the complete rtmp message */
ngx_rtmp_prepare_message(s, &h, NULL, ctx->meta);
ctx->meta_version = ngx_rtmp_codec_get_next_version();
return NGX_OK;
}
2.9 Server receives from client (192.168.1.82:xxx): chunk_size(1)
The server receives the Set Chunk Size message sent by the client and calls ngx_rtmp_set_chunk_size to apply the new chunk size.
2.9.1 ngx_rtmp_set_chunk_size
ngx_int_t
ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *li, *fli, *lo, *flo;
ngx_buf_t *bi, *bo;
ngx_int_t n;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"setting chunk_size=%ui", size);
if (size > NGX_RTMP_MAX_CHUNK_SIZE) {
ngx_log_error(NGX_LOG_ALERT, s->connection->log, 0,
"too big RTMP chunk size:%ui", size);
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
s->in_old_pool = s->in_pool;
s->in_chunk_size = size;
s->in_pool = ngx_create_pool(4096, s->connection->log);
/* copy existing chunk data */
if (s->in_old_pool) {
s->in_chunk_size_changing = 1;
s->in_streams[0].in = NULL;
for(n = 1; n < cscf->max_streams; ++n) {
/* stream buffer is circular
* for all streams except for the current one
* (which caused this chunk size change);
* we can simply ignore it */
li = s->in_streams[n].in;
if (li == NULL || li->next == NULL) {
s->in_streams[n].in = NULL;
continue;
}
/* move from last to the first */
li = li->next;
fli = li;
lo = ngx_rtmp_alloc_in_buf(s);
if (lo == NULL) {
return NGX_ERROR;
}
flo = lo;
for ( ;; ) {
bi = li->buf;
bo = lo->buf;
if (bo->end - bo->last >= bi->last - bi->pos) {
bo->last = ngx_cpymem(bo->last, bi->pos,
bi->last - bi->pos);
li = li->next;
if (li == fli) {
lo->next = flo;
s->in_streams[n].in = lo;
break;
}
continue;
}
bi->pos += (ngx_cpymem(bo->last, bi->pos,
bo->end - bo->last) - bo->last);
lo->next = ngx_rtmp_alloc_in_buf(s);
lo = lo->next;
if (lo == NULL) {
return NGX_ERROR;
}
}
}
}
return NGX_OK;
}
2.10 Server receives from client (192.168.1.82:xxx): ack_size(5)
The server receives the Window Acknowledgement Size message sent by the client.
2.10.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive ack_size=%uD", val);
/* store the acknowledgement window size directly */
s->ack_size = val;
break;
...
}
}
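2.11 Server receives from client (192.168.1.82:xxx): amf_cmd(20), connect
The server receives the connect command sent by the client. The app the client wants to connect to is live.
No packet capture is available for this exchange, so only the debug log is shown: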
AMF read (1) 00 '?'
AMF read (8) 3F F0 00 00 00 00 00 00 '????????'
AMF read (1) 03 '?'
AMF read (2) 00 03 '??'
AMF read (3) 61 70 70 'app'
AMF read (1) 02 '?'
AMF read (2) 00 04 '??'
AMF read (4) 6C 69 76 65 'live'
AMF read (2) 00 05 '??'
AMF read (5) 74 63 55 72 6C 'tcUrl'
AMF read (1) 02 '?'
AMF read (2) 00 1D '??'
AMF read (29) 72 74 6D 70 3A 2F 2F 31 39 32 2E 31 36 38 2E 31 'rtmp://192.168.1'
AMF read (2) 00 07 '??'
AMF read (7) 70 61 67 65 55 72 6C 'pageUrl'
AMF read (1) 02 '?'
AMF read (2) 00 00 '??'
AMF read (2) 00 06 '??'
AMF read (6) 73 77 66 55 72 6C 'swfUrl'
AMF read (1) 02 '?'
AMF read (2) 00 00 '??'
AMF read (2) 00 08 '??'
AMF read (8) 66 6C 61 73 68 56 65 72 'flashVer'
AMF read (1) 02 '?'
AMF read (2) 00 0F '??'
AMF read (15) 4C 4E 58 2E 31 31 2C 31 2C 31 30 32 2C 35 35 'LNX.11,1,102,55'
AMF read (2) 00 0B '??'
AMF read (11) 61 75 64 69 6F 43 6F 64 65 63 73 'audioCodecs'
AMF read (1) 00 '?'
AMF read (8) 40 AB EE 00 00 00 00 00 '@???????'
AMF read (2) 00 0B '??'
AMF read (11) 76 69 64 65 6F 43 6F 64 65 63 73 'videoCodecs'
AMF read (1) 00 '?'
AMF read (8) 40 6F 80 00 00 00 00 00 '@o??????'
AMF read (2) 00 00 '??'
AMF read (1) 09 '?'
2.11.1 ngx_rtmp_cmd_connect
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"rtmp cmd: connect");
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_core_app_conf_t **cacfp;
ngx_uint_t n;
ngx_rtmp_header_t h;
u_char *p;
static double trans;
static double capabilities = NGX_RTMP_CAPABILITIES;
static double object_encoding = 0;
/* the following is the amf response the server returns for the client's connect command */
static ngx_rtmp_amf_elt_t out_obj[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("fmsVer"),
NGX_RTMP_FMS_VERSION, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("capabilities"),
&capabilities, 0 },
};
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetConnection.Connect.Success", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Connection succeeded.", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("objectEncoding"),
&object_encoding, 0 }
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_obj, sizeof(out_obj) },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
if (s->connected) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: duplicate connection");
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
trans = v->trans;
/* fill session parameters */
s->connected = 1;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
#define NGX_RTMP_SET_STRPAR(name) \
s->name.len = ngx_strlen(v->name); \
s->name.data = ngx_palloc(s->connection->pool, s->name.len); \
ngx_memcpy(s->name.data, v->name, s->name.len)
NGX_RTMP_SET_STRPAR(app);
NGX_RTMP_SET_STRPAR(args);
NGX_RTMP_SET_STRPAR(flashver);
NGX_RTMP_SET_STRPAR(swf_url);
NGX_RTMP_SET_STRPAR(tc_url);
NGX_RTMP_SET_STRPAR(page_url);
#undef NGX_RTMP_SET_STRPAR
p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
if (p) {
s->app.len = (p - s->app.data);
}
s->acodecs = (uint32_t) v->acodecs;
s->vcodecs = (uint32_t) v->vcodecs;
/* look up the configuration of the application named in connect */
/* find application & set app_conf */
cacfp = cscf->applications.elts;
for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
if ((*cacfp)->name.len == s->app.len &&
ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
{
/* found app! */
s->app_conf = (*cacfp)->app_conf;
break;
}
}
if (s->app_conf == NULL) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: application not found: '%V'", &s->app);
return NGX_ERROR;
}
object_encoding = v->object_encoding;
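/* Send the acknowledgement window size (ack_size) to the client. This message
 * tells the peer how large the acknowledgement window is: after sending that
 * many bytes, the sender waits for an acknowledgement from the peer (and stops
 * sending until it arrives). The receiver must send an acknowledgement once it
 * has received a window's worth of data since the session started or since its
 * previous acknowledgement. */
return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
/* Send the Set Peer Bandwidth message. It tells the peer to limit its output
 * bandwidth, i.e. the amount of sent but unacknowledged data. A peer that
 * receives this message should reply with a Window Acknowledgement Size
 * message if the window size differs from the last one it sent. */
ngx_rtmp_send_bandwidth(s, cscf->ack_window,
NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
/* Send the Set Chunk Size message, which notifies the peer of the new maximum chunk size. */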
ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
!= NGX_OK ? NGX_ERROR : NGX_OK;
}
Here the server sends four packets to the client (192.168.1.82:xxxx): ack_size, bandwidth, chunk_size, and the response to connect.
2.12 Client (192.168.1.82:xxx) receives from server: ack_size(5)
The client receives the Window Acknowledgement Size message sent by the server.
2.12.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive ack_size=%uD", val);
/* store the acknowledgement window size directly */
s->ack_size = val;
break;
...
}
}
2.13 Client (192.168.1.82:xxx) receives from server: bandwidth(6)
The client receives the Set Peer Bandwidth message sent by the server.
2.13.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_BANDWIDTH:
if (b->last - b->pos >= 5) {
limit = *(uint8_t*)&b->pos[4];
(void)val;
(void)limit;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive bandwidth=%uD limit=%d",
val, (int)limit);
/* receive window size =val
* && limit */
}
break;
...
}
}
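2.13 Client (192.168.1.82:xxx) receives from server: chunk_size(1)
The client receives the Set Chunk Size message sent by the server and therefore calls ngx_rtmp_set_chunk_size to apply it.
2.13 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
The client receives the server's response to connect: _result(NetConnection.Connect.Success).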
2.13.1 ngx_rtmp_relay_on_result
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_relay_ctx_t *ctx;
static struct {
double trans;
u_char level[32];
u_char code[128];
u_char desc[1024];
} v;
static ngx_rtmp_amf_elt_t in_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
&v.code, sizeof(v.code) },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
&v.desc, sizeof(v.desc) },
};
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
ngx_memzero(&v, sizeof(v));
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: _result: level='%s' code='%s' description='%s'",
v.level, v.code, v.desc);
switch ((ngx_int_t)v.trans) {
case NGX_RTMP_RELAY_CONNECT_TRANS:
/* send the createStream command to the server */
return ngx_rtmp_relay_send_create_stream(s);
case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
if (ctx->publish != ctx && !s->static_relay) {
if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
return NGX_ERROR;
}
return ngx_rtmp_relay_play_local(s);
} else {
if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
return NGX_ERROR;
}
return ngx_rtmp_relay_publish_local(s);
}
default:
return NGX_OK;
}
}
This function first parses the received response and then dispatches on v.trans. Here v.trans is NGX_RTMP_RELAY_CONNECT_TRANS, so it calls ngx_rtmp_relay_send_create_stream.
2.13.2 ngx_rtmp_relay_send_create_stream
static ngx_int_t
ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp relay: send create stream");
static double trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"createStream", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 }
};
ngx_rtmp_header_t h;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
This function builds the createStream packet and sends it to the server.
2.14 Server receives from client (192.168.1.82:xxx): amf_cmd(20), createStream
The server receives the createStream command message sent by the client.
2.14.1 ngx_rtmp_cmd_create_stream_init
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_create_stream_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, sizeof(v.trans) },
};
/* parse the createStream command message to obtain v.trans */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");
return ngx_rtmp_create_stream(s, &v);
}
Next, this function invokes the handler chain built around ngx_rtmp_create_stream. The handler reached here is ngx_rtmp_cmd_create_stream.
2.14.2 ngx_rtmp_cmd_create_stream
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");
/* support one message stream per connection */
static double stream;
static double trans;
ngx_rtmp_header_t h;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&stream, sizeof(stream) },
};
trans = v->trans;
stream = NGX_RTMP_MSID;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
NGX_DONE : NGX_ERROR;
}
This function builds and sends the _result response to createStream.
2.15 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
The client receives the server's response to createStream: _result()
2.15.1 ngx_rtmp_relay_on_result
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_relay_ctx_t *ctx;
static struct {
double trans;
u_char level[32];
u_char code[128];
u_char desc[1024];
} v;
static ngx_rtmp_amf_elt_t in_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
&v.code, sizeof(v.code) },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
&v.desc, sizeof(v.desc) },
};
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
ngx_memzero(&v, sizeof(v));
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: _result: level='%s' code='%s' description='%s'",
v.level, v.code, v.desc);
switch ((ngx_int_t)v.trans) {
case NGX_RTMP_RELAY_CONNECT_TRANS:
return ngx_rtmp_relay_send_create_stream(s);
case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
if (ctx->publish != ctx && !s->static_relay) {
/* send the publish command to the server */
if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
return NGX_ERROR;
}
return ngx_rtmp_relay_play_local(s);
} else {
if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
return NGX_ERROR;
}
return ngx_rtmp_relay_publish_local(s);
}
default:
return NGX_OK;
}
}
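This function first parses the received response and then dispatches on v.trans. This time v.trans is NGX_RTMP_RELAY_CREATE_STREAM_TRANS and the session is a push relay, so it calls ngx_rtmp_relay_send_publish.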
2.15.2 ngx_rtmp_relay_send_publish
static ngx_int_t
ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp relay: send publish");
static double trans;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"publish", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
NULL, 0 }, /* <- to fill */
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"live", 0 }
};
ngx_rtmp_header_t h;
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
return NGX_ERROR;
}
if (ctx->play_path.len) {
out_elts[3].data = ctx->play_path.data;
out_elts[3].len = ctx->play_path.len;
} else {
out_elts[3].data = ctx->name.data;
out_elts[3].len = ctx->name.len;
}
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF;
h.msid = NGX_RTMP_RELAY_MSID;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
2.15.3 ngx_rtmp_relay_play_local
static ngx_int_t
ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
{
ngx_rtmp_play_t v;
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
return NGX_ERROR;
}
ngx_memzero(&v, sizeof(ngx_rtmp_play_t));
v.silent = 1;
*(ngx_cpymem(v.name, ctx->name.data,
ngx_min(sizeof(v.name) - 1, ctx->name.len))) = 0;
return ngx_rtmp_play(s, &v);
}
This function in turn invokes the handler chain built around ngx_rtmp_play. The main handler reached here is ngx_rtmp_live_play.
2.15.4 ngx_rtmp_live_play
static ngx_int_t
ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL || !lacf->live) {
goto next;
}
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: play: name='%s' start=%uD duration=%uD reset=%d",
v->name, (uint32_t) v->start,
(uint32_t) v->duration, (uint32_t) v->reset);
/* join stream as subscriber */
ngx_rtmp_live_join(s, v->name, 0);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent && !lacf->play_restart) {
ngx_rtmp_send_status(s, "NetStream.Play.Start",
"status", "Start live");
ngx_rtmp_send_sample_access(s);
}
next:
return next_play(s, v);
}
2.16 Server receives from client (192.168.1.82:xxx): amf_cmd(20), publish('test')
The server receives the publish command from the client.
2.16.1 ngx_rtmp_cmd_publish_init
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_publish_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.name, sizeof(v.name) },
{ NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.type, sizeof(v.type) },
};
ngx_memzero(&v, sizeof(v));
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_rtmp_cmd_fill_args(v.name, v.args);
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"publish: name='%s' args='%s' type=%s silent=%d",
v.name, v.args, v.type, v.silent);
return ngx_rtmp_publish(s, &v);
}
The client is connected to application live, and that application{} block has no push directive, so the main handler invoked here is ngx_rtmp_live_publish.
2.16.2 ngx_rtmp_live_publish
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL || !lacf->live) {
goto next;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: publish: name='%s' type='%s'",
v->name, v->type);
/* join stream as publisher */
ngx_rtmp_live_join(s, v->name, 1);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || !ctx->publishing) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent) {
/* send the response to publish */
ngx_rtmp_send_status(s, "NetStream.Publish.Start",
"status", "Start publishing");
}
next:
return next_publish(s, v);
}
2.17 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), onStatus
The client receives the server's response to publish, which means the client may now publish the stream to the server.
2.17.1 ngx_rtmp_relay_on_status
static ngx_int_t
ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_relay_ctx_t *ctx;
static struct {
double trans;
u_char level[32];
u_char code[128];
u_char desc[1024];
} v;
static ngx_rtmp_amf_elt_t in_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
&v.code, sizeof(v.code) },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
&v.desc, sizeof(v.desc) },
};
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
static ngx_rtmp_amf_elt_t in_elts_meta[] = {
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
ngx_memzero(&v, sizeof(v));
if (h->type == NGX_RTMP_MSG_AMF_META) {
ngx_rtmp_receive_amf(s, in, in_elts_meta,
sizeof(in_elts_meta) / sizeof(in_elts_meta[0]));
} else {
ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0]));
}
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: onStatus: level='%s' code='%s' description='%s'",
v.level, v.code, v.desc);
return NGX_OK;
}
2.18 Server receives from client (obs): audio(8)
The server receives an audio packet sent by the obs client.
receive: audio(8), figure (14)

For NGX_RTMP_MSG_AUDIO(8), the following rtmp modules register callbacks:
- ngx_rtmp_dash_module
- ngx_rtmp_hls_module
- ngx_rtmp_live_module
- ngx_rtmp_record_module
- ngx_rtmp_codec_module
Only the callbacks registered by the codec and live modules matter here. The first one called is ngx_rtmp_codec_av, registered by ngx_rtmp_codec_module.
2.18.1 ngx_rtmp_codec_av
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_codec_ctx_t *ctx;
ngx_chain_t **header;
uint8_t fmt;
static ngx_uint_t sample_rates[] =
{ 5512, 11025, 22050, 44100 };
if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
return NGX_OK;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
}
/* save codec */
if (in->buf->last - in->buf->pos < 1) {
return NGX_OK;
}
fmt = in->buf->pos[0];
if (h->type == NGX_RTMP_MSG_AUDIO) {
ctx->audio_codec_id = (fmt & 0xf0) >> 4;
ctx->audio_channels = (fmt & 0x01) + 1;
ctx->sample_size = (fmt & 0x02) ? 2 : 1;
if (ctx->sample_rate == 0) {
ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
}
} else {
ctx->video_codec_id = (fmt & 0x0f);
}
/* save AVC/AAC header */
if (in->buf->last - in->buf->pos < 3) {
return NGX_OK;
}
/* no conf */
if (!ngx_rtmp_is_codec_header(in)) {
return NGX_OK;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
header = NULL;
if (h->type == NGX_RTMP_MSG_AUDIO) {
if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
header = &ctx->aac_header;
ngx_rtmp_codec_parse_aac_header(s, in);
}
} else {
if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
header = &ctx->avc_header;
ngx_rtmp_codec_parse_avc_header(s, in);
}
}
if (header == NULL) {
return NGX_OK;
}
if (*header) {
ngx_rtmp_free_shared_chain(cscf, *header);
}
*header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
return NGX_OK;
}
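The "save codec" step above reads everything it needs from the first byte of the message. The following is a minimal standalone sketch of that bit layout, not part of nginx-rtmp: the type audio_fmt_t and the functions decode_audio_fmt and decode_video_codec_id are hypothetical names introduced only for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical holder for the fields packed into the first byte of an
 * audio message (names are illustrative, not from nginx-rtmp). */
typedef struct {
    unsigned codec_id;     /* upper 4 bits, e.g. 10 = AAC */
    unsigned sample_rate;  /* Hz, looked up from the 2-bit rate index */
    unsigned sample_size;  /* bytes per sample: 1 or 2 */
    unsigned channels;     /* 1 = mono, 2 = stereo */
} audio_fmt_t;

/* Decode the audio format byte with the same masks ngx_rtmp_codec_av uses. */
static audio_fmt_t
decode_audio_fmt(uint8_t fmt)
{
    static const unsigned sample_rates[] = { 5512, 11025, 22050, 44100 };
    audio_fmt_t           a;
    unsigned              idx;

    idx = (fmt & 0x0c) >> 2;
    assert(idx < 4);                 /* a 2-bit index always fits the table */

    a.codec_id    = (fmt & 0xf0) >> 4;
    a.sample_rate = sample_rates[idx];
    a.sample_size = (fmt & 0x02) ? 2 : 1;
    a.channels    = (fmt & 0x01) + 1;

    return a;
}

/* For video messages only the lower 4 bits are kept as the codec id. */
static unsigned
decode_video_codec_id(uint8_t fmt)
{
    return fmt & 0x0f;
}
```

For example, the byte 0xAF decodes to codec id 10 (AAC), 44100 Hz, 2-byte samples and 2 channels, and the video byte 0x17 yields codec id 7 (H.264).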
2.18.2 ngx_rtmp_codec_parse_aac_header
static void
ngx_rtmp_codec_parse_aac_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
ngx_uint_t idx;
ngx_rtmp_codec_ctx_t *ctx;
ngx_rtmp_bit_reader_t br;
static ngx_uint_t aac_sample_rates[] =
{ 96000, 88200, 64000, 48000,
44100, 32000, 24000, 22050,
16000, 12000, 11025, 8000,
7350, 0, 0, 0 };
#if (NGX_DEBUG)
ngx_rtmp_codec_dump_header(s, "aac", in);
#endif
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);
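/* Read 16 bits and discard the value, i.e. skip the audio format byte
 * and the AAC packet type byte that precede the AAC config */
ngx_rtmp_bit_read(&br, 16);
/* Read the 5-bit aac_profile (audio object type) value */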
ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
if (ctx->aac_profile == 31) {
ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
}
idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
if (idx == 15) {
ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
} else {
ctx->sample_rate = aac_sample_rates[idx];
}
ctx->aac_chan_conf = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
if (ctx->aac_profile == 5 || ctx->aac_profile == 29) {
if (ctx->aac_profile == 29) {
ctx->aac_ps = 1;
}
ctx->aac_sbr = 1;
idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
if (idx == 15) {
ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
} else {
ctx->sample_rate = aac_sample_rates[idx];
}
ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
if (ctx->aac_profile == 31) {
ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
}
}
/* MPEG-4 Audio Specific Config
5 bits: object type
if (object type == 31)
6 bits + 32: object type
4 bits: frequency index
if (frequency index == 15)
24 bits: frequency
4 bits: channel configuration
if (object_type == 5)
4 bits: frequency index
if (frequency index == 15)
24 bits: frequency
5 bits: object type
if (object type == 31)
6 bits + 32: object type
var bits: AOT Specific Config
*/
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"codec: aac header profile=%ui, "
"sample_rate=%ui, chan_conf=%ui",
ctx->aac_profile, ctx->sample_rate, ctx->aac_chan_conf);
}
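To make the field layout in the comment above concrete, here is a minimal sketch that decodes the common short form of an AAC sequence header with plain shifts. It is an illustration under stated assumptions, not the module's code: asc_t and parse_simple_asc are hypothetical names, and the sketch deliberately rejects the escape values (object type 31, frequency index 15) and ignores the SBR/PS extension that the real function handles.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type for this sketch. */
typedef struct {
    unsigned object_type;  /* 5 bits, e.g. 2 = AAC LC */
    unsigned sample_rate;  /* Hz, from the 4-bit frequency index */
    unsigned chan_conf;    /* 4 bits, channel configuration */
} asc_t;

/* Parse a 4-byte AAC sequence header: 2 skipped bytes followed by the
 * first 13 bits of the audio specific config.
 * Returns 0 on success, -1 if the buffer is too short or an escape
 * value is used that this sketch does not handle. */
static int
parse_simple_asc(const uint8_t *p, size_t len, asc_t *out)
{
    static const unsigned rates[16] = {
        96000, 88200, 64000, 48000,
        44100, 32000, 24000, 22050,
        16000, 12000, 11025,  8000,
         7350,     0,     0,     0
    };
    unsigned  bits, idx;

    assert(p != NULL && out != NULL);

    if (len < 4) {
        return -1;
    }

    /* bytes 0 and 1 are skipped, like ngx_rtmp_bit_read(&br, 16) */
    bits = ((unsigned) p[2] << 8) | p[3];

    out->object_type = bits >> 11;          /* top 5 bits  */
    idx              = (bits >> 7) & 0x0f;  /* next 4 bits */
    out->chan_conf   = (bits >> 3) & 0x0f;  /* next 4 bits */

    if (out->object_type == 31 || idx == 15) {
        return -1;
    }

    out->sample_rate = rates[idx];
    return 0;
}
```

With the bytes AF 00 12 10 this gives object type 2, sample rate 44100 and channel configuration 2.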
2.18.3 ngx_rtmp_bit_init_reader
void
ngx_rtmp_bit_init_reader(ngx_rtmp_bit_reader_t *br, u_char *pos, u_char *last)
{
ngx_memzero(br, sizeof(ngx_rtmp_bit_reader_t));
br->pos = pos;
br->last = last;
}
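This function initializes a bit reader: it zeroes the structure, then records the start and end of the buffer to read from.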
2.18.4 ngx_rtmp_bit_read
uint64_t
ngx_rtmp_bit_read(ngx_rtmp_bit_reader_t *br, ngx_uint_t n)
{
uint64_t v;
ngx_uint_t d;
v = 0;
while (n) {
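/* If the end of the buffer has been reached, set the error flag */
if (br->pos >= br->last) {
br->err = 1;
return 0;
}
/* Read at most the bits left in the current byte, so never more
 * than 8 bits per iteration */
d = (br->offs + n > 8 ? (ngx_uint_t) (8 - br->offs) : n);
v <<= d;
/* Append the bits just read to v */
v += (*br->pos >> (8 - br->offs - d)) & ((u_char) 0xff >> (8 - d));
/* Advance the bit reader's offset offs */
br->offs += d;
n -= d;
/* When the offset reaches 8, move to the next byte and reset the offset */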
if (br->offs == 8) {
br->pos++;
br->offs = 0;
}
}
return v;
}
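The loop above can be tried outside nginx with a small standalone copy. The sketch below mirrors the same logic using standard C types; bit_reader_t, bit_init and bit_read are hypothetical stand-ins for the ngx_rtmp_bit_* names and are not part of the module.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for ngx_rtmp_bit_reader_t, using standard C types. */
typedef struct {
    const uint8_t *pos;   /* current byte */
    const uint8_t *last;  /* one past the final byte */
    unsigned       offs;  /* bits already consumed in *pos (0..7) */
    unsigned       err;   /* set once a read runs past the end */
} bit_reader_t;

static void
bit_init(bit_reader_t *br, const uint8_t *pos, const uint8_t *last)
{
    memset(br, 0, sizeof(*br));
    br->pos = pos;
    br->last = last;
}

/* Read n bits, most significant bit first, as in ngx_rtmp_bit_read. */
static uint64_t
bit_read(bit_reader_t *br, unsigned n)
{
    uint64_t  v = 0;
    unsigned  d;

    assert(n <= 64);

    while (n) {
        if (br->pos >= br->last) {
            br->err = 1;
            return 0;
        }

        /* take only what is left in the current byte */
        d = (br->offs + n > 8) ? 8 - br->offs : n;

        v <<= d;
        v += (*br->pos >> (8 - br->offs - d)) & (0xffu >> (8 - d));

        br->offs += d;
        n -= d;

        if (br->offs == 8) {
            br->pos++;
            br->offs = 0;
        }
    }

    return v;
}
```

Reading 16, 5, 4 and 4 bits from the bytes AF 00 12 10 returns 0xAF00, 2, 4 and 2. Once the remaining 3 bits are consumed, any further read returns 0 and sets err, which is why callers that care about truncated input need to check err rather than the return value.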
2.18.5 ngx_rtmp_live_av
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_rtmp_codec_ctx_t *codec_ctx;
ngx_chain_t *header, *coheader, *meta,
*apkt, *aapkt, *acopkt, *rpkt;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch, lh, clh;
ngx_int_t rc, mandatory, dummy_audio;
ngx_uint_t prio;
ngx_uint_t peers;
ngx_uint_t meta_version;
ngx_uint_t csidx;
uint32_t delta;
ngx_rtmp_live_chunk_stream_t *cs;
#ifdef NGX_DEBUG
const char *type_s;
type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
#endif
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return NGX_ERROR;
}
if (!lacf->live || in == NULL || in->buf == NULL) {
return NGX_OK;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || ctx->stream == NULL) {
return NGX_OK;
}
if (ctx->publishing == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", type_s);
return NGX_OK;
}
/* If the stream is not active yet, start it */
if (!ctx->stream->active) {
ngx_rtmp_live_start(s);
}
if (ctx->idle_evt.timer_set) {
ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s packet timestamp=%uD",
type_s, h->timestamp);
s->current_time = h->timestamp;
peers = 0;
apkt = NULL;
aapkt = NULL;
acopkt = NULL;
header = NULL;
coheader = NULL;
meta = NULL;
meta_version = 0;
mandatory = 0;
prio = (h->type == NGX_RTMP_MSG_VIDEO ?
ngx_rtmp_get_video_frame_type(in) : 0);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
cs = &ctx->cs[csidx];
ngx_memzero(&ch, sizeof(ch));
ch.timestamp = h->timestamp;
ch.msid = NGX_RTMP_MSID;
ch.csid = cs->csid;
ch.type = h->type;
lh = ch;
if (cs->active) {
lh.timestamp = cs->timestamp;
}
clh = lh;
clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
NGX_RTMP_MSG_AUDIO);
cs->active = 1;
cs->timestamp = ch.timestamp;
delta = ch.timestamp - lh.timestamp;
/*
if (delta >> 31) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: clipping non-monotonical timestamp %uD->%uD",
lh.timestamp, ch.timestamp);
delta = 0;
ch.timestamp = lh.timestamp;
}
*/
rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);
codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (codec_ctx) {
if (h->type == NGX_RTMP_MSG_AUDIO) {
header = codec_ctx->aac_header;
if (lacf->interleave) {
coheader = codec_ctx->avc_header;
}
if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
} else {
header = codec_ctx->avc_header;
if (lacf->interleave) {
coheader = codec_ctx->aac_header;
}
if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
}
if (codec_ctx->meta) {
meta = codec_ctx->meta;
meta_version = codec_ctx->meta_version;
}
}
/* broadcast to all subscribers */
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx == ctx || pctx->paused) {
continue;
}
ss = pctx->session;
cs = &pctx->cs[csidx];
/* send metadata */
if (meta && meta_version != pctx->meta_version) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: meta");
if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
pctx->meta_version = meta_version;
}
}
/* sync stream */
if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sync %s dropped=%uD", type_s, cs->dropped);
cs->active = 0;
cs->dropped = 0;
}
/* absolute packet */
if (!cs->active) {
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skipping header");
continue;
}
if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
!pctx->cs[0].active)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: waiting for video");
continue;
}
if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skip non-key");
continue;
}
dummy_audio = 0;
if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
!pctx->cs[1].active)
{
dummy_audio = 1;
if (aapkt == NULL) {
aapkt = ngx_rtmp_alloc_shared_buf(cscf);
ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
}
}
if (header || coheader) {
/* send absolute codec header */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s header timestamp=%uD",
type_s, lh.timestamp);
if (header) {
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, 0);
if (rc != NGX_OK) {
continue;
}
}
if (coheader) {
if (acopkt == NULL) {
acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL, coheader);
ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
}
rc = ngx_rtmp_send_message(ss, acopkt, 0);
if (rc != NGX_OK) {
continue;
}
} else if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
cs->timestamp = lh.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
} else {
/* send absolute packet */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s packet timestamp=%uD",
type_s, ch.timestamp);
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, prio);
if (rc != NGX_OK) {
continue;
}
cs->timestamp = ch.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
++peers;
if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
continue;
}
}
/* send relative packet */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: rel %s packet delta=%uD",
type_s, delta);
if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
++pctx->ndropped;
cs->dropped += delta;
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: mandatory packet failed");
ngx_rtmp_finalize_session(ss);
}
continue;
}
cs->timestamp += delta;
++peers;
ss->current_time = cs->timestamp;
}
if (rpkt) {
ngx_rtmp_free_shared_chain(cscf, rpkt);
}
if (apkt) {
ngx_rtmp_free_shared_chain(cscf, apkt);
}
if (aapkt) {
ngx_rtmp_free_shared_chain(cscf, aapkt);
}
if (acopkt) {
ngx_rtmp_free_shared_chain(cscf, acopkt);
}
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
&ctx->stream->bw_in_audio :
&ctx->stream->bw_in_video,
h->mlen);
return NGX_OK;
}
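The main job of this function is to forward the audio/video data received from the obs client to the subscribers of this stream, which in this setup means application live.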
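Two small computations in ngx_rtmp_live_av are easy to miss: how the chunk stream slot is chosen, and how the relative timestamp is derived. The sketch below isolates them. It is only an illustration: the function names and the MSG_AUDIO/MSG_VIDEO constants are hypothetical stand-ins, and the non-monotonic check corresponds to the block that is commented out in the listing above.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for NGX_RTMP_MSG_AUDIO and NGX_RTMP_MSG_VIDEO. */
enum { MSG_AUDIO = 8, MSG_VIDEO = 9 };

/* Slot 0 carries video (or everything when interleave is on),
 * slot 1 carries audio. Mirrors:
 *   csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO); */
static unsigned
chunk_stream_index(int interleave, unsigned type)
{
    assert(type == MSG_AUDIO || type == MSG_VIDEO);
    return !(interleave || type == MSG_VIDEO);
}

/* Unsigned subtraction, so a timestamp that wrapped past 2^32 still
 * yields the small positive distance. */
static uint32_t
timestamp_delta(uint32_t now, uint32_t last)
{
    return now - last;
}

/* A delta with the top bit set means the timestamp went backwards. */
static int
is_non_monotonic(uint32_t delta)
{
    return (delta >> 31) != 0;
}
```

With interleave off, audio maps to slot 1 and video to slot 0. A packet at 1040 ms following one at 1000 ms gives a delta of 40, and swapping the two gives a delta with the top bit set.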
2.18.6 ngx_rtmp_live_start
static void
ngx_rtmp_live_start(ngx_rtmp_session_t *s)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_chain_t *control;
ngx_chain_t *status[3];
size_t n, nstatus;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
/* Build the stream_begin rtmp packet */
control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID);
nstatus = 0;
if (lacf->play_restart) {
status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start",
"status", "Start live");
status[nstatus++] = ngx_rtmp_create_sample_access(s);
}
if (lacf->publish_notify) {
status[nstatus++] = ngx_rtmp_create_status(s,
"NetStream.Play.PublishNotify",
"status", "Start publishing");
}
ngx_rtmp_live_set_status(s, control, status, nstatus, 1);
if (control) {
ngx_rtmp_free_shared_chain(cscf, control);
}
for (n = 0; n < nstatus; ++n) {
ngx_rtmp_free_shared_chain(cscf, status[n]);
}
}
2.19 Server receives from client (obs): video(9)
receive: video(9), Figure (15)

2.19.1 ngx_rtmp_codec_av
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_codec_ctx_t *ctx;
ngx_chain_t **header;
uint8_t fmt;
static ngx_uint_t sample_rates[] =
{ 5512, 11025, 22050, 44100 };
if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
return NGX_OK;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
}
/* save codec */
if (in->buf->last - in->buf->pos < 1) {
return NGX_OK;
}
fmt = in->buf->pos[0];
if (h->type == NGX_RTMP_MSG_AUDIO) {
ctx->audio_codec_id = (fmt & 0xf0) >> 4;
ctx->audio_channels = (fmt & 0x01) + 1;
ctx->sample_size = (fmt & 0x02) ? 2 : 1;
if (ctx->sample_rate == 0) {
ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
}
} else {
ctx->video_codec_id = (fmt & 0x0f);
}
/* save AVC/AAC header */
if (in->buf->last - in->buf->pos < 3) {
return NGX_OK;
}
/* no conf */
if (!ngx_rtmp_is_codec_header(in)) {
return NGX_OK;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
header = NULL;
if (h->type == NGX_RTMP_MSG_AUDIO) {
if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
header = &ctx->aac_header;
ngx_rtmp_codec_parse_aac_header(s, in);
}
} else {
if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
header = &ctx->avc_header;
ngx_rtmp_codec_parse_avc_header(s, in);
}
}
if (header == NULL) {
return NGX_OK;
}
if (*header) {
ngx_rtmp_free_shared_chain(cscf, *header);
}
*header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
return NGX_OK;
}
2.19.2 ngx_rtmp_codec_parse_avc_header
static void
ngx_rtmp_codec_parse_avc_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
ngx_uint_t profile_idc, width, height, crop_left, crop_right,
crop_top, crop_bottom, frame_mbs_only, n, cf_idc,
num_ref_frames;
ngx_rtmp_codec_ctx_t *ctx;
ngx_rtmp_bit_reader_t br;
#if (NGX_DEBUG)
ngx_rtmp_codec_dump_header(s, "avc", in);
#endif
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);
ngx_rtmp_bit_read(&br, 48);
ctx->avc_profile = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
ctx->avc_compat = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
ctx->avc_level = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
/* nal bytes */
ctx->avc_nal_bytes = (ngx_uint_t) ((ngx_rtmp_bit_read_8(&br) & 0x03) + 1);
/* nnals */
if ((ngx_rtmp_bit_read_8(&br) & 0x1f) == 0) {
return;
}
/* nal size */
ngx_rtmp_bit_read(&br, 16);
/* nal type */
if (ngx_rtmp_bit_read_8(&br) != 0x67) {
return;
}
/* SPS */
/* profile idc */
profile_idc = (ngx_uint_t) ngx_rtmp_bit_read(&br, 8);
/* flags */
ngx_rtmp_bit_read(&br, 8);
/* level idc */
ngx_rtmp_bit_read(&br, 8);
/* SPS id */
ngx_rtmp_bit_read_golomb(&br);
if (profile_idc == 100 || profile_idc == 110 ||
profile_idc == 122 || profile_idc == 244 || profile_idc == 44 ||
profile_idc == 83 || profile_idc == 86 || profile_idc == 118)
{
/* chroma format idc */
cf_idc = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
if (cf_idc == 3) {
/* separate color plane */
ngx_rtmp_bit_read(&br, 1);
}
/* bit depth luma - 8 */
ngx_rtmp_bit_read_golomb(&br);
/* bit depth chroma - 8 */
ngx_rtmp_bit_read_golomb(&br);
/* qpprime y zero transform bypass */
ngx_rtmp_bit_read(&br, 1);
/* seq scaling matrix present */
if (ngx_rtmp_bit_read(&br, 1)) {
for (n = 0; n < (cf_idc != 3 ? 8u : 12u); n++) {
/* seq scaling list present */
if (ngx_rtmp_bit_read(&br, 1)) {
/* TODO: scaling_list()
if (n < 6) {
} else {
}
*/
}
}
}
}
/* log2 max frame num */
ngx_rtmp_bit_read_golomb(&br);
/* pic order cnt type */
switch (ngx_rtmp_bit_read_golomb(&br)) {
case 0:
/* max pic order cnt */
ngx_rtmp_bit_read_golomb(&br);
break;
case 1:
/* delta pic order always zero */
ngx_rtmp_bit_read(&br, 1);
/* offset for non-ref pic */
ngx_rtmp_bit_read_golomb(&br);
/* offset for top to bottom field */
ngx_rtmp_bit_read_golomb(&br);
/* num ref frames in pic order */
num_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
for (n = 0; n < num_ref_frames; n++) {
/* offset for ref frame */
ngx_rtmp_bit_read_golomb(&br);
}
}
/* num ref frames */
ctx->avc_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* gaps in frame num allowed */
ngx_rtmp_bit_read(&br, 1);
/* pic width in mbs - 1 */
width = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* pic height in map units - 1 */
height = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* frame mbs only flag */
frame_mbs_only = (ngx_uint_t) ngx_rtmp_bit_read(&br, 1);
if (!frame_mbs_only) {
/* mbs adaptive frame field */
ngx_rtmp_bit_read(&br, 1);
}
/* direct 8x8 inference flag */
ngx_rtmp_bit_read(&br, 1);
/* frame cropping */
if (ngx_rtmp_bit_read(&br, 1)) {
crop_left = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_right = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_top = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_bottom = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
} else {
crop_left = 0;
crop_right = 0;
crop_top = 0;
crop_bottom = 0;
}
ctx->width = (width + 1) * 16 - (crop_left + crop_right) * 2;
ctx->height = (2 - frame_mbs_only) * (height + 1) * 16 -
(crop_top + crop_bottom) * 2;
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"codec: avc header "
"profile=%ui, compat=%ui, level=%ui, "
"nal_bytes=%ui, ref_frames=%ui, width=%ui, height=%ui",
ctx->avc_profile, ctx->avc_compat, ctx->avc_level,
ctx->avc_nal_bytes, ctx->avc_ref_frames,
ctx->width, ctx->height);
}
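Most SPS fields above are read with ngx_rtmp_bit_read_golomb, whose body is not shown in this section. The following is a minimal sketch of unsigned Exp-Golomb decoding together with the width/height formula used at the end of the function. All names (bits_t, read_bit, read_ue, avc_width, avc_height) are hypothetical, and the reader is a simplified one that asserts on overrun rather than setting an error flag. The crop factor of 2 is copied from the listing; it matches 4:2:0 video with frame_mbs_only set and is an assumption for other formats.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified bit cursor used only by this sketch. */
typedef struct {
    const uint8_t *data;
    size_t         nbits;  /* total number of readable bits */
    size_t         pos;    /* index of the next bit */
} bits_t;

static unsigned
read_bit(bits_t *b)
{
    unsigned  bit;

    assert(b->pos < b->nbits);
    bit = (b->data[b->pos / 8] >> (7 - b->pos % 8)) & 1u;
    b->pos++;

    return bit;
}

/* Unsigned Exp-Golomb: count n leading zero bits, skip the 1 bit,
 * read n more bits as suffix; the value is 2^n - 1 + suffix. */
static uint32_t
read_ue(bits_t *b)
{
    unsigned  n = 0, i;
    uint32_t  suffix = 0;

    while (read_bit(b) == 0) {
        n++;
    }
    assert(n < 32);

    for (i = 0; i < n; i++) {
        suffix = (suffix << 1) | read_bit(b);
    }

    return ((uint32_t) 1 << n) - 1 + suffix;
}

/* Same arithmetic as the ctx->width / ctx->height assignments above. */
static unsigned
avc_width(unsigned width_mbs_minus1, unsigned crop_left, unsigned crop_right)
{
    return (width_mbs_minus1 + 1) * 16 - (crop_left + crop_right) * 2;
}

static unsigned
avc_height(unsigned height_map_units_minus1, unsigned frame_mbs_only,
    unsigned crop_top, unsigned crop_bottom)
{
    return (2 - frame_mbs_only) * (height_map_units_minus1 + 1) * 16
           - (crop_top + crop_bottom) * 2;
}
```

The bit string 1 010 011 00101 (bytes A6 50) decodes to 0, 1, 2, 4. For a typical 1080p stream, 120 macroblocks across and 68 map units down with a bottom crop of 4 give 1920 x 1080.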
