0: 主要函數調用分析
rtmpdump 是一個用來處理 RTMP 流媒體的開源工具包,支持 rtmp://、rtmpt://、rtmpe://、rtmpte:// 和 rtmps://,也提供 Android 版本。
最近研究了一下它內部函數調用的關係。
下面列出幾個主要函數的調用關係。
RTMPDump用於下載RTMP流媒體的函數Download:
用於建立網絡連接(NetConnect)的函數Connect:
用於建立網絡流(NetStream)的函數ConnectStream:
rtmpdump源代碼(Linux):http://download.csdn.net/detail/leixiaohua1020/6376561
rtmpdump源代碼(VC 2005 工程):http://download.csdn.net/detail/leixiaohua1020/6563163
1: main()函數
rtmpdump 是一個用來處理 RTMP 流媒體的工具包,支持 rtmp://, rtmpt://, rtmpe://, rtmpte://, and rtmps:// 等。之前在學習RTMP協議的時候,發現沒有講它源代碼的,只好自己分析,現在打算把自己學習的成果寫出來,可能結果不一定都對,先暫且記錄一下。
使用RTMPdump下載一個流媒體的大致流程是這樣的:
RTMP_Init();//初始化結構體 InitSockets();//初始化Socket RTMP_ParseURL();//解析輸入URL RTMP_SetupStream();//一些設置 fopen();//打開文件,准備寫入 RTMP_Connect();//建立NetConnection RTMP_ConnectStream()//建立NetStream Download();//下載函數 RTMP_Close();//關閉連接 fclose();//關閉文件 CleanupSockets();//清理Socket
其中Download()主要是使用RTMP_Read()進行下載的。
注:可以參考:RTMP流媒體播放過程
下面貼上自己注釋的RTMPDump源代碼。注意以下幾點:
1.此RTMPDump已經被移植進VC 2010 的 MFC 工程,所以main()函數已經被改名為rtmpdump(),而且參數也改了,傳進來一個MFC窗口的句柄。不過功能沒怎麼改(控制台程序移植到MFC以後,main()就不是程序的入口了,所以main()名字改成什麼都無所謂)。
2.里面有很多提取信息的代碼形如:rtmp.dlg->AppendCInfo("開始初始化Socket...");這些代碼是我為了獲取RTMP信息而自己加的,並不影響程序的執行。
int rtmpdump(LPVOID lpParam,int argc,char **argv) { extern char *optarg; //一定要設置,否則只能運行一次 extern int optind; optind=0; int nStatus = RD_SUCCESS; double percent = 0; double duration = 0.0; int nSkipKeyFrames = DEF_SKIPFRM; // skip this number of keyframes when resuming int bOverrideBufferTime = FALSE; // if the user specifies a buffer time override this is true int bStdoutMode = TRUE; // if true print the stream directly to stdout, messages go to stderr int bResume = FALSE; // true in resume mode uint32_t dSeek = 0; // seek position in resume mode, 0 otherwise uint32_t bufferTime = DEF_BUFTIME; // meta header and initial frame for the resume mode (they are read from the file and compared with // the stream we are trying to continue char *metaHeader = 0; uint32_t nMetaHeaderSize = 0; // video keyframe for matching char *initialFrame = 0; uint32_t nInitialFrameSize = 0; int initialFrameType = 0; // tye: audio or video AVal hostname = { 0, 0 }; AVal playpath = { 0, 0 }; AVal subscribepath = { 0, 0 }; int port = -1; int protocol = RTMP_PROTOCOL_UNDEFINED; int retries = 0; int bLiveStream = FALSE; // 是直播流嗎? 
then we can't seek/resume int bHashes = FALSE; // display byte counters not hashes by default long int timeout = DEF_TIMEOUT; // timeout connection after 120 seconds uint32_t dStartOffset = 0; // 非直播流搜尋點seek position in non-live mode uint32_t dStopOffset = 0; RTMP rtmp = { 0 }; AVal swfUrl = { 0, 0 }; AVal tcUrl = { 0, 0 }; AVal pageUrl = { 0, 0 }; AVal app = { 0, 0 }; AVal auth = { 0, 0 }; AVal swfHash = { 0, 0 }; uint32_t swfSize = 0; AVal flashVer = { 0, 0 }; AVal sockshost = { 0, 0 }; #ifdef CRYPTO int swfAge = 30; /* 30 days for SWF cache by default */ int swfVfy = 0; unsigned char hash[RTMP_SWF_HASHLEN]; #endif char *flvFile = 0; signal(SIGINT, sigIntHandler); signal(SIGTERM, sigIntHandler); #ifndef WIN32 signal(SIGHUP, sigIntHandler); signal(SIGPIPE, sigIntHandler); signal(SIGQUIT, sigIntHandler); #endif RTMP_debuglevel = RTMP_LOGINFO; //首先搜尋“ --quiet”選項 int index = 0; while (index < argc) { if (strcmp(argv[index], "--quiet") == 0 || strcmp(argv[index], "-q") == 0) RTMP_debuglevel = RTMP_LOGCRIT; index++; } #define RTMPDUMP_VERSION "1.0" RTMP_LogPrintf("RTMP流媒體下載 %s\n", RTMPDUMP_VERSION); RTMP_LogPrintf ("2012 雷霄驊 中國傳媒大學/信息工程學院/通信與信息系統/數字電視技術\n"); //RTMP_LogPrintf("輸入 -h 獲取命令選項\n"); RTMP_Init(&rtmp); //句柄----------------------------- rtmp.dlg=(CSpecialPRTMPDlg *)lpParam; //--------------------------------- //---------------------- rtmp.dlg->AppendCInfo("開始初始化Socket..."); //----------------------------- if (!InitSockets()) { //---------------------- rtmp.dlg->AppendCInfo("初始化Socket失敗!"); //----------------------------- RTMP_Log(RTMP_LOGERROR, "Couldn't load sockets support on your platform, exiting!"); return RD_FAILED; } //---------------------- rtmp.dlg->AppendCInfo("成功初始化Socket"); //----------------------------- /* sleep(30); */ int opt; /* struct option longopts[] = { {"help", 0, NULL, 'h'}, {"host", 1, NULL, 'n'}, {"port", 1, NULL, 'c'}, {"socks", 1, NULL, 'S'}, {"protocol", 1, NULL, 'l'}, {"playpath", 1, NULL, 'y'}, {"playlist", 0, NULL, 'Y'}, {"rtmp", 
1, NULL, 'r'}, {"swfUrl", 1, NULL, 's'}, {"tcUrl", 1, NULL, 't'}, {"pageUrl", 1, NULL, 'p'}, {"app", 1, NULL, 'a'}, {"auth", 1, NULL, 'u'}, {"conn", 1, NULL, 'C'}, #ifdef CRYPTO {"swfhash", 1, NULL, 'w'}, {"swfsize", 1, NULL, 'x'}, {"swfVfy", 1, NULL, 'W'}, {"swfAge", 1, NULL, 'X'}, #endif {"flashVer", 1, NULL, 'f'}, {"live", 0, NULL, 'v'}, {"flv", 1, NULL, 'o'}, {"resume", 0, NULL, 'e'}, {"timeout", 1, NULL, 'm'}, {"buffer", 1, NULL, 'b'}, {"skip", 1, NULL, 'k'}, {"subscribe", 1, NULL, 'd'}, {"start", 1, NULL, 'A'}, {"stop", 1, NULL, 'B'}, {"token", 1, NULL, 'T'}, {"hashes", 0, NULL, '#'}, {"debug", 0, NULL, 'z'}, {"quiet", 0, NULL, 'q'}, {"verbose", 0, NULL, 'V'}, {0, 0, 0, 0} };*/ //分析命令行參數,注意用法。 //選項都是一個字母,后面有冒號的代表該選項還有相關參數 //一直循環直到獲取所有的opt while ((opt = getopt/*_long*/(argc, argv, "hVveqzr:s:t:p:a:b:f:o:u:C:n:c:l:y:Ym:k:d:A:B:T:w:x:W:X:S:#"/*, longopts, NULL*/)) != -1) { //不同的選項做不同的處理 switch (opt) { case 'h': usage(argv[0]); return RD_SUCCESS; #ifdef CRYPTO case 'w': { int res = hex2bin(optarg, &swfHash.av_val); if (res != RTMP_SWF_HASHLEN) { swfHash.av_val = NULL; RTMP_Log(RTMP_LOGWARNING, "Couldn't parse swf hash hex string, not hexstring or not %d bytes, ignoring!", RTMP_SWF_HASHLEN); } swfHash.av_len = RTMP_SWF_HASHLEN; break; } case 'x': { int size = atoi(optarg); if (size <= 0) { RTMP_Log(RTMP_LOGERROR, "SWF Size must be at least 1, ignoring\n"); } else { swfSize = size; } break; } case 'W': STR2AVAL(swfUrl, optarg); swfVfy = 1; break; case 'X': { int num = atoi(optarg); if (num < 0) { RTMP_Log(RTMP_LOGERROR, "SWF Age must be non-negative, ignoring\n"); } else { swfAge = num; } } break; #endif case 'k': nSkipKeyFrames = atoi(optarg); if (nSkipKeyFrames < 0) { RTMP_Log(RTMP_LOGERROR, "Number of keyframes skipped must be greater or equal zero, using zero!"); nSkipKeyFrames = 0; } else { RTMP_Log(RTMP_LOGDEBUG, "Number of skipped key frames for resume: %d", nSkipKeyFrames); } break; case 'b': { int32_t bt = atol(optarg); if (bt < 0) { 
RTMP_Log(RTMP_LOGERROR, "Buffer time must be greater than zero, ignoring the specified value %d!", bt); } else { bufferTime = bt; bOverrideBufferTime = TRUE; } break; } //直播流 case 'v': //---------------- rtmp.dlg->AppendCInfo("該RTMP的URL是一個直播流"); //---------------- bLiveStream = TRUE; // no seeking or resuming possible! break; case 'd': STR2AVAL(subscribepath, optarg); break; case 'n': STR2AVAL(hostname, optarg); break; case 'c': port = atoi(optarg); break; case 'l': protocol = atoi(optarg); if (protocol < RTMP_PROTOCOL_RTMP || protocol > RTMP_PROTOCOL_RTMPTS) { RTMP_Log(RTMP_LOGERROR, "Unknown protocol specified: %d", protocol); return RD_FAILED; } break; case 'y': STR2AVAL(playpath, optarg); break; case 'Y': RTMP_SetOpt(&rtmp, &av_playlist, (AVal *)&av_true); break; //路徑參數-r case 'r': { AVal parsedHost, parsedApp, parsedPlaypath; unsigned int parsedPort = 0; int parsedProtocol = RTMP_PROTOCOL_UNDEFINED; //解析URL。注optarg指向參數(URL) RTMP_LogPrintf("RTMP URL : %s\n",optarg); //---------------- rtmp.dlg->AppendCInfo("解析RTMP的URL..."); //---------------- if (!RTMP_ParseURL (optarg, &parsedProtocol, &parsedHost, &parsedPort, &parsedPlaypath, &parsedApp)) { //---------------- rtmp.dlg->AppendCInfo("解析RTMP的URL失敗!"); //---------------- RTMP_Log(RTMP_LOGWARNING, "無法解析 url (%s)!", optarg); } else { //---------------- rtmp.dlg->AppendCInfo("解析RTMP的URL成功"); //---------------- //把解析出來的數據賦值 if (!hostname.av_len) hostname = parsedHost; if (port == -1) port = parsedPort; if (playpath.av_len == 0 && parsedPlaypath.av_len) { playpath = parsedPlaypath; } if (protocol == RTMP_PROTOCOL_UNDEFINED) protocol = parsedProtocol; if (app.av_len == 0 && parsedApp.av_len) { app = parsedApp; } } break; } case 's': STR2AVAL(swfUrl, optarg); break; case 't': STR2AVAL(tcUrl, optarg); break; case 'p': STR2AVAL(pageUrl, optarg); break; case 'a': STR2AVAL(app, optarg); break; case 'f': STR2AVAL(flashVer, optarg); break; //指定輸出文件 case 'o': flvFile = optarg; if (strcmp(flvFile, "-")) bStdoutMode = FALSE; 
break; case 'e': bResume = TRUE; break; case 'u': STR2AVAL(auth, optarg); break; case 'C': { AVal av; STR2AVAL(av, optarg); if (!RTMP_SetOpt(&rtmp, &av_conn, &av)) { RTMP_Log(RTMP_LOGERROR, "Invalid AMF parameter: %s", optarg); return RD_FAILED; } } break; case 'm': timeout = atoi(optarg); break; case 'A': dStartOffset = (int) (atof(optarg) * 1000.0); break; case 'B': dStopOffset = (int) (atof(optarg) * 1000.0); break; case 'T': { AVal token; STR2AVAL(token, optarg); RTMP_SetOpt(&rtmp, &av_token, &token); } break; case '#': bHashes = TRUE; break; case 'q': RTMP_debuglevel = RTMP_LOGCRIT; break; case 'V': RTMP_debuglevel = RTMP_LOGDEBUG; break; case 'z': RTMP_debuglevel = RTMP_LOGALL; break; case 'S': STR2AVAL(sockshost, optarg); break; default: RTMP_LogPrintf("unknown option: %c\n", opt); usage(argv[0]); return RD_FAILED; break; } } if (!hostname.av_len) { RTMP_Log(RTMP_LOGERROR, "您必須指定 主機名(hostname) (--host) 或 url (-r \"rtmp://host[:port]/playpath\") 包含 a hostname"); return RD_FAILED; } if (playpath.av_len == 0) { RTMP_Log(RTMP_LOGERROR, "您必須指定 播放路徑(playpath) (--playpath) 或 url (-r \"rtmp://host[:port]/playpath\") 包含 a playpath"); return RD_FAILED; } if (protocol == RTMP_PROTOCOL_UNDEFINED) { RTMP_Log(RTMP_LOGWARNING, "您沒有指定 協議(protocol) (--protocol) 或 rtmp url (-r), 默認協議 RTMP"); protocol = RTMP_PROTOCOL_RTMP; } if (port == -1) { RTMP_Log(RTMP_LOGWARNING, "您沒有指定 端口(port) (--port) 或 rtmp url (-r), 默認端口 1935"); port = 0; } if (port == 0) { if (protocol & RTMP_FEATURE_SSL) port = 443; else if (protocol & RTMP_FEATURE_HTTP) port = 80; else port = 1935; } if (flvFile == 0) { RTMP_Log(RTMP_LOGWARNING, "請指定一個輸出文件 (-o filename), using stdout"); bStdoutMode = TRUE; } if (bStdoutMode && bResume) { RTMP_Log(RTMP_LOGWARNING, "Can't resume in stdout mode, ignoring --resume option"); bResume = FALSE; } if (bLiveStream && bResume) { RTMP_Log(RTMP_LOGWARNING, "Can't resume live stream, ignoring --resume option"); bResume = FALSE; } #ifdef CRYPTO if (swfVfy) { if 
(RTMP_HashSWF(swfUrl.av_val, (unsigned int *)&swfSize, hash, swfAge) == 0) { swfHash.av_val = (char *)hash; swfHash.av_len = RTMP_SWF_HASHLEN; } } if (swfHash.av_len == 0 && swfSize > 0) { RTMP_Log(RTMP_LOGWARNING, "Ignoring SWF size, supply also the hash with --swfhash"); swfSize = 0; } if (swfHash.av_len != 0 && swfSize == 0) { RTMP_Log(RTMP_LOGWARNING, "Ignoring SWF hash, supply also the swf size with --swfsize"); swfHash.av_len = 0; swfHash.av_val = NULL; } #endif if (tcUrl.av_len == 0) { char str[512] = { 0 }; tcUrl.av_len = snprintf(str, 511, "%s://%.*s:%d/%.*s", RTMPProtocolStringsLower[protocol], hostname.av_len, hostname.av_val, port, app.av_len, app.av_val); tcUrl.av_val = (char *) malloc(tcUrl.av_len + 1); strcpy(tcUrl.av_val, str); } int first = 1; // User defined seek offset if (dStartOffset > 0) { //直播流 if (bLiveStream) { RTMP_Log(RTMP_LOGWARNING, "Can't seek in a live stream, ignoring --start option"); dStartOffset = 0; } } //---------------- rtmp.dlg->AppendCInfo("開始初始化RTMP連接的參數..."); //---------------- //設置 RTMP_SetupStream(&rtmp, protocol, &hostname, port, &sockshost, &playpath, &tcUrl, &swfUrl, &pageUrl, &app, &auth, &swfHash, swfSize, &flashVer, &subscribepath, dSeek, dStopOffset, bLiveStream, timeout); //此處設置參數----------------- rtmp.dlg->AppendCInfo("成功初始化RTMP連接的參數"); //----------------------------- char *temp=(char *)malloc(MAX_URL_LENGTH); memcpy(temp,rtmp.Link.hostname.av_val,rtmp.Link.hostname.av_len); temp[rtmp.Link.hostname.av_len]='\0'; rtmp.dlg->AppendB_R_L_Info("主機名",temp); itoa(rtmp.Link.port,temp,10); rtmp.dlg->AppendB_R_L_Info("端口號",temp); memcpy(temp,rtmp.Link.app.av_val,rtmp.Link.app.av_len); temp[rtmp.Link.app.av_len]='\0'; rtmp.dlg->AppendB_R_L_Info("應用程序",temp); memcpy(temp,rtmp.Link.playpath.av_val,rtmp.Link.playpath.av_len); temp[rtmp.Link.playpath.av_len]='\0'; rtmp.dlg->AppendB_R_L_Info("路徑",temp); //----------------------------- /* Try to keep the stream moving if it pauses on us */ if (!bLiveStream && !(protocol & 
RTMP_FEATURE_HTTP)) rtmp.Link.lFlags |= RTMP_LF_BUFX; off_t size = 0; // ok,我們必須獲得timestamp of the last keyframe (only keyframes are seekable) / last audio frame (audio only streams) if (bResume) { //打開文件,輸出的文件(Resume) nStatus = OpenResumeFile(flvFile, &file, &size, &metaHeader, &nMetaHeaderSize, &duration); if (nStatus == RD_FAILED) goto clean; if (!file) { // file does not exist, so go back into normal mode bResume = FALSE; // we are back in fresh file mode (otherwise finalizing file won't be done) } else { //獲取最后一個關鍵幀 nStatus = GetLastKeyframe(file, nSkipKeyFrames, &dSeek, &initialFrame, &initialFrameType, &nInitialFrameSize); if (nStatus == RD_FAILED) { RTMP_Log(RTMP_LOGDEBUG, "Failed to get last keyframe."); goto clean; } if (dSeek == 0) { RTMP_Log(RTMP_LOGDEBUG, "Last keyframe is first frame in stream, switching from resume to normal mode!"); bResume = FALSE; } } } //如果輸出文件不存在 if (!file) { if (bStdoutMode) { //直接輸出到stdout file = stdout; SET_BINMODE(file); } else { //打開一個文件 //w+b 讀寫打開或建立一個二進制文件,允許讀和寫。 //----------------- rtmp.dlg->AppendCInfo("創建輸出文件..."); //----------------------------- file = fopen(flvFile, "w+b"); if (file == 0) { //----------------- rtmp.dlg->AppendCInfo("創建輸出文件失敗!"); //----------------------------- RTMP_LogPrintf("Failed to open file! 
%s\n", flvFile); return RD_FAILED; } rtmp.dlg->AppendCInfo("成功創建輸出文件"); } } #ifdef _DEBUG netstackdump = fopen("netstackdump", "wb"); netstackdump_read = fopen("netstackdump_read", "wb"); #endif while (!RTMP_ctrlC) { RTMP_Log(RTMP_LOGDEBUG, "Setting buffer time to: %dms", bufferTime); //設置Buffer時間 //----------------- rtmp.dlg->AppendCInfo("設置緩沖(Buffer)的時間"); //----------------------------- RTMP_SetBufferMS(&rtmp, bufferTime); //第一次執行 if (first) { first = 0; RTMP_LogPrintf("開始建立連接!\n"); //----------------- rtmp.dlg->AppendCInfo("開始建立連接(NetConnection)..."); //----------------------------- //建立連接(Connect) if (!RTMP_Connect(&rtmp, NULL)) { //----------------- rtmp.dlg->AppendCInfo("建立連接(NetConnection)失敗!"); //----------------------------- nStatus = RD_FAILED; break; } //----------------- rtmp.dlg->AppendCInfo("成功建立連接(NetConnection)"); //----------------------------- //RTMP_Log(RTMP_LOGINFO, "已鏈接..."); // User defined seek offset if (dStartOffset > 0) { // Don't need the start offset if resuming an existing file if (bResume) { RTMP_Log(RTMP_LOGWARNING, "Can't seek a resumed stream, ignoring --start option"); dStartOffset = 0; } else { dSeek = dStartOffset; } } // Calculate the length of the stream to still play if (dStopOffset > 0) { // Quit if start seek is past required stop offset if (dStopOffset <= dSeek) { RTMP_LogPrintf("Already Completed\n"); nStatus = RD_SUCCESS; break; } } //創建流(Stream)(發送connect命令消息后處理傳來的數據) itoa(rtmp.m_inChunkSize,temp,10); rtmp.dlg->AppendB_R_Info("輸入Chunk大小",temp); itoa(rtmp.m_outChunkSize,temp,10); rtmp.dlg->AppendB_R_Info("輸出Chunk大小",temp); itoa(rtmp.m_stream_id,temp,10); rtmp.dlg->AppendB_R_Info("Stream ID",temp); itoa(rtmp.m_nBufferMS,temp,10); rtmp.dlg->AppendB_R_Info("Buffer時長(ms)",temp); itoa(rtmp.m_nServerBW,temp,10); rtmp.dlg->AppendB_R_Info("ServerBW",temp); itoa(rtmp.m_nClientBW,temp,10); rtmp.dlg->AppendB_R_Info("ClientBW",temp); itoa((int)rtmp.m_fEncoding,temp,10); rtmp.dlg->AppendB_R_Info("命令消息編碼方法",temp); 
itoa((int)rtmp.m_fDuration,temp,10); rtmp.dlg->AppendB_R_Info("時長(s)",temp); rtmp.dlg->ShowBInfo(); free(temp); //----------------- rtmp.dlg->AppendCInfo("開始建立網絡流(NetStream)"); //----------------------------- if (!RTMP_ConnectStream(&rtmp, dSeek)) { //----------------- rtmp.dlg->AppendCInfo("建立網絡流(NetStream)失敗!"); //----------------- nStatus = RD_FAILED; break; } //----------------- rtmp.dlg->AppendCInfo("成功建立網絡流(NetStream)!"); //----------------- } else { nInitialFrameSize = 0; if (retries) { RTMP_Log(RTMP_LOGERROR, "Failed to resume the stream\n\n"); if (!RTMP_IsTimedout(&rtmp)) nStatus = RD_FAILED; else nStatus = RD_INCOMPLETE; break; } RTMP_Log(RTMP_LOGINFO, "Connection timed out, trying to resume.\n\n"); /* Did we already try pausing, and it still didn't work? */ if (rtmp.m_pausing == 3) { /* Only one try at reconnecting... */ retries = 1; dSeek = rtmp.m_pauseStamp; if (dStopOffset > 0) { if (dStopOffset <= dSeek) { RTMP_LogPrintf("Already Completed\n"); nStatus = RD_SUCCESS; break; } } if (!RTMP_ReconnectStream(&rtmp, dSeek)) { RTMP_Log(RTMP_LOGERROR, "Failed to resume the stream\n\n"); if (!RTMP_IsTimedout(&rtmp)) nStatus = RD_FAILED; else nStatus = RD_INCOMPLETE; break; } } else if (!RTMP_ToggleStream(&rtmp)) { RTMP_Log(RTMP_LOGERROR, "Failed to resume the stream\n\n"); if (!RTMP_IsTimedout(&rtmp)) nStatus = RD_FAILED; else nStatus = RD_INCOMPLETE; break; } bResume = TRUE; } //----------------- //----------------- rtmp.dlg->AppendCInfo("開始將媒體數據寫入文件"); //----------------- //下載,寫入文件 nStatus = Download(&rtmp, file, dSeek, dStopOffset, duration, bResume, metaHeader, nMetaHeaderSize, initialFrame, initialFrameType, nInitialFrameSize, nSkipKeyFrames, bStdoutMode, bLiveStream, bHashes, bOverrideBufferTime, bufferTime, &percent); free(initialFrame); initialFrame = NULL; /* If we succeeded, we're done. 
*/ if (nStatus != RD_INCOMPLETE || !RTMP_IsTimedout(&rtmp) || bLiveStream) break; } //當下載完的時候 if (nStatus == RD_SUCCESS) { //----------------- rtmp.dlg->AppendCInfo("寫入文件完成"); //----------------- RTMP_LogPrintf("Download complete\n"); } //沒下載完的時候 else if (nStatus == RD_INCOMPLETE) { //----------------- rtmp.dlg->AppendCInfo("寫入文件可能不完整"); //----------------- RTMP_LogPrintf ("Download may be incomplete (downloaded about %.2f%%), try resuming\n", percent); } //后續清理工作 clean: //----------------- rtmp.dlg->AppendCInfo("關閉連接"); //----------------- RTMP_Log(RTMP_LOGDEBUG, "Closing connection.\n"); RTMP_Close(&rtmp); rtmp.dlg->AppendCInfo("關閉文件"); if (file != 0) fclose(file); rtmp.dlg->AppendCInfo("關閉Socket"); CleanupSockets(); #ifdef _DEBUG if (netstackdump != 0) fclose(netstackdump); if (netstackdump_read != 0) fclose(netstackdump_read); #endif return nStatus; }
其中InitSockets()代碼很簡單,初始化了Socket,如下:
// 初始化 sockets int InitSockets() { #ifdef WIN32 WORD version; WSADATA wsaData; version = MAKEWORD(1, 1); return (WSAStartup(version, &wsaData) == 0); #else return TRUE; #endif }
CleanupSockets()則更簡單:
/*
 * Counterpart of InitSockets(): calls WSACleanup() on WIN32 and does
 * nothing on other platforms.
 */
inline void CleanupSockets()
{
#if defined(WIN32)
    WSACleanup();
#endif
}
Download()函數則比較復雜:
/*
 * Read the media data from a connected RTMP session and write it to `file`.
 *
 * rtmp                 connected RTMP object
 * file                 output stream, written from its current position
 * dSeek                timestamp (ms) to start/continue at
 * dStopOffset          stop offset (ms); only used for the status message here
 * duration             stream duration in seconds, <= 0 if unknown (it is then
 *                      queried with RTMP_GetDuration() while downloading)
 * bResume              non-zero when continuing an existing file
 * metaHeader, nMetaHeaderSize, initialFrame, initialFrameType,
 * nInitialFrameSize    resume data, copied into rtmp->m_read
 * nSkipKeyFrames       only used for the "--skip" hint in the error message
 * bStdoutMode          not used by this function
 * bLiveStream          non-zero for a live stream (no percentage output)
 * bHashes              non-zero: print '#' progress marks instead of counters
 * bOverrideBufferTime  non-zero: never enlarge the buffer time automatically
 * bufferTime           current buffer time in ms
 * percent              [out] percentage downloaded
 *
 * Returns RD_SUCCESS, RD_INCOMPLETE or RD_FAILED.
 */
int Download(RTMP * rtmp,               // connected RTMP object
             FILE * file, uint32_t dSeek, uint32_t dStopOffset, double duration,
             int bResume, char *metaHeader, uint32_t nMetaHeaderSize,
             char *initialFrame, int initialFrameType, uint32_t nInitialFrameSize,
             int nSkipKeyFrames, int bStdoutMode, int bLiveStream, int bHashes,
             int bOverrideBufferTime, uint32_t bufferTime,
             double *percent)           // percentage downloaded [out]
{
    int32_t now, lastUpdate;
    int bufferSize = 64 * 1024;
    /* Allocated just before the read loop so the early "Already Completed"
     * return below cannot leak it. */
    char *buffer = NULL;
    int nRead = 0;

    RTMP_LogPrintf("開始下載!\n");
    // current position in the output file = bytes already present
    off_t size = ftello(file);
    unsigned long lastPercent = 0;

    rtmp->m_read.timestamp = dSeek;

    *percent = 0.0;

    if (rtmp->m_read.timestamp) {
        RTMP_Log(RTMP_LOGDEBUG, "Continuing at TS: %d ms\n", rtmp->m_read.timestamp);
    }

    if (bLiveStream) {
        RTMP_LogPrintf("直播流\n");
    } else {
        // print initial status
        // Workaround to exit with 0 if the file is fully (> 99.9%) downloaded
        if (duration > 0) {
            if ((double) rtmp->m_read.timestamp >= (double) duration * 999.0) {
                RTMP_LogPrintf("Already Completed at: %.3f sec Duration=%.3f sec\n",
                               (double) rtmp->m_read.timestamp / 1000.0,
                               (double) duration / 1000.0);
                return RD_SUCCESS;
            } else {
                *percent = ((double) rtmp->m_read.timestamp) / (duration * 1000.0) * 100.0;
                // truncate to one decimal place
                *percent = ((double) (int) (*percent * 10.0)) / 10.0;
                RTMP_LogPrintf("%s download at: %.3f kB / %.3f sec (%.1f%%)\n",
                               bResume ? "Resuming" : "Starting",
                               (double) size / 1024.0,
                               (double) rtmp->m_read.timestamp / 1000.0, *percent);
            }
        } else {
            RTMP_LogPrintf("%s download at: %.3f kB\n",
                           bResume ? "Resuming" : "Starting",
                           (double) size / 1024.0);
        }
    }

    if (dStopOffset > 0)
        RTMP_LogPrintf("For duration: %.3f sec\n",
                       (double) (dStopOffset - dSeek) / 1000.0);

    // hand the resume information over to the reader state
    if (bResume && nInitialFrameSize > 0)
        rtmp->m_read.flags |= RTMP_READ_RESUME;
    rtmp->m_read.initialFrameType = initialFrameType;
    rtmp->m_read.nResumeTS = dSeek;
    rtmp->m_read.metaHeader = metaHeader;
    rtmp->m_read.initialFrame = initialFrame;
    rtmp->m_read.nMetaHeaderSize = nMetaHeaderSize;
    rtmp->m_read.nInitialFrameSize = nInitialFrameSize;

    buffer = (char *) malloc(bufferSize);
    if (!buffer) {
        RTMP_Log(RTMP_LOGERROR, "%s: Failed to allocate read buffer, exiting!",
                 __FUNCTION__);
        return RD_FAILED;
    }

    now = RTMP_GetTime();
    lastUpdate = now - 1000;
    do {
        // read up to bufferSize (64 KiB) bytes from the RTMP session
        nRead = RTMP_Read(rtmp, buffer, bufferSize);
        //RTMP_LogPrintf("nRead: %d\n", nRead);
        if (nRead > 0) {
            // fwrite() returns the number of items written; with an item size
            // of 1 that must equal nRead
            if (fwrite(buffer, sizeof(unsigned char), nRead, file) != (size_t) nRead) {
                RTMP_Log(RTMP_LOGERROR, "%s: Failed writing, exiting!", __FUNCTION__);
                free(buffer);
                return RD_FAILED;
            }
            // running total of bytes in the file
            size += nRead;

            //RTMP_LogPrintf("write %dbytes (%.1f kB)\n", nRead, nRead/1024.0);
            if (duration <= 0)  // if duration unknown try to get it from the stream (onMetaData)
                duration = RTMP_GetDuration(rtmp);

            if (duration > 0) {
                // make sure we claim to have enough buffer time!
                if (!bOverrideBufferTime && bufferTime < (duration * 1000.0)) {
                    bufferTime = (uint32_t) (duration * 1000.0) + 5000;     // extra 5s to be on the safe side

                    RTMP_Log(RTMP_LOGDEBUG,
                             "Detected that buffer time is less than duration, resetting to: %dms",
                             bufferTime);
                    RTMP_SetBufferMS(rtmp, bufferTime);
                    // tell the server about the new buffer length
                    RTMP_UpdateBufferMS(rtmp);
                }
                *percent = ((double) rtmp->m_read.timestamp) / (duration * 1000.0) * 100.0;
                *percent = ((double) (int) (*percent * 10.0)) / 10.0;
                if (bHashes) {
                    if (lastPercent + 1 <= *percent) {
                        RTMP_LogStatus("#");
                        lastPercent = (unsigned long) *percent;
                    }
                } else {
                    // refresh the status line at most every 200 ms
                    now = RTMP_GetTime();
                    if (abs(now - lastUpdate) > 200) {
                        RTMP_LogStatus("\r%.3f kB / %.2f sec (%.1f%%)",
                                       (double) size / 1024.0,
                                       (double) (rtmp->m_read.timestamp) / 1000.0, *percent);
                        lastUpdate = now;
                    }
                }
            } else {
                // duration still unknown: no percentage available
                now = RTMP_GetTime();
                if (abs(now - lastUpdate) > 200) {
                    if (bHashes)
                        RTMP_LogStatus("#");
                    else
                        RTMP_LogStatus("\r%.3f kB / %.2f sec", (double) size / 1024.0,
                                       (double) (rtmp->m_read.timestamp) / 1000.0);
                    lastUpdate = now;
                }
            }
        }
#ifdef _DEBUG
        else {
            RTMP_Log(RTMP_LOGDEBUG, "zero read!");
        }
#endif
    } while (!RTMP_ctrlC && nRead > -1 && RTMP_IsConnected(rtmp) && !RTMP_IsTimedout(rtmp));
    free(buffer);
    buffer = NULL;

    // on error the detailed reason is kept in the reader state
    if (nRead < 0)
        nRead = rtmp->m_read.status;

    /* Final status update */
    if (!bHashes) {
        if (duration > 0) {
            *percent = ((double) rtmp->m_read.timestamp) / (duration * 1000.0) * 100.0;
            *percent = ((double) (int) (*percent * 10.0)) / 10.0;
            RTMP_LogStatus("\r%.3f kB / %.2f sec (%.1f%%)",
                           (double) size / 1024.0,
                           (double) (rtmp->m_read.timestamp) / 1000.0, *percent);
        } else {
            RTMP_LogStatus("\r%.3f kB / %.2f sec", (double) size / 1024.0,
                           (double) (rtmp->m_read.timestamp) / 1000.0);
        }
    }

    RTMP_Log(RTMP_LOGDEBUG, "RTMP_Read returned: %d", nRead);

    // status -2: the stream could not be matched to the existing file
    if (bResume && nRead == -2) {
        RTMP_LogPrintf("Couldn't resume FLV file, try --skip %d\n\n",
                       nSkipKeyFrames + 1);
        return RD_FAILED;
    }

    // status -3 is treated as a completed download
    if (nRead == -3)
        return RD_SUCCESS;

    // anything short of ~100%, an interrupt, an error or a timeout is incomplete
    if ((duration > 0 && *percent < 99.9) || RTMP_ctrlC || nRead < 0
        || RTMP_IsTimedout(rtmp)) {
        return RD_INCOMPLETE;
    }

    return RD_SUCCESS;
}
以上內容是我能理解到的rtmpdump.c里面的內容。
2:解析RTMP地址——RTMP_ParseURL()
之前分析了一下RTMPDump的main()函數,其中獲取RTMP流媒體數據很重要的前提是RTMP的URL的解析。如果沒有這一步,那程序再強大也是白搭。現在來解析一下這個函數吧:RTMP_ParseURL()。
下面首先回顧一下RTMP的URL的格式:
rtmp://localhost/vod/mp4:sample1_1500kbps.f4v
“://”之前的是使用的協議類型,可以是rtmp,rtmpt,rtmps等
之后是服務器地址
再之後是端口號(可以沒有,默認1935)
再之後是application的名字,在這裡是“vod”
最后是流媒體文件路徑。
關於URL就不多說了,可以查看相關文檔,下面貼上注釋后的代碼(整個parseurl.c):
/*
 * This file contains the parsing of the input RTMP URL.
 */
#include "stdafx.h"
#include <stdlib.h>
#include <string.h>

#include <assert.h>
#include <ctype.h>

#include "rtmp_sys.h"
#include "log.h"

/*
 * Split an RTMP URL of the form
 *     protocol://host[:port]/app[/appinstance]/playpath
 * into its parts.
 *
 * url       NUL-terminated URL. host and app point INTO this string
 *           afterwards (they are not copied), so it must outlive them.
 * protocol  [out] RTMP_PROTOCOL_* value; RTMP_PROTOCOL_RTMP when the scheme
 *           is not recognised.
 * host      [out] host name; {NULL, 0} when it was rejected (>255 chars).
 * port      [out] port number, 0 when the URL has none or it is invalid.
 * playpath  [out] play path produced by RTMP_ParsePlaypath() (heap
 *           allocated there), {NULL, 0} when the URL has none.
 * app       [out] application name, {NULL, 0} when the URL has none.
 *
 * Returns FALSE when the URL has no "://" or nothing after it, TRUE otherwise.
 */
int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port,
    AVal *playpath, AVal *app)
{
    char *p, *end, *col, *ques, *slash;

    RTMP_Log(RTMP_LOGDEBUG, "Parsing...");

    *protocol = RTMP_PROTOCOL_RTMP;
    *port = 0;
    /* Also reset host: the over-long-hostname path below leaves it untouched
     * while the function still returns TRUE, so callers would otherwise read
     * an uninitialised AVal. */
    host->av_len = 0;
    host->av_val = NULL;
    playpath->av_len = 0;
    playpath->av_val = NULL;
    app->av_len = 0;
    app->av_val = NULL;

    /* look for the "://" that ends the scheme */
    p = strstr((char *)url, "://");
    if(!p) {
        RTMP_Log(RTMP_LOGERROR, "RTMP URL: No :// in url!");
        return FALSE;
    }
    {
    /* length of the scheme in front of "://" */
    int len = (int)(p-url);

    /* identify the protocol by case-insensitive comparison */
    if(len == 4 && strncasecmp(url, "rtmp", 4)==0)
        *protocol = RTMP_PROTOCOL_RTMP;
    else if(len == 5 && strncasecmp(url, "rtmpt", 5)==0)
        *protocol = RTMP_PROTOCOL_RTMPT;
    else if(len == 5 && strncasecmp(url, "rtmps", 5)==0)
        *protocol = RTMP_PROTOCOL_RTMPS;
    else if(len == 5 && strncasecmp(url, "rtmpe", 5)==0)
        *protocol = RTMP_PROTOCOL_RTMPE;
    else if(len == 5 && strncasecmp(url, "rtmfp", 5)==0)
        *protocol = RTMP_PROTOCOL_RTMFP;
    else if(len == 6 && strncasecmp(url, "rtmpte", 6)==0)
        *protocol = RTMP_PROTOCOL_RTMPTE;
    else if(len == 6 && strncasecmp(url, "rtmpts", 6)==0)
        *protocol = RTMP_PROTOCOL_RTMPTS;
    else {
        /* unknown scheme: keep the RTMP default and carry on */
        RTMP_Log(RTMP_LOGWARNING, "Unknown protocol!\n");
        goto parsehost;
    }
    }

    RTMP_Log(RTMP_LOGDEBUG, "Parsed protocol: %d", *protocol);

parsehost:
    /* skip "://" */
    p+=3;

    /* there must be something after the scheme */
    if(*p==0) {
        RTMP_Log(RTMP_LOGWARNING, "No hostname in URL!");
        return FALSE;
    }

    end   = p + strlen(p);      /* end of the URL */
    col   = strchr(p, ':');     /* first ':' (port separator), if any */
    ques  = strchr(p, '?');     /* first '?', if any */
    slash = strchr(p, '/');     /* first '/' (end of host[:port]), if any */

    {
    int hostlen;
    if(slash)
        hostlen = slash - p;
    else
        hostlen = end - p;
    /* a ':' only delimits the host when it comes before the first '/' */
    if(col && col -p < hostlen)
        hostlen = col - p;

    if(hostlen < 256) {
        host->av_val = p;
        host->av_len = hostlen;
        RTMP_Log(RTMP_LOGDEBUG, "Parsed host    : %.*s", hostlen, host->av_val);
    } else {
        RTMP_Log(RTMP_LOGWARNING, "Hostname exceeds 255 characters!");
    }

    p+=hostlen;
    }

    /* optional port number */
    if(*p == ':') {
        unsigned int p2;
        p++;
        p2 = atoi(p);
        if(p2 > 65535) {
            RTMP_Log(RTMP_LOGWARNING, "Invalid port number!");
        } else {
            *port = p2;
        }
    }

    if(!slash) {
        RTMP_Log(RTMP_LOGWARNING, "No application or playpath in URL!");
        return TRUE;
    }
    p = slash+1;

    {
    /* parse application
     *
     * rtmp://host[:port]/app[/appinstance][/...]
     * application = app[/appinstance]
     */

    char *slash2, *slash3 = NULL;   /* second and third '/' after the host */
    int applen, appnamelen;

    slash2 = strchr(p, '/');
    if(slash2)
        slash3 = strchr(slash2+1, '/');

    applen = end-p; /* ondemand, pass all parameters as app */
    appnamelen = applen; /* ondemand length */

    if(ques && strstr(p, "slist=")) { /* whatever it is, the '?' and slist= means we need to use everything as app and parse playpath from slist= */
        appnamelen = ques-p;
    }
    else if(strncmp(p, "ondemand/", 9)==0) {
        /* app = ondemand/foobar, only pass app=ondemand */
        applen = 8;
        appnamelen = 8;
    }
    else { /* app!=ondemand, so app is app[/appinstance] */
        if(slash3)
            appnamelen = slash3-p;
        else if(slash2)
            appnamelen = slash2-p;

        applen = appnamelen;
    }

    app->av_val = p;
    app->av_len = applen;
    RTMP_Log(RTMP_LOGDEBUG, "Parsed app     : %.*s", applen, p);

    p += appnamelen;
    }

    if (*p == '/')
        p++;

    /* whatever is left is the play path */
    if (end-p) {
        AVal av = {p, end-p};
        RTMP_ParsePlaypath(&av, playpath);
    }

    return TRUE;
}

/*
 * Extract the play path from the URL. The play path is the part of the URL
 * behind "rtmp://host:port/app/".
 *
 * The result is converted to the form the server understands:
 *   mp4 streams: prepend "mp4:", remove the extension
 *   mp3 streams: prepend "mp3:", remove the extension
 *   flv streams: remove the extension
 * "%XX" escapes are decoded.
 *
 * in   play path part of the URL (in->av_val must be NUL-terminated at or
 *      after in->av_len, the string functions below rely on it).
 * out  [out] newly malloc()ed, NUL-terminated play path, owned by the caller;
 *      {NULL, 0} when the allocation fails.
 */
void RTMP_ParsePlaypath(AVal *in, AVal *out) {
    int addMP4 = 0;
    int addMP3 = 0;
    int subExt = 0;
    const char *playpath = in->av_val;
    const char *temp, *q, *ext = NULL;
    const char *ppstart = playpath;
    char *streamname, *destptr, *p;

    int pplen = in->av_len;

    out->av_val = NULL;
    out->av_len = 0;

    /* "?...slist=<path>[&...]": the play path is the slist value */
    if ((*ppstart == '?') &&
        (temp=strstr(ppstart, "slist=")) != 0) {
        ppstart = temp+6;
        pplen = strlen(ppstart);

        temp = strchr(ppstart, '&');
        if (temp) {
            pplen = temp-ppstart;
        }
    }

    q = strchr(ppstart, '?');
    if (pplen >= 4) {
        /* the extension is the 4 characters in front of the query string, or
         * the last 4 characters when there is no query string */
        if (q) {
            /* fewer than 4 characters before '?': q-4 would point in front
             * of the play path, so there is no extension to look at */
            if (q - ppstart >= 4)
                ext = q-4;
        } else {
            ext = &ppstart[pplen-4];
        }
        if (ext) {
            if ((strncmp(ext, ".f4v", 4) == 0) ||
                (strncmp(ext, ".mp4", 4) == 0)) {
                addMP4 = 1;
                subExt = 1;
            /* Only remove .flv from rtmp URL, not slist params */
            } else if ((ppstart == playpath) &&
                (strncmp(ext, ".flv", 4) == 0)) {
                subExt = 1;
            } else if (strncmp(ext, ".mp3", 4) == 0) {
                addMP3 = 1;
                subExt = 1;
            }
        }
    }

    /* room for the path, a 4 character prefix and the terminator */
    streamname = (char *)malloc((pplen+4+1)*sizeof(char));
    if (!streamname)
        return;

    destptr = streamname;
    if (addMP4) {
        if (strncmp(ppstart, "mp4:", 4)) {
            strcpy(destptr, "mp4:");
            destptr += 4;
        } else {
            /* already prefixed: leave the path as it is */
            subExt = 0;
        }
    } else if (addMP3) {
        if (strncmp(ppstart, "mp3:", 4)) {
            strcpy(destptr, "mp3:");
            destptr += 4;
        } else {
            subExt = 0;
        }
    }

    for (p=(char *)ppstart; pplen >0;) {
        /* skip extension */
        if (subExt && p == ext) {
            p += 4;
            pplen -= 4;
            continue;
        }
        /* Decode "%XX" only when both hex digits are inside the play path;
         * a stray or truncated '%' is copied literally instead of reading
         * past the end or storing an uninitialised value. */
        if (*p == '%' && pplen >= 3 &&
            isxdigit((unsigned char)p[1]) && isxdigit((unsigned char)p[2])) {
            unsigned int c = 0;
            sscanf(p+1, "%02x", &c);
            *destptr++ = (char)c;
            pplen -= 3;
            p += 3;
        } else {
            *destptr++ = *p++;
            pplen--;
        }
    }
    *destptr = '\0';

    out->av_val = streamname;
    out->av_len = destptr - streamname;
}
3: AMF編碼
之前分析了RTMPDump(libRTMP)解析RTMP的URL的源代碼,在這里簡單分析一下其AMF編碼方面的源碼。
AMF編碼廣泛用於Adobe公司的Flash以及Flex系統中。由於RTMP協議也是Adobe公司的,所以它也使用AMF進行通信。具體 AMF是怎么使用的在這里就不做詳細討論了。RTMPDump如果想實現RTMP協議的流媒體的下載保存,就必須可以編碼和解碼AMF格式的數據。
amf.c 是 RTMPDump 存放 AMF 數據編碼與解碼函數的地方,在這裡貼上其源代碼。先不做詳細解釋了,以後有機會再補充。
#include "stdafx.h"
/*
 * Operations on AMF (Action Message Format) values and objects.
 * -------------------------------------
 * AMF0 data types:
 *   Type                          Byte code
 *   Number                        0x00
 *   Boolean                       0x01
 *   String                        0x02
 *   Object                        0x03
 *   MovieClip                     0x04
 *   Null                          0x05
 *   Undefined                     0x06
 *   Reference                     0x07
 *   MixedArray                    0x08
 *   EndOfObject                   0x09
 *   Array                         0x0a
 *   Date                          0x0b
 *   LongString                    0x0c
 *   Unsupported                   0x0d
 *   Recordset                     0x0e
 *   XML                           0x0f
 *   TypedObject (Class instance)  0x10
 *   AMF3 data                     0x11
 * -------------------------------------
 * Examples:
 * 0. Number is a double stored in 8 bytes; the hex sequence
 *    00 40 10 00 00 00 00 00 00 is the double 4.0.
 * 1. Boolean is stored in 1 byte: 00 is false, 01 is true; hex 01 01 is true.
 * 2. String is 1 type byte, 2 bytes of UTF-8 length, then the UTF-8 bytes;
 *    hex 02 00 08 73 68 61 6E 67 67 75 61 is the 8-byte string "shanggua".
 * 3. Object behaves like a hash table: UTF-8 string keys, any AMF type as
 *    value; it is terminated by the 3 bytes 00 00 09.
 * 5. Null is the single marker byte 0x05.
 * 6. Undefined is the single marker byte 0x06.
 * 8. MixedArray is like Object (3) but also declares the table size.
 */
#include <string.h>
#include <assert.h>
#include <stdlib.h>

#include "rtmp_sys.h"
#include "amf.h"
#include "log.h"
#include "bytes.h"

static const AMFObjectProperty AMFProp_Invalid = { {0, 0}, AMF_INVALID };
static const AVal AV_empty = { 0, 0 };

/*
 * Byte order notes.
 * Big-endian stores the most significant byte at the lowest address.
 * TCP/IP defines big-endian as "network byte order"; AMF uses it too.
 * On a little-endian host the bytes therefore have to be swapped when
 * converting between host order and AMF/network order.
 */

/* Decode a big-endian 16-bit unsigned integer from data[0..1]. */
unsigned short
AMF_DecodeInt16(const char *data)
{
  const unsigned char *c = (const unsigned char *)data;
  return (unsigned short)((c[0] << 8) | c[1]);
}

/* Decode a big-endian 24-bit unsigned integer from data[0..2]. */
unsigned int
AMF_DecodeInt24(const char *data)
{
  const unsigned char *c = (const unsigned char *)data;
  return ((unsigned int)c[0] << 16) | ((unsigned int)c[1] << 8) | c[2];
}

/* Decode a big-endian 32-bit unsigned integer from data[0..3]. */
unsigned int
AMF_DecodeInt32(const char *data)
{
  const unsigned char *c = (const unsigned char *)data;
  /* widen before shifting: c[0] << 24 on a promoted int overflows for c[0] >= 0x80 */
  return ((unsigned int)c[0] << 24) | ((unsigned int)c[1] << 16) |
         ((unsigned int)c[2] << 8) | c[3];
}

/*
 * Decode a short string (16-bit length prefix). bv points INTO data; nothing
 * is copied and the result is not NUL-terminated.
 */
void
AMF_DecodeString(const char *data, AVal *bv)
{
  bv->av_len = AMF_DecodeInt16(data);
  bv->av_val = (bv->av_len > 0) ? (char *)data + 2 : NULL;
}

/* Same as AMF_DecodeString but with a 32-bit length prefix. */
void
AMF_DecodeLongString(const char *data, AVal *bv)
{
  bv->av_len = AMF_DecodeInt32(data);
  bv->av_val = (bv->av_len > 0) ? (char *)data + 4 : NULL;
}

/* Decode an 8-byte big-endian IEEE double, honouring host byte/word order. */
double
AMF_DecodeNumber(const char *data)
{
  double dVal;
#if __FLOAT_WORD_ORDER == __BYTE_ORDER
#if __BYTE_ORDER == __BIG_ENDIAN
  memcpy(&dVal, data, 8);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  unsigned char *ci, *co;
  ci = (unsigned char *)data;
  co = (unsigned char *)&dVal;
  co[0] = ci[7];
  co[1] = ci[6];
  co[2] = ci[5];
  co[3] = ci[4];
  co[4] = ci[3];
  co[5] = ci[2];
  co[6] = ci[1];
  co[7] = ci[0];
#endif
#else
#if __BYTE_ORDER == __LITTLE_ENDIAN	/* __FLOAT_WORD_ORDER == __BIG_ENDIAN */
  unsigned char *ci, *co;
  ci = (unsigned char *)data;
  co = (unsigned char *)&dVal;
  co[0] = ci[3];
  co[1] = ci[2];
  co[2] = ci[1];
  co[3] = ci[0];
  co[4] = ci[7];
  co[5] = ci[6];
  co[6] = ci[5];
  co[7] = ci[4];
#else /* __BYTE_ORDER == __BIG_ENDIAN && __FLOAT_WORD_ORDER == __LITTLE_ENDIAN */
  unsigned char *ci, *co;
  ci = (unsigned char *)data;
  co = (unsigned char *)&dVal;
  co[0] = ci[4];
  co[1] = ci[5];
  co[2] = ci[6];
  co[3] = ci[7];
  co[4] = ci[0];
  co[5] = ci[1];
  co[6] = ci[2];
  co[7] = ci[3];
#endif
#endif
  return dVal;
}

/* Decode a boolean: any non-zero byte is true. */
int
AMF_DecodeBoolean(const char *data)
{
  return *data != 0;
}

/*
 * The AMF_EncodeIntNN functions write nVal big-endian at output and return
 * the position just past the written bytes, or NULL when fewer than NN/8
 * bytes remain before outend.
 */
char *
AMF_EncodeInt16(char *output, char *outend, short nVal)
{
  if (output + 2 > outend)
    return NULL;

  output[1] = nVal & 0xff;
  output[0] = nVal >> 8;
  return output + 2;
}

char *
AMF_EncodeInt24(char *output, char *outend, int nVal)
{
  if (output + 3 > outend)
    return NULL;

  /* least significant byte goes last (big-endian) */
  output[2] = nVal & 0xff;
  output[1] = nVal >> 8;
  output[0] = nVal >> 16;
  return output + 3;
}

char *
AMF_EncodeInt32(char *output, char *outend, int nVal)
{
  if (output + 4 > outend)
    return NULL;

  output[3] = nVal & 0xff;
  output[2] = nVal >> 8;
  output[1] = nVal >> 16;
  output[0] = nVal >> 24;
  return output + 4;
}

/*
 * Encode bv as AMF_STRING (length < 65536) or AMF_LONG_STRING.
 * Returns the position past the encoded value, or NULL if it does not fit.
 */
char *
AMF_EncodeString(char *output, char *outend, const AVal *bv)
{
  /* marker byte + 2- or 4-byte length; the previous check always demanded
   * the 4-byte header and so rejected short strings that fit exactly */
  int nHeader = (bv->av_len < 65536) ? 1 + 2 : 1 + 4;

  if (output + nHeader + bv->av_len > outend)
    return NULL;

  if (bv->av_len < 65536)
    {
      *output++ = AMF_STRING;
      output = AMF_EncodeInt16(output, outend, bv->av_len);
    }
  else
    {
      *output++ = AMF_LONG_STRING;
      output = AMF_EncodeInt32(output, outend, bv->av_len);
    }
  memcpy(output, bv->av_val, bv->av_len);
  output += bv->av_len;

  return output;
}

/* Encode dVal as AMF_NUMBER (marker + 8-byte big-endian double). */
char *
AMF_EncodeNumber(char *output, char *outend, double dVal)
{
  if (output + 1 + 8 > outend)
    return NULL;

  *output++ = AMF_NUMBER;	/* type: Number */

#if __FLOAT_WORD_ORDER == __BYTE_ORDER
#if __BYTE_ORDER == __BIG_ENDIAN
  memcpy(output, &dVal, 8);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  {
    unsigned char *ci, *co;
    ci = (unsigned char *)&dVal;
    co = (unsigned char *)output;
    co[0] = ci[7];
    co[1] = ci[6];
    co[2] = ci[5];
    co[3] = ci[4];
    co[4] = ci[3];
    co[5] = ci[2];
    co[6] = ci[1];
    co[7] = ci[0];
  }
#endif
#else
#if __BYTE_ORDER == __LITTLE_ENDIAN	/* __FLOAT_WORD_ORDER == __BIG_ENDIAN */
  {
    unsigned char *ci, *co;
    ci = (unsigned char *)&dVal;
    co = (unsigned char *)output;
    co[0] = ci[3];
    co[1] = ci[2];
    co[2] = ci[1];
    co[3] = ci[0];
    co[4] = ci[7];
    co[5] = ci[6];
    co[6] = ci[5];
    co[7] = ci[4];
  }
#else /* __BYTE_ORDER == __BIG_ENDIAN && __FLOAT_WORD_ORDER == __LITTLE_ENDIAN */
  {
    unsigned char *ci, *co;
    ci = (unsigned char *)&dVal;
    co = (unsigned char *)output;
    co[0] = ci[4];
    co[1] = ci[5];
    co[2] = ci[6];
    co[3] = ci[7];
    co[4] = ci[0];
    co[5] = ci[1];
    co[6] = ci[2];
    co[7] = ci[3];
  }
#endif
#endif

  return output + 8;
}

/* Encode bVal as AMF_BOOLEAN (marker + one byte, 0x01 or 0x00). */
char *
AMF_EncodeBoolean(char *output, char *outend, int bVal)
{
  if (output + 2 > outend)
    return NULL;

  *output++ = AMF_BOOLEAN;
  *output++ = bVal ? 0x01 : 0x00;

  return output;
}

/* Write a property name (16-bit length + bytes); NULL if it does not fit. */
static char *
amf_encode_name(char *output, char *outend, const AVal *strName)
{
  if (output + 2 + strName->av_len > outend)
    return NULL;
  output = AMF_EncodeInt16(output, outend, strName->av_len);
  memcpy(output, strName->av_val, strName->av_len);
  return output + strName->av_len;
}

/* Encode "name: string value". Returns NULL if the buffer is too small. */
char *
AMF_EncodeNamedString(char *output, char *outend, const AVal *strName, const AVal *strValue)
{
  output = amf_encode_name(output, outend, strName);
  if (!output)
    return NULL;
  return AMF_EncodeString(output, outend, strValue);
}

/* Encode "name: number value". Returns NULL if the buffer is too small. */
char *
AMF_EncodeNamedNumber(char *output, char *outend, const AVal *strName, double dVal)
{
  output = amf_encode_name(output, outend, strName);
  if (!output)
    return NULL;
  return AMF_EncodeNumber(output, outend, dVal);
}

/* Encode "name: boolean value". Returns NULL if the buffer is too small. */
char *
AMF_EncodeNamedBoolean(char *output, char *outend, const AVal *strName, int bVal)
{
  output = amf_encode_name(output, outend, strName);
  if (!output)
    return NULL;
  return AMF_EncodeBoolean(output, outend, bVal);
}

/* ---- AMFObjectProperty accessors (shallow copies, no ownership change) ---- */

void
AMFProp_GetName(AMFObjectProperty *prop, AVal *name)
{
  *name = prop->p_name;
}

void
AMFProp_SetName(AMFObjectProperty *prop, AVal *name)
{
  prop->p_name = *name;
}

AMFDataType
AMFProp_GetType(AMFObjectProperty *prop)
{
  return prop->p_type;
}

double
AMFProp_GetNumber(AMFObjectProperty *prop)
{
  return prop->p_vu.p_number;
}

/* Booleans are stored in p_number; non-zero means true. */
int
AMFProp_GetBoolean(AMFObjectProperty *prop)
{
  return prop->p_vu.p_number != 0;
}

void
AMFProp_GetString(AMFObjectProperty *prop, AVal *str)
{
  *str = prop->p_vu.p_aval;
}

void
AMFProp_GetObject(AMFObjectProperty *prop, AMFObject *obj)
{
  *obj = prop->p_vu.p_object;
}

int
AMFProp_IsValid(AMFObjectProperty *prop)
{
  return prop->p_type != AMF_INVALID;
}

/*
 * Encode one property (optional name followed by its value) into
 * [pBuffer, pBufEnd). Returns the position past the encoded data, or NULL
 * when the type is invalid/unsupported or the buffer is too small.
 */
char *
AMFProp_Encode(AMFObjectProperty *prop, char *pBuffer, char *pBufEnd)
{
  if (prop->p_type == AMF_INVALID)
    return NULL;

  if (prop->p_type != AMF_NULL && pBuffer + prop->p_name.av_len + 2 + 1 >= pBufEnd)
    return NULL;

  /* the name is written only for non-null properties that actually have one */
  if (prop->p_type != AMF_NULL && prop->p_name.av_len)
    {
      *pBuffer++ = prop->p_name.av_len >> 8;
      *pBuffer++ = prop->p_name.av_len & 0xff;
      memcpy(pBuffer, prop->p_name.av_val, prop->p_name.av_len);
      pBuffer += prop->p_name.av_len;
    }

  switch (prop->p_type)
    {
    case AMF_NUMBER:
      pBuffer = AMF_EncodeNumber(pBuffer, pBufEnd, prop->p_vu.p_number);
      break;

    case AMF_BOOLEAN:
      pBuffer = AMF_EncodeBoolean(pBuffer, pBufEnd, prop->p_vu.p_number != 0);
      break;

    case AMF_STRING:
      pBuffer = AMF_EncodeString(pBuffer, pBufEnd, &prop->p_vu.p_aval);
      break;

    case AMF_NULL:
      if (pBuffer + 1 >= pBufEnd)
	return NULL;
      *pBuffer++ = AMF_NULL;
      break;

    case AMF_OBJECT:
      pBuffer = AMF_Encode(&prop->p_vu.p_object, pBuffer, pBufEnd);
      break;

    default:
      RTMP_Log(RTMP_LOGERROR, "%s, invalid type. %d", __FUNCTION__, prop->p_type);
      pBuffer = NULL;
    }

  return pBuffer;
}

#define AMF3_INTEGER_MAX	268435455
#define AMF3_INTEGER_MIN	-268435456

/*
 * Read an AMF3 variable-length 29-bit integer. The first three bytes carry
 * 7 bits each with the high bit as "continue" flag; a fourth byte, if
 * present, carries 8 bits. Stores the value in *valp and returns the number
 * of bytes consumed (1..4). The caller must guarantee enough input bytes.
 */
int
AMF3ReadInteger(const char *data, int32_t *valp)
{
  /* read as unsigned: a plain (signed) char would sign-extend the 4th byte
   * and set every upper bit of the result */
  const unsigned char *b = (const unsigned char *)data;
  int i = 0;
  int32_t val = 0;

  while (i <= 2 && (b[i] & 0x80))
    {				/* byte used: shift up and add 7 bits */
      val <<= 7;
      val |= (b[i] & 0x7f);
      i++;
    }

  if (i > 2)
    {				/* use 4th byte, all 8 bits */
      val <<= 8;
      val |= b[3];

      /* range check: fold into the signed 29-bit range */
      if (val > AMF3_INTEGER_MAX)
	val -= (1 << 29);
    }
  else
    {				/* use 7 bits of last unparsed byte (0xxxxxxx) */
      val <<= 7;
      val |= b[i];
    }

  *valp = val;

  return i > 2 ? 4 : i + 1;
}

/*
 * Read an AMF3 string. String references are not supported: they are logged
 * and *str is set to the empty value. Otherwise str points INTO data (not
 * NUL-terminated). Returns the number of bytes consumed.
 */
int
AMF3ReadString(const char *data, AVal *str)
{
  int32_t ref = 0;
  int len;
  assert(str != 0);

  len = AMF3ReadInteger(data, &ref);
  data += len;

  if ((ref & 0x1) == 0)
    {				/* reference: 0xxx */
      uint32_t refIndex = (ref >> 1);
      RTMP_Log(RTMP_LOGDEBUG,
	       "%s, string reference, index: %d, not supported, ignoring!",
	       __FUNCTION__, refIndex);
      /* never hand back an uninitialised AVal: callers inspect av_len */
      str->av_val = NULL;
      str->av_len = 0;
      return len;
    }
  else
    {
      uint32_t nSize = (ref >> 1);

      str->av_val = (char *)data;
      str->av_len = nSize;

      return len + nSize;
    }
}

/*
 * Decode one AMF3 property from pBuffer (nSize bytes available).
 * With bDecodeName set, a name string is read first; an empty name ends a
 * dynamic member list and the bytes consumed so far are returned with the
 * property value left untouched.
 * Returns the number of bytes consumed, or -1 on error/unsupported type.
 */
int
AMF3Prop_Decode(AMFObjectProperty *prop, const char *pBuffer, int nSize, int bDecodeName)
{
  int nOriginalSize = nSize;
  AMF3DataType type;

  prop->p_name.av_len = 0;
  prop->p_name.av_val = NULL;

  if (nSize == 0 || !pBuffer)
    {
      RTMP_Log(RTMP_LOGDEBUG, "empty buffer/no buffer pointer!");
      return -1;
    }

  /* decode name */
  if (bDecodeName)
    {
      AVal name = { 0, 0 };
      int nRes = AMF3ReadString(pBuffer, &name);

      if (name.av_len <= 0)
	return nRes;

      prop->p_name = name;
      pBuffer += nRes;
      nSize -= nRes;
    }

  /* the type marker itself needs one byte */
  if (nSize <= 0)
    return -1;

  /* decode */
  type = (AMF3DataType) *pBuffer++;
  nSize--;

  switch (type)
    {
    case AMF3_UNDEFINED:
    case AMF3_NULL:
      prop->p_type = AMF_NULL;
      break;
    case AMF3_FALSE:
      prop->p_type = AMF_BOOLEAN;
      prop->p_vu.p_number = 0.0;
      break;
    case AMF3_TRUE:
      prop->p_type = AMF_BOOLEAN;
      prop->p_vu.p_number = 1.0;
      break;
    case AMF3_INTEGER:
      {
	int32_t res = 0;
	int len = AMF3ReadInteger(pBuffer, &res);
	prop->p_vu.p_number = (double)res;
	prop->p_type = AMF_NUMBER;
	nSize -= len;
	break;
      }
    case AMF3_DOUBLE:
      if (nSize < 8)
	return -1;
      prop->p_vu.p_number = AMF_DecodeNumber(pBuffer);
      prop->p_type = AMF_NUMBER;
      nSize -= 8;
      break;
    case AMF3_STRING:
    case AMF3_XML_DOC:
    case AMF3_XML:
      {
	int len = AMF3ReadString(pBuffer, &prop->p_vu.p_aval);
	prop->p_type = AMF_STRING;
	nSize -= len;
	break;
      }
    case AMF3_DATE:
      {
	int32_t res = 0;
	int len = AMF3ReadInteger(pBuffer, &res);

	nSize -= len;
	pBuffer += len;

	if ((res & 0x1) == 0)
	  {			/* reference */
	    uint32_t nIndex = (res >> 1);
	    RTMP_Log(RTMP_LOGDEBUG, "AMF3_DATE reference: %d, not supported!", nIndex);
	  }
	else
	  {
	    if (nSize < 8)
	      return -1;

	    prop->p_vu.p_number = AMF_DecodeNumber(pBuffer);
	    nSize -= 8;
	    prop->p_type = AMF_NUMBER;
	  }
	break;
      }
    case AMF3_OBJECT:
      {
	int nRes = AMF3_Decode(&prop->p_vu.p_object, pBuffer, nSize, TRUE);
	if (nRes == -1)
	  return -1;
	nSize -= nRes;
	prop->p_type = AMF_OBJECT;
	break;
      }
    case AMF3_ARRAY:
    case AMF3_BYTE_ARRAY:
    default:
      /* report the marker that was actually read; pointers go through %p */
      RTMP_Log(RTMP_LOGDEBUG, "%s - AMF3 unknown/unsupported datatype 0x%02x, @%p",
	       __FUNCTION__, (unsigned char)type, (const void *)(pBuffer - 1));
      return -1;
    }

  return nOriginalSize - nSize;
}

/*
 * Decode one AMF0 property from pBuffer (nSize bytes available).
 * With bDecodeName set, a 16-bit length-prefixed name precedes the value.
 * Strings and names decoded here point into pBuffer.
 * Returns the number of bytes consumed, or -1 on error/unsupported type.
 */
int
AMFProp_Decode(AMFObjectProperty *prop, const char *pBuffer, int nSize, int bDecodeName)
{
  int nOriginalSize = nSize;
  int nRes;

  prop->p_name.av_len = 0;
  prop->p_name.av_val = NULL;

  if (nSize == 0 || !pBuffer)
    {
      RTMP_Log(RTMP_LOGDEBUG, "%s: Empty buffer/no buffer pointer!", __FUNCTION__);
      return -1;
    }

  if (bDecodeName && nSize < 4)
    {				/* at least name (length + at least 1 byte) and 1 byte of data */
      RTMP_Log(RTMP_LOGDEBUG,
	       "%s: Not enough data for decoding with name, less than 4 bytes!",
	       __FUNCTION__);
      return -1;
    }

  if (bDecodeName)
    {
      unsigned short nNameSize = AMF_DecodeInt16(pBuffer);
      if (nNameSize > nSize - 2)
	{
	  RTMP_Log(RTMP_LOGDEBUG,
		   "%s: Name size out of range: namesize (%d) > len (%d) - 2",
		   __FUNCTION__, nNameSize, nSize);
	  return -1;
	}

      AMF_DecodeString(pBuffer, &prop->p_name);
      nSize -= 2 + nNameSize;
      pBuffer += 2 + nNameSize;
    }

  if (nSize == 0)
    {
      return -1;
    }

  nSize--;

  prop->p_type = (AMFDataType) *pBuffer++;
  switch (prop->p_type)
    {
    case AMF_NUMBER:
      if (nSize < 8)
	return -1;
      prop->p_vu.p_number = AMF_DecodeNumber(pBuffer);
      nSize -= 8;
      break;

    case AMF_BOOLEAN:
      if (nSize < 1)
	return -1;
      prop->p_vu.p_number = (double)AMF_DecodeBoolean(pBuffer);
      nSize--;
      break;

    case AMF_STRING:
      {
	unsigned short nStringSize;

	if (nSize < 2)		/* the length prefix itself must be present */
	  return -1;
	nStringSize = AMF_DecodeInt16(pBuffer);
	if (nSize < (long)nStringSize + 2)
	  return -1;
	AMF_DecodeString(pBuffer, &prop->p_vu.p_aval);
	nSize -= (2 + nStringSize);
	break;
      }

    case AMF_OBJECT:
      nRes = AMF_Decode(&prop->p_vu.p_object, pBuffer, nSize, TRUE);
      if (nRes == -1)
	return -1;
      nSize -= nRes;
      break;

    case AMF_MOVIECLIP:
      RTMP_Log(RTMP_LOGERROR, "AMF_MOVIECLIP reserved!");
      return -1;

    case AMF_NULL:
    case AMF_UNDEFINED:
    case AMF_UNSUPPORTED:
      prop->p_type = AMF_NULL;
      break;

    case AMF_REFERENCE:
      RTMP_Log(RTMP_LOGERROR, "AMF_REFERENCE not supported!");
      return -1;

    case AMF_ECMA_ARRAY:
      if (nSize < 4)		/* 32-bit element count must be present */
	return -1;
      nSize -= 4;

      /* next comes the rest, mixed array has a final 0x000009 mark and names, so its an object */
      nRes = AMF_Decode(&prop->p_vu.p_object, pBuffer + 4, nSize, TRUE);
      if (nRes == -1)
	return -1;
      nSize -= nRes;
      prop->p_type = AMF_OBJECT;
      break;

    case AMF_OBJECT_END:
      return -1;

    case AMF_STRICT_ARRAY:
      {
	unsigned int nArrayLen;

	if (nSize < 4)		/* 32-bit element count must be present */
	  return -1;
	nArrayLen = AMF_DecodeInt32(pBuffer);
	nSize -= 4;

	nRes = AMF_DecodeArray(&prop->p_vu.p_object, pBuffer + 4, nSize, nArrayLen, FALSE);
	if (nRes == -1)
	  return -1;
	nSize -= nRes;
	prop->p_type = AMF_OBJECT;
	break;
      }

    case AMF_DATE:
      RTMP_Log(RTMP_LOGDEBUG, "AMF_DATE");

      if (nSize < 10)
	return -1;

      prop->p_vu.p_number = AMF_DecodeNumber(pBuffer);
      prop->p_UTCoffset = AMF_DecodeInt16(pBuffer + 8);

      nSize -= 10;
      break;

    case AMF_LONG_STRING:
      {
	unsigned int nStringSize;

	if (nSize < 4)
	  return -1;
	nStringSize = AMF_DecodeInt32(pBuffer);
	/* compare unsigned: (long)nStringSize + 4 can wrap where long is 32 bits */
	if (nStringSize > (unsigned int)(nSize - 4))
	  return -1;
	AMF_DecodeLongString(pBuffer, &prop->p_vu.p_aval);
	nSize -= (4 + nStringSize);
	prop->p_type = AMF_STRING;
	break;
      }

    case AMF_RECORDSET:
      RTMP_Log(RTMP_LOGERROR, "AMF_RECORDSET reserved!");
      return -1;

    case AMF_XML_DOC:
      RTMP_Log(RTMP_LOGERROR, "AMF_XML_DOC not supported!");
      return -1;

    case AMF_TYPED_OBJECT:
      RTMP_Log(RTMP_LOGERROR, "AMF_TYPED_OBJECT not supported!");
      return -1;

    case AMF_AVMPLUS:
      nRes = AMF3_Decode(&prop->p_vu.p_object, pBuffer, nSize, TRUE);
      if (nRes == -1)
	return -1;
      nSize -= nRes;
      prop->p_type = AMF_OBJECT;
      break;

    default:
      RTMP_Log(RTMP_LOGDEBUG, "%s - unknown datatype 0x%02x, @%p", __FUNCTION__,
	       prop->p_type, (const void *)(pBuffer - 1));
      return -1;
    }

  return nOriginalSize - nSize;
}

/* Log a human-readable description of prop at debug level. */
void
AMFProp_Dump(AMFObjectProperty *prop)
{
  char strRes[256];
  char str[256];
  AVal name;

  if (prop->p_type == AMF_INVALID)
    {
      RTMP_Log(RTMP_LOGDEBUG, "Property: INVALID");
      return;
    }

  if (prop->p_type == AMF_NULL)
    {
      RTMP_Log(RTMP_LOGDEBUG, "Property: NULL");
      return;
    }

  if (prop->p_name.av_len)
    {
      name = prop->p_name;
    }
  else
    {
      name.av_val = "no-name.";
      name.av_len = sizeof("no-name.") - 1;
    }
  if (name.av_len > 18)
    name.av_len = 18;

  /* terminate explicitly: some snprintf variants (e.g. MSVC _snprintf) do
   * not NUL-terminate on truncation */
  strRes[255] = '\0';
  str[255] = '\0';

  snprintf(strRes, 255, "Name: %18.*s, ", name.av_len, name.av_val);

  if (prop->p_type == AMF_OBJECT)
    {
      RTMP_Log(RTMP_LOGDEBUG, "Property: <%sOBJECT>", strRes);
      AMF_Dump(&prop->p_vu.p_object);
      return;
    }

  switch (prop->p_type)
    {
    case AMF_NUMBER:
      snprintf(str, 255, "NUMBER:\t%.2f", prop->p_vu.p_number);
      break;
    case AMF_BOOLEAN:
      snprintf(str, 255, "BOOLEAN:\t%s",
	       prop->p_vu.p_number != 0.0 ? "TRUE" : "FALSE");
      break;
    case AMF_STRING:
      snprintf(str, 255, "STRING:\t%.*s", prop->p_vu.p_aval.av_len,
	       prop->p_vu.p_aval.av_val);
      break;
    case AMF_DATE:
      snprintf(str, 255, "DATE:\ttimestamp: %.2f, UTC offset: %d",
	       prop->p_vu.p_number, prop->p_UTCoffset);
      break;
    default:
      snprintf(str, 255, "INVALID TYPE 0x%02x", (unsigned char)prop->p_type);
    }

  RTMP_Log(RTMP_LOGDEBUG, "Property: <%s%s>", strRes, str);
}

/* Release anything owned by prop (nested object) and mark it invalid. */
void
AMFProp_Reset(AMFObjectProperty *prop)
{
  if (prop->p_type == AMF_OBJECT)
    AMF_Reset(&prop->p_vu.p_object);
  else
    {
      prop->p_vu.p_aval.av_len = 0;
      prop->p_vu.p_aval.av_val = NULL;
    }
  prop->p_type = AMF_INVALID;
}

/* ---- AMFObject ---- */

/*
 * Encode obj as an AMF0 object: marker, properties, then the 00 00 09 end
 * mark. A property that fails to encode is logged and encoding stops there;
 * the end mark is still written. Returns the position past the encoded
 * object, or NULL if the marker or end mark does not fit.
 */
char *
AMF_Encode(AMFObject *obj, char *pBuffer, char *pBufEnd)
{
  int i;

  if (pBuffer + 4 >= pBufEnd)
    return NULL;

  *pBuffer++ = AMF_OBJECT;

  for (i = 0; i < obj->o_num; i++)
    {
      char *res = AMFProp_Encode(&obj->o_props[i], pBuffer, pBufEnd);
      if (res == NULL)
	{
	  RTMP_Log(RTMP_LOGERROR, "AMF_Encode - failed to encode property in index %d", i);
	  break;
	}
      else
	{
	  pBuffer = res;
	}
    }

  if (pBuffer + 3 >= pBufEnd)
    return NULL;		/* no room for the end marker */

  pBuffer = AMF_EncodeInt24(pBuffer, pBufEnd, AMF_OBJECT_END);

  return pBuffer;
}

/*
 * Decode nArrayLen consecutive properties into obj (strict array).
 * Returns bytes consumed, or -1 if any element failed to decode.
 */
int
AMF_DecodeArray(AMFObject *obj, const char *pBuffer, int nSize, int nArrayLen, int bDecodeName)
{
  int nOriginalSize = nSize;
  int bError = FALSE;

  obj->o_num = 0;
  obj->o_props = NULL;
  while (nArrayLen > 0)
    {
      AMFObjectProperty prop;
      int nRes;
      nArrayLen--;

      nRes = AMFProp_Decode(&prop, pBuffer, nSize, bDecodeName);
      if (nRes == -1)
	bError = TRUE;
      else
	{
	  nSize -= nRes;
	  pBuffer += nRes;
	  AMF_AddProp(obj, &prop);
	}
    }
  if (bError)
    return -1;

  return nOriginalSize - nSize;
}

/*
 * Decode an AMF3 object into obj. With bAMFData set the buffer starts with
 * the AMF3_OBJECT marker (AMF3 value embedded in an AMF0 stream).
 * Object and class references are only logged. Decoding stops at the first
 * member that fails to decode. Returns the number of bytes consumed.
 */
int
AMF3_Decode(AMFObject *obj, const char *pBuffer, int nSize, int bAMFData)
{
  int nOriginalSize = nSize;
  int32_t ref;
  int len;

  obj->o_num = 0;
  obj->o_props = NULL;
  if (bAMFData)
    {
      if (*pBuffer != AMF3_OBJECT)
	RTMP_Log(RTMP_LOGERROR,
		 "AMF3 Object encapsulated in AMF stream does not start with AMF3_OBJECT!");
      pBuffer++;
      nSize--;
    }

  ref = 0;
  len = AMF3ReadInteger(pBuffer, &ref);
  pBuffer += len;
  nSize -= len;

  if ((ref & 1) == 0)
    {				/* object reference, 0xxx */
      uint32_t objectIndex = (ref >> 1);

      RTMP_Log(RTMP_LOGDEBUG, "Object reference, index: %d", objectIndex);
    }
  else				/* object instance */
    {
      int32_t classRef = (ref >> 1);

      AMF3ClassDef cd = { {0, 0} };
      AMFObjectProperty prop;
      int bFailed = FALSE;

      if ((classRef & 0x1) == 0)
	{			/* class reference */
	  uint32_t classIndex = (classRef >> 1);
	  RTMP_Log(RTMP_LOGDEBUG, "Class reference: %d", classIndex);
	}
      else
	{
	  int32_t classExtRef = (classRef >> 1);
	  int i, cdnum;

	  cd.cd_externalizable = (classExtRef & 0x1) == 1;
	  cd.cd_dynamic = ((classExtRef >> 1) & 0x1) == 1;

	  /* keep the declared member count in a local: AMF3CD_AddProp
	   * increments cd.cd_num itself, so looping on cd.cd_num would
	   * never reach its bound */
	  cdnum = classExtRef >> 2;

	  /* class name */
	  len = AMF3ReadString(pBuffer, &cd.cd_name);
	  nSize -= len;
	  pBuffer += len;

	  /* AVal strings are not NUL-terminated: print with an explicit length */
	  RTMP_Log(RTMP_LOGDEBUG,
		   "Class name: %.*s, externalizable: %d, dynamic: %d, classMembers: %d",
		   cd.cd_name.av_len, cd.cd_name.av_val ? cd.cd_name.av_val : "",
		   cd.cd_externalizable, cd.cd_dynamic, cdnum);

	  for (i = 0; i < cdnum; i++)
	    {
	      AVal memberName = { 0, 0 };

	      if (nSize <= 0)
		{		/* declared more members than the buffer holds */
		  RTMP_Log(RTMP_LOGDEBUG, "%s, invalid class encoding!", __FUNCTION__);
		  free(cd.cd_props);
		  return nOriginalSize;
		}
	      len = AMF3ReadString(pBuffer, &memberName);
	      RTMP_Log(RTMP_LOGDEBUG, "Member: %.*s", memberName.av_len,
		       memberName.av_val ? memberName.av_val : "");
	      AMF3CD_AddProp(&cd, &memberName);
	      nSize -= len;
	      pBuffer += len;
	    }
	}

      /* add as referencable object */

      if (cd.cd_externalizable)
	{
	  int nRes;
	  AVal name = AVC("DEFAULT_ATTRIBUTE");

	  RTMP_Log(RTMP_LOGDEBUG, "Externalizable, TODO check");

	  /* a failed decode must not leave a garbage type behind */
	  prop.p_type = AMF_INVALID;
	  nRes = AMF3Prop_Decode(&prop, pBuffer, nSize, FALSE);
	  if (nRes == -1)
	    RTMP_Log(RTMP_LOGDEBUG, "%s, failed to decode AMF3 property!", __FUNCTION__);
	  else
	    {
	      nSize -= nRes;
	      pBuffer += nRes;
	    }

	  AMFProp_SetName(&prop, &name);
	  AMF_AddProp(obj, &prop);
	}
      else
	{
	  int nRes, i;

	  for (i = 0; i < cd.cd_num; i++)	/* non-dynamic */
	    {
	      prop.p_type = AMF_INVALID;
	      nRes = AMF3Prop_Decode(&prop, pBuffer, nSize, FALSE);
	      if (nRes == -1)
		{
		  /* stream position is unknown now; do not move the cursor
		   * backwards by adding -1, just stop */
		  RTMP_Log(RTMP_LOGDEBUG, "%s, failed to decode AMF3 property!", __FUNCTION__);
		  bFailed = TRUE;
		  break;
		}

	      AMFProp_SetName(&prop, AMF3CD_GetProp(&cd, i));
	      AMF_AddProp(obj, &prop);

	      pBuffer += nRes;
	      nSize -= nRes;
	    }
	  if (cd.cd_dynamic && !bFailed)
	    {
	      int nNameLen = 0;

	      /* dynamic members are name/value pairs ended by an empty name */
	      do
		{
		  prop.p_type = AMF_INVALID;
		  nRes = AMF3Prop_Decode(&prop, pBuffer, nSize, TRUE);
		  if (nRes == -1)
		    {
		      RTMP_Log(RTMP_LOGDEBUG, "%s, failed to decode AMF3 property!", __FUNCTION__);
		      break;
		    }
		  AMF_AddProp(obj, &prop);

		  pBuffer += nRes;
		  nSize -= nRes;

		  nNameLen = prop.p_name.av_len;
		}
	      while (nNameLen > 0);
	    }
	}
      RTMP_Log(RTMP_LOGDEBUG, "class object!");

      /* member names were copied by value into the properties, so the
       * class definition's name table can be released here */
      free(cd.cd_props);
    }
  return nOriginalSize - nSize;
}

/*
 * Decode an AMF0 object body (sequence of properties ended by 00 00 09).
 * After a property fails to decode, bytes are skipped one at a time until
 * an end mark is found. Returns bytes consumed, or -1 if an error occurred
 * and no end mark followed.
 */
int
AMF_Decode(AMFObject *obj, const char *pBuffer, int nSize, int bDecodeName)
{
  int nOriginalSize = nSize;
  int bError = FALSE;		/* if there is an error while decoding - try to at least find the end mark AMF_OBJECT_END */

  obj->o_num = 0;
  obj->o_props = NULL;
  while (nSize > 0)
    {
      AMFObjectProperty prop;
      int nRes;

      if (nSize >= 3 && AMF_DecodeInt24(pBuffer) == AMF_OBJECT_END)
	{
	  nSize -= 3;
	  bError = FALSE;
	  break;
	}

      if (bError)
	{
	  RTMP_Log(RTMP_LOGERROR,
		   "DECODING ERROR, IGNORING BYTES UNTIL NEXT KNOWN PATTERN!");
	  nSize--;
	  pBuffer++;
	  continue;
	}

      /* decode the next property of the object */
      nRes = AMFProp_Decode(&prop, pBuffer, nSize, bDecodeName);
      if (nRes == -1)
	bError = TRUE;
      else
	{
	  nSize -= nRes;
	  pBuffer += nRes;
	  AMF_AddProp(obj, &prop);
	}
    }

  if (bError)
    return -1;

  return nOriginalSize - nSize;
}

/*
 * Append a shallow copy of prop to obj. Storage grows in chunks of 16.
 * If memory cannot be obtained the property is dropped and obj is left
 * unchanged (an error is logged).
 */
void
AMF_AddProp(AMFObject *obj, const AMFObjectProperty *prop)
{
  if (!(obj->o_num & 0x0f))
    {
      /* cast kept: this file is also built as C++ in the VC project */
      AMFObjectProperty *grown = (AMFObjectProperty *)
	realloc(obj->o_props, (obj->o_num + 16) * sizeof(AMFObjectProperty));
      if (!grown)
	{
	  RTMP_Log(RTMP_LOGERROR, "%s, out of memory", __FUNCTION__);
	  return;
	}
      obj->o_props = grown;
    }
  obj->o_props[obj->o_num++] = *prop;
}

int
AMF_CountProp(AMFObject *obj)
{
  return obj->o_num;
}

/*
 * Look up a property by index (nIndex >= 0) or, when nIndex is negative,
 * by name. Returns a pointer to the shared invalid property when nothing
 * matches; callers must not modify that object.
 */
AMFObjectProperty *
AMF_GetProp(AMFObject *obj, const AVal *name, int nIndex)
{
  if (nIndex >= 0)
    {
      /* valid indices are 0 .. o_num-1; "<=" read one element past the end */
      if (nIndex < obj->o_num)
	return &obj->o_props[nIndex];
    }
  else
    {
      int n;
      for (n = 0; n < obj->o_num; n++)
	{
	  if (AVMATCH(&obj->o_props[n].p_name, name))
	    return &obj->o_props[n];
	}
    }

  return (AMFObjectProperty *)&AMFProp_Invalid;
}

/* Log every property of obj at debug level. */
void
AMF_Dump(AMFObject *obj)
{
  int n;
  RTMP_Log(RTMP_LOGDEBUG, "(object begin)");
  for (n = 0; n < obj->o_num; n++)
    {
      AMFProp_Dump(&obj->o_props[n]);
    }
  RTMP_Log(RTMP_LOGDEBUG, "(object end)");
}

/* Reset all properties, free the property array and empty obj. */
void
AMF_Reset(AMFObject *obj)
{
  int n;
  for (n = 0; n < obj->o_num; n++)
    {
      AMFProp_Reset(&obj->o_props[n]);
    }
  free(obj->o_props);
  obj->o_props = NULL;
  obj->o_num = 0;
}

/* ---- AMF3ClassDefinition ---- */

/*
 * Append a member name to the class definition (grows in chunks of 16).
 * On allocation failure the name is dropped and cd is left unchanged.
 */
void
AMF3CD_AddProp(AMF3ClassDef *cd, AVal *prop)
{
  if (!(cd->cd_num & 0x0f))
    {
      AVal *grown = (AVal *)realloc(cd->cd_props, (cd->cd_num + 16) * sizeof(AVal));
      if (!grown)
	{
	  RTMP_Log(RTMP_LOGERROR, "%s, out of memory", __FUNCTION__);
	  return;
	}
      cd->cd_props = grown;
    }
  cd->cd_props[cd->cd_num++] = *prop;
}

/* Return member name nIndex, or a pointer to the empty AVal if out of range. */
AVal *
AMF3CD_GetProp(AMF3ClassDef *cd, int nIndex)
{
  if (nIndex < 0 || nIndex >= cd->cd_num)
    return (AVal *)&AV_empty;
  return &cd->cd_props[nIndex];
}
可參考文件:
AMF3 中文版介紹:http://download.csdn.net/detail/leixiaohua1020/6389977
4: 連接第一步——握手(Hand Shake)
在這里分析一下RTMPdump(libRTMP)連接到支持RTMP協議的服務器的第一步:握手(Hand Shake)。
RTMP連接的過程曾經分析過:RTMP流媒體播放過程
在這里不再細說,分析一下位於handshake.h文件里面實現握手(HandShake)功能的函數:
注意:handshake.h里面代碼量很大,但是很多代碼都是為了處理RTMP的加密版協議的,例如rtmpe(RTMPE);因此在這里就不做過多分析了,我們只考慮普通的RTMP協議。
/*
 * Perform the client side of the RTMP handshake on connection r:
 * send C0+C1, read S0+S1, send C2, read S2.
 *
 * The FP9HandShake argument is overwritten on entry: the digest-based
 * ("FP9") handshake is used when the link is encrypted or an SWF size is
 * configured, the plain handshake otherwise.
 *
 * The r->dlg->AppendCInfo(...) calls only report progress to the UI.
 *
 * Returns TRUE on success, FALSE on any I/O or verification failure.
 */
static int
HandShake(RTMP * r, int FP9HandShake)
{
  int i, offalg = 0;
  int dhposClient = 0;
  int digestPosClient = 0;
  int encrypted = r->Link.protocol & RTMP_FEATURE_ENC;

  RC4_handle keyIn = 0;
  RC4_handle keyOut = 0;

  int32_t *ip;
  uint32_t uptime;

  /* clientsig points 4 bytes into clientbuf, so clientsig[-1] is a valid
   * slot for the C0 byte directly in front of C1 */
  uint8_t clientbuf[RTMP_SIG_SIZE + 4], *clientsig=clientbuf+4;
  uint8_t serversig[RTMP_SIG_SIZE], client2[RTMP_SIG_SIZE], *reply;
  uint8_t type;
  getoff *getdh = NULL, *getdig = NULL;

  if (encrypted || r->Link.SWFSize)
    FP9HandShake = TRUE;
  else
    FP9HandShake = FALSE;	/* plain handshake */

  r->Link.rc4keyIn = r->Link.rc4keyOut = 0;

  /* C0 (1 byte): protocol version requested by the client */
  if (encrypted)
    {
      clientsig[-1] = 0x06;	/* 0x08 is RTMPE as well */
      offalg = 1;
    }
  else
    clientsig[-1] = 0x03;	/* 0x03: plain RTMP */

  uptime = htonl(RTMP_GetTime());
  /* C1 time field (4 bytes) */
  memcpy(clientsig, &uptime, 4);

  if (FP9HandShake)
    {
      /* set version to at least 9.0.115.0 */
      if (encrypted)
	{
	  clientsig[4] = 128;
	  clientsig[6] = 3;
	}
      else
	{
	  clientsig[4] = 10;
	  clientsig[6] = 45;
	}
      clientsig[5] = 0;
      clientsig[7] = 2;

      RTMP_Log(RTMP_LOGDEBUG, "%s: Client type: %02X", __FUNCTION__, clientsig[-1]);
      getdig = digoff[offalg];
      getdh = dhoff[offalg];
    }
  else
    {
      /* plain handshake: the 4 bytes after the time field of C1 are zero */
      memset(&clientsig[4], 0, 4);
    }

  /* generate random data */
#ifdef _DEBUG
  /* debug builds: zero the random part of C1 so runs are reproducible */
  memset(clientsig+8, 0, RTMP_SIG_SIZE-8);
#else
  /* release builds: fill the random part of C1 with rand() output */
  ip = (int32_t *)(clientsig+8);
  for (i = 2; i < RTMP_SIG_SIZE/4; i++)
    *ip++ = rand();
#endif

  /* set handshake digest */
  if (FP9HandShake)
    {
      if (encrypted)
	{
	  /* generate Diffie-Hellmann parameters */
	  r->Link.dh = DHInit(1024);
	  if (!r->Link.dh)
	    {
	      RTMP_Log(RTMP_LOGERROR, "%s: Couldn't initialize Diffie-Hellmann!",
		  __FUNCTION__);
	      return FALSE;
	    }

	  dhposClient = getdh(clientsig, RTMP_SIG_SIZE);
	  RTMP_Log(RTMP_LOGDEBUG, "%s: DH pubkey position: %d", __FUNCTION__, dhposClient);

	  if (!DHGenerateKey((DH *)r->Link.dh))
	    {
	      RTMP_Log(RTMP_LOGERROR, "%s: Couldn't generate Diffie-Hellmann public key!",
		  __FUNCTION__);
	      return FALSE;
	    }

	  if (!DHGetPublicKey((DH *)r->Link.dh, &clientsig[dhposClient], 128))
	    {
	      RTMP_Log(RTMP_LOGERROR, "%s: Couldn't write public key!", __FUNCTION__);
	      return FALSE;
	    }
	}

      digestPosClient = getdig(clientsig, RTMP_SIG_SIZE);	/* reuse this value in verification */
      RTMP_Log(RTMP_LOGDEBUG, "%s: Client digest offset: %d", __FUNCTION__,
	  digestPosClient);

      CalculateDigest(digestPosClient, clientsig, GenuineFPKey, 30,
		      &clientsig[digestPosClient]);

      RTMP_Log(RTMP_LOGDEBUG, "%s: Initial client digest: ", __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, clientsig + digestPosClient,
		  SHA256_DIGEST_LENGTH);
    }

#ifdef _DEBUG
  RTMP_Log(RTMP_LOGDEBUG, "Clientsig: ");
  RTMP_LogHex(RTMP_LOGDEBUG, clientsig, RTMP_SIG_SIZE);
#endif

  /* send C0+C1 as one write: start at clientsig-1, RTMP_SIG_SIZE + 1 bytes */
  r->dlg->AppendCInfo("建立連接:第1次連接。發送握手數據C0+C1");
  if (!WriteN(r, (char *)clientsig-1, RTMP_SIG_SIZE + 1))
    return FALSE;

  /* read S0 (1 byte): the RTMP version the server uses */
  if (ReadN(r, (char *)&type, 1) != 1)	/* 0x03 or 0x06 */
    return FALSE;
  r->dlg->AppendCInfo("建立連接:第1次連接。接收握手數據S0");

  RTMP_Log(RTMP_LOGDEBUG, "%s: Type Answer   : %02X", __FUNCTION__, type);

  /* only report "versions are identical" when they really are; a mismatch
   * is logged as a warning and the handshake continues */
  if (type != clientsig[-1])
    RTMP_Log(RTMP_LOGWARNING, "%s: Type mismatch: client sent %d, server answered %d",
	__FUNCTION__, clientsig[-1], type);
  else
    r->dlg->AppendCInfo("建立連接:第1次連接。成功接收握手數據S0,服務器和客戶端版本相同");

  /* read S1 (RTMP_SIG_SIZE bytes) */
  r->dlg->AppendCInfo("建立連接:第1次連接。接收握手數據S1");
  if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
    return FALSE;

  /* decode server response: first 4 bytes are the server uptime (network order) */
  memcpy(&uptime, serversig, 4);
  uptime = ntohl(uptime);

  RTMP_Log(RTMP_LOGDEBUG, "%s: Server Uptime : %d", __FUNCTION__, uptime);
  RTMP_Log(RTMP_LOGDEBUG, "%s: FMS Version   : %d.%d.%d.%d", __FUNCTION__, serversig[4],
      serversig[5], serversig[6], serversig[7]);

  /* a plain server answering with a zero version byte: fall back to the plain handshake */
  if (FP9HandShake && type == 3 && !serversig[4])
    FP9HandShake = FALSE;

#ifdef _DEBUG
  RTMP_Log(RTMP_LOGDEBUG, "Server signature:");
  RTMP_LogHex(RTMP_LOGDEBUG, serversig, RTMP_SIG_SIZE);
#endif

  if (FP9HandShake)
    {
      uint8_t digestResp[SHA256_DIGEST_LENGTH];
      uint8_t *signatureResp = NULL;

      /* we have to use this signature now to find the correct algorithms for getting the digest and DH positions */
      int digestPosServer = getdig(serversig, RTMP_SIG_SIZE);

      if (!VerifyDigest(digestPosServer, serversig, GenuineFMSKey, 36))
	{
	  RTMP_Log(RTMP_LOGWARNING, "Trying different position for server digest!");
	  offalg ^= 1;
	  getdig = digoff[offalg];
	  getdh  = dhoff[offalg];
	  digestPosServer = getdig(serversig, RTMP_SIG_SIZE);

	  if (!VerifyDigest(digestPosServer, serversig, GenuineFMSKey, 36))
	    {
	      RTMP_Log(RTMP_LOGERROR, "Couldn't verify the server digest");	/* continuing anyway will probably fail */
	      return FALSE;
	    }
	}

      /* generate SWFVerification token (SHA256 HMAC hash of decompressed SWF, key are the last 32 bytes of the server handshake) */
      if (r->Link.SWFSize)
	{
	  const char swfVerify[] = { 0x01, 0x01 };
	  char *vend = r->Link.SWFVerificationResponse+sizeof(r->Link.SWFVerificationResponse);

	  memcpy(r->Link.SWFVerificationResponse, swfVerify, 2);
	  AMF_EncodeInt32(&r->Link.SWFVerificationResponse[2], vend, r->Link.SWFSize);
	  AMF_EncodeInt32(&r->Link.SWFVerificationResponse[6], vend, r->Link.SWFSize);
	  HMACsha256(r->Link.SWFHash, SHA256_DIGEST_LENGTH,
		     &serversig[RTMP_SIG_SIZE - SHA256_DIGEST_LENGTH],
		     SHA256_DIGEST_LENGTH,
		     (uint8_t *)&r->Link.SWFVerificationResponse[10]);
	}

      /* do Diffie-Hellmann Key exchange for encrypted RTMP */
      if (encrypted)
	{
	  /* compute secret key */
	  uint8_t secretKey[128] = { 0 };
	  int len, dhposServer;

	  dhposServer = getdh(serversig, RTMP_SIG_SIZE);
	  RTMP_Log(RTMP_LOGDEBUG, "%s: Server DH public key offset: %d", __FUNCTION__,
	    dhposServer);
	  len = DHComputeSharedSecretKey((DH *)r->Link.dh, &serversig[dhposServer],
					128, secretKey);
	  if (len < 0)
	    {
	      RTMP_Log(RTMP_LOGDEBUG, "%s: Wrong secret key position!", __FUNCTION__);
	      return FALSE;
	    }

	  RTMP_Log(RTMP_LOGDEBUG, "%s: Secret key: ", __FUNCTION__);
	  RTMP_LogHex(RTMP_LOGDEBUG, secretKey, 128);

	  InitRC4Encryption(secretKey,
			    (uint8_t *) & serversig[dhposServer],
			    (uint8_t *) & clientsig[dhposClient],
			    &keyIn, &keyOut);
	}


      reply = client2;
#ifdef _DEBUG
      memset(reply, 0xff, RTMP_SIG_SIZE);
#else
      ip = (int32_t *)reply;
      for (i = 0; i < RTMP_SIG_SIZE/4; i++)
	*ip++ = rand();
#endif
      /* calculate response now */
      signatureResp = reply+RTMP_SIG_SIZE-SHA256_DIGEST_LENGTH;

      HMACsha256(&serversig[digestPosServer], SHA256_DIGEST_LENGTH,
		 GenuineFPKey, sizeof(GenuineFPKey), digestResp);
      HMACsha256(reply, RTMP_SIG_SIZE - SHA256_DIGEST_LENGTH, digestResp,
		 SHA256_DIGEST_LENGTH, signatureResp);

      /* some info output */
      RTMP_Log(RTMP_LOGDEBUG,
	  "%s: Calculated digest key from secure key and server digest: ",
	  __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, digestResp, SHA256_DIGEST_LENGTH);

#ifdef FP10
      if (type == 8 )
	{
	  uint8_t *dptr = digestResp;
	  uint8_t *sig = signatureResp;
	  /* encrypt signatureResp */
	  for (i=0; i<SHA256_DIGEST_LENGTH; i+=8)
	    rtmpe8_sig(sig+i, sig+i, dptr[i] % 15);
	}
#if 0
      else if (type == 9)
	{
	  uint8_t *dptr = digestResp;
	  uint8_t *sig = signatureResp;
	  /* encrypt signatureResp */
	  for (i=0; i<SHA256_DIGEST_LENGTH; i+=8)
	    rtmpe9_sig(sig+i, sig+i, dptr[i] % 15);
	}
#endif
#endif
      RTMP_Log(RTMP_LOGDEBUG, "%s: Client signature calculated:", __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, signatureResp, SHA256_DIGEST_LENGTH);
    }
  else
    {
      /* plain handshake: C2 is simply an echo of S1 */
      reply = serversig;
#if 0
      uptime = htonl(RTMP_GetTime());
      memcpy(reply+4, &uptime, 4);
#endif
    }

#ifdef _DEBUG
  RTMP_Log(RTMP_LOGDEBUG, "%s: Sending handshake response: ",
    __FUNCTION__);
  RTMP_LogHex(RTMP_LOGDEBUG, reply, RTMP_SIG_SIZE);
#endif

  /* send C2: the RTMP_SIG_SIZE bytes in reply */
  r->dlg->AppendCInfo("建立連接:第1次連接。發送握手數據C2");
  if (!WriteN(r, (char *)reply, RTMP_SIG_SIZE))
    return FALSE;

  /* 2nd part of handshake: read S2 into serversig (overwrites S1) */
  r->dlg->AppendCInfo("建立連接:第1次連接。讀取握手數據S2");
  if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
    return FALSE;

#ifdef _DEBUG
  RTMP_Log(RTMP_LOGDEBUG, "%s: 2nd handshake: ", __FUNCTION__);
  RTMP_LogHex(RTMP_LOGDEBUG, serversig, RTMP_SIG_SIZE);
#endif

  if (FP9HandShake)
    {
      uint8_t signature[SHA256_DIGEST_LENGTH];
      uint8_t digest[SHA256_DIGEST_LENGTH];

      if (serversig[4] == 0 && serversig[5] == 0 && serversig[6] == 0
	  && serversig[7] == 0)
	{
	  RTMP_Log(RTMP_LOGDEBUG,
	      "%s: Wait, did the server just refuse signed authentication?",
	      __FUNCTION__);
	}
      RTMP_Log(RTMP_LOGDEBUG, "%s: Server sent signature:", __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, &serversig[RTMP_SIG_SIZE - SHA256_DIGEST_LENGTH],
	     SHA256_DIGEST_LENGTH);

      /* verify server response */
      HMACsha256(&clientsig[digestPosClient], SHA256_DIGEST_LENGTH,
		 GenuineFMSKey, sizeof(GenuineFMSKey), digest);
      HMACsha256(serversig, RTMP_SIG_SIZE - SHA256_DIGEST_LENGTH, digest,
		 SHA256_DIGEST_LENGTH, signature);

      /* show some information */
      RTMP_Log(RTMP_LOGDEBUG, "%s: Digest key: ", __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, digest, SHA256_DIGEST_LENGTH);

#ifdef FP10
      if (type == 8 )
	{
	  uint8_t *dptr = digest;
	  uint8_t *sig = signature;
	  /* encrypt signature */
	  for (i=0; i<SHA256_DIGEST_LENGTH; i+=8)
	    rtmpe8_sig(sig+i, sig+i, dptr[i] % 15);
	}
#if 0
      else if (type == 9)
	{
	  uint8_t *dptr = digest;
	  uint8_t *sig = signature;
	  /* encrypt signatureResp */
	  for (i=0; i<SHA256_DIGEST_LENGTH; i+=8)
	    rtmpe9_sig(sig+i, sig+i, dptr[i] % 15);
	}
#endif
#endif
      RTMP_Log(RTMP_LOGDEBUG, "%s: Signature calculated:", __FUNCTION__);
      RTMP_LogHex(RTMP_LOGDEBUG, signature, SHA256_DIGEST_LENGTH);
      if (memcmp
	  (signature, &serversig[RTMP_SIG_SIZE - SHA256_DIGEST_LENGTH],
	   SHA256_DIGEST_LENGTH) != 0)
	{
	  RTMP_Log(RTMP_LOGWARNING, "%s: Server not genuine Adobe!", __FUNCTION__);
	  return FALSE;
	}
      else
	{
	  RTMP_Log(RTMP_LOGDEBUG, "%s: Genuine Adobe Flash Media Server", __FUNCTION__);
	}

      if (encrypted)
	{
	  char buff[RTMP_SIG_SIZE];
	  /* set keys for encryption from now on */
	  r->Link.rc4keyIn = keyIn;
	  r->Link.rc4keyOut = keyOut;


	  /* update the keystreams */
	  if (r->Link.rc4keyIn)
	    {
	      RC4_encrypt((RC4_KEY *)r->Link.rc4keyIn, RTMP_SIG_SIZE, (uint8_t *) buff);
	    }

	  if (r->Link.rc4keyOut)
	    {
	      RC4_encrypt((RC4_KEY *)r->Link.rc4keyOut, RTMP_SIG_SIZE, (uint8_t *) buff);
	    }
	}
    }
  else
    {
      /* plain handshake: S2 should echo C1; a mismatch is only reported */
      r->dlg->AppendCInfo("建立連接:第1次連接。比較握手數據簽名");
      if (memcmp(serversig, clientsig, RTMP_SIG_SIZE) != 0)
	{
	  r->dlg->AppendCInfo("建立連接:第1次連接。握手數據簽名不匹配!");
	  RTMP_Log(RTMP_LOGWARNING, "%s: client signature does not match!",
	      __FUNCTION__);
	}
    }

  r->dlg->AppendCInfo("建立連接:第1次連接。握手成功");
  RTMP_Log(RTMP_LOGDEBUG, "%s: Handshaking finished....", __FUNCTION__);
  return TRUE;
}
5: 建立一個流媒體連接 (NetConnection部分)
本篇文章分析一下RTMPdump里面的建立一個流媒體連接過程中的函數調用。
之前已經簡單分析過流媒體鏈接的建立過程:
而且分析過其函數調用過程:
在這里就不詳細敘述了,其實主要是這兩個函數:
RTMP_Connect()
RTMP_ConnectStream()
第一個函數用於建立RTMP中的NetConnection,第二個函數用於建立RTMP中的NetStream。一般是先調用第一個函數,然后調用第二個函數。
下面先來看看RTMP_Connect():
注意:貼上去的源代碼是修改過的RTMPdump,我添加了輸出信息的代碼,形如:r->dlg->AppendCInfo("建立連接:第0次連接。開始建立Socket連接");該代碼不影響程序運行,可忽略。
RTMP_Connect()
//連接 int RTMP_Connect(RTMP *r, RTMPPacket *cp) { //Socket結構體 struct sockaddr_in service; if (!r->Link.hostname.av_len) return FALSE; memset(&service, 0, sizeof(struct sockaddr_in)); service.sin_family = AF_INET; if (r->Link.socksport) { //加入地址信息 /* 使用SOCKS連接 */ if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport)) return FALSE; } else { /* 直接連接 */ if (!add_addr_info(&service, &r->Link.hostname, r->Link.port)) return FALSE; } //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。開始建立Socket連接"); //----------------------------- if (!RTMP_Connect0(r, (struct sockaddr *)&service)){ r->dlg->AppendCInfo("建立連接:第0次連接。建立Socket連接失敗"); return FALSE; } //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。建立Socket連接成功"); //----------------------------- r->m_bSendCounter = TRUE; return RTMP_Connect1(r, cp); }
我們可以看出調用了兩個函數RTMP_Connect0()以及RTMP_Connect1()。按照先后順序看看吧:
RTMP_Connect0()
//sockaddr是Linux網絡編程的地址結構體一種,其定義如下: //struct sockaddr{ // unsigned short sa_family; // char sa_data[14]; //}; //說明:sa_family:是地址家族,也稱作,協議族,一般都是“AF_xxx”的形式。通常大多用的是都是AF_INET。 // sa_data:是14字節協議地址。 //有時不使用sockaddr,而使用sockaddr_in(多用在windows)(等價) //struct sockaddr_in { // short int sin_family; /* Address family */ // unsigned short int sin_port; /* Port number */ // struct in_addr sin_addr; /* Internet address */ // unsigned char sin_zero[8]; /* Same size as struct sockaddr */ //}; //union { // struct{ // unsigned char s_b1,s_b2,s_b3,s_b4; // } S_un_b; // struct { // unsigned short s_w1,s_w2; // } S_un_w; // unsigned long S_addr; // } S_un; //} in_addr; //第0次連接,建立Socket連接 int RTMP_Connect0(RTMP *r, struct sockaddr * service) { int on = 1; r->m_sb.sb_timedout = FALSE; r->m_pausing = 0; r->m_fDuration = 0.0; //創建一個Socket,並把Socket序號賦值給相應變量 //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。create一個Socket"); //----------------------------- r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (r->m_sb.sb_socket != -1) { //定義函數 int connect (int sockfd,struct sockaddr * serv_addr,int addrlen); //函數說明 connect()用來將參數sockfd 的Socket(剛剛創建)連至參數serv_addr //指定的網絡地址。參數addrlen為sockaddr的結構長度。 //連接 RTMP_LogPrintf("建立Socket連接!\n"); //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。connect該Socket"); //----------------------------- if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0) { //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。connect該Socket失敗"); //----------------------------- int err = GetSockError(); RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket. %d (%s)", __FUNCTION__, err, strerror(err)); RTMP_Close(r); return FALSE; } //----------------- r->dlg->AppendCInfo("建立連接:第0次連接。connect該Socket成功"); //----------------------------- //指定了端口號。注:這不是必需的。 if (r->Link.socksport) { RTMP_Log(RTMP_LOGDEBUG, "%s ... SOCKS negotiation", __FUNCTION__); //談判?發送數據報以進行談判?! 
if (!SocksNegotiate(r)) { RTMP_Log(RTMP_LOGERROR, "%s, SOCKS negotiation failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } } } else { RTMP_Log(RTMP_LOGERROR, "%s, failed to create socket. Error: %d", __FUNCTION__, GetSockError()); return FALSE; } /* set timeout */ //超時 { SET_RCVTIMEO(tv, r->Link.timeout); if (setsockopt (r->m_sb.sb_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv))) { RTMP_Log(RTMP_LOGERROR, "%s, Setting socket timeout to %ds failed!", __FUNCTION__, r->Link.timeout); } } setsockopt(r->m_sb.sb_socket, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)); return TRUE; }
可見RTMP_Connect0()主要用於建立Socket連接,並未開始真正的建立RTMP連接。
再來看看RTMP_Connect1(),這是真正建立RTMP連接的函數:
RTMP_Connect1()
//第1次連接,從握手開始 int RTMP_Connect1(RTMP *r, RTMPPacket *cp) { if (r->Link.protocol & RTMP_FEATURE_SSL) { #if defined(CRYPTO) && !defined(NO_SSL) TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl); TLS_setfd((SSL *)r->m_sb.sb_ssl, r->m_sb.sb_socket); if (TLS_connect((SSL *)r->m_sb.sb_ssl) < 0) { RTMP_Log(RTMP_LOGERROR, "%s, TLS_Connect failed", __FUNCTION__); RTMP_Close(r); return FALSE; } #else RTMP_Log(RTMP_LOGERROR, "%s, no SSL/TLS support", __FUNCTION__); RTMP_Close(r); return FALSE; #endif } //使用HTTP if (r->Link.protocol & RTMP_FEATURE_HTTP) { r->m_msgCounter = 1; r->m_clientID.av_val = NULL; r->m_clientID.av_len = 0; HTTP_Post(r, RTMPT_OPEN, "", 1); HTTP_read(r, 1); r->m_msgCounter = 0; } RTMP_Log(RTMP_LOGDEBUG, "%s, ... connected, handshaking", __FUNCTION__); //握手---------------- r->dlg->AppendCInfo("建立連接:第1次連接。開始握手(HandShake)"); //----------------------------- RTMP_LogPrintf("開始握手(HandShake)!\n"); if (!HandShake(r, TRUE)) { //---------------- r->dlg->AppendCInfo("建立連接:第1次連接。握手(HandShake)失敗!"); //----------------------------- RTMP_Log(RTMP_LOGERROR, "%s, handshake failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } //---------------- r->dlg->AppendCInfo("建立連接:第1次連接。握手(HandShake)成功"); //----------------------------- RTMP_LogPrintf("握手(HandShake)完畢!\n"); RTMP_Log(RTMP_LOGDEBUG, "%s, handshaked", __FUNCTION__); //發送“connect”命令-------------- //---------------- r->dlg->AppendCInfo("建立連接:第1次連接。開始建立網絡連接(NetConnection)"); //----------------------------- RTMP_LogPrintf("開始建立網絡連接(NetConnection)!\n"); //---------------- r->dlg->AppendCInfo("發送數據。消息 命令 (typeID=20) (Connect)。"); //----------------------------- if (!SendConnectPacket(r, cp)) { //---------------- r->dlg->AppendCInfo("建立連接:第1次連接。建立網絡連接(NetConnection)失敗!"); //----------------------------- RTMP_Log(RTMP_LOGERROR, "%s, RTMP connect failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } //---------------- r->dlg->AppendCInfo("建立連接:第1次連接。建立網絡連接(NetConnection)成功"); //----------------------------- 
RTMP_LogPrintf("命令消息“Connect”發送完畢!\n"); return TRUE; }
該函數做了以下事情:
HandShake()完成握手,之前已經分析過:RTMPdump 源代碼分析 4: 連接第一步——握手(Hand Shake)
SendConnectPacket()發送包含“connect”命令的數據報,用於開始建立RTMP連接。具體該函數是怎么調用的,以后有機會再進行分析。
至此RTMP_Connect()分析完畢。
6: 建立一個流媒體連接 (NetStream部分 1)
前文已經分析了 RTMPdump中建立一個NetConnection的過程:RTMPdump 源代碼分析 5: 建立一個流媒體連接 (NetConnection部分)
多余的話不多說,下面先來看看RTMP_ConnectStream(),該函數主要用於在NetConnection基礎上建立一個NetStream。
RTMP_ConnectStream()
//創建流 int RTMP_ConnectStream(RTMP *r, int seekTime) { RTMPPacket packet = { 0 }; /* seekTime was already set by SetupStream / SetupURL. * This is only needed by ReconnectStream. */ if (seekTime > 0) r->Link.seekTime = seekTime; r->m_mediaChannel = 0; while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet)) { if (RTMPPacket_IsReady(&packet)) { if (!packet.m_nBodySize) continue; if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) || (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) || (packet.m_packetType == RTMP_PACKET_TYPE_INFO)) { RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring."); RTMPPacket_Free(&packet); continue; } //處理Packet! //---------------- r->dlg->AppendCInfo("建立網絡流:處理收到的數據。開始處理收到的數據"); //----------------------------- RTMP_ClientPacket(r, &packet); //---------------- r->dlg->AppendCInfo("建立網絡流:處理收到的數據。處理完畢,清除數據。"); //----------------------------- RTMPPacket_Free(&packet); } } return r->m_bPlaying; }
乍一看,這個函數的代碼量好像挺少的,實際上不然,其復雜度還是挺高的。我覺得比RTMP_Connect()要復雜不少。
其關鍵就在於這個While()循環。首先,循環的三個條件都滿足,就能進行循環。只有出錯或者建立網絡流(NetStream)的步驟完成后,才能跳出循環。
在這個函數中有兩個函數尤為重要:
RTMP_ReadPacket()
RTMP_ClientPacket()
第一個函數的作用是讀取通過Socket接收下來的消息(Message)包,但是不做任何處理。第二個函數則是處理消息(Message),並做出響應。這兩個函數結合,就可以完成接收消息然后響應消息的步驟。
下面來看一下RTMP_ReadPacket():
//讀取收下來的Chunk int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet) { //packet 存讀取完后的的數據 //Chunk Header最大值18 uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 }; //header 指向的是從Socket中收下來的數據 char *header = (char *)hbuf; int nSize, hSize, nToRead, nChunk; int didAlloc = FALSE; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket); //收下來的數據存入hbuf if (ReadN(r, (char *)hbuf, 1) == 0) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__); return FALSE; } //塊類型fmt packet->m_headerType = (hbuf[0] & 0xc0) >> 6; //塊流ID(2-63) packet->m_nChannel = (hbuf[0] & 0x3f); header++; //塊流ID第1字節為0時,塊流ID占2個字節 if (packet->m_nChannel == 0) { if (ReadN(r, (char *)&hbuf[1], 1) != 1) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte", __FUNCTION__); return FALSE; } //計算塊流ID(64-319) packet->m_nChannel = hbuf[1]; packet->m_nChannel += 64; header++; } //塊流ID第1字節為0時,塊流ID占3個字節 else if (packet->m_nChannel == 1) { int tmp; if (ReadN(r, (char *)&hbuf[1], 2) != 2) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte", __FUNCTION__); return FALSE; } tmp = (hbuf[2] << 8) + hbuf[1]; //計算塊流ID(64-65599) packet->m_nChannel = tmp + 64; RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel); header += 2; } //ChunkHeader的大小(4種) nSize = packetSize[packet->m_headerType]; if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */ packet->m_hasAbsTimestamp = TRUE; //11字節的完整ChunkMsgHeader的TimeStamp是絕對值 else if (nSize < RTMP_LARGE_HEADER_SIZE) { /* using values from the last message of this channel */ if (r->m_vecChannelsIn[packet->m_nChannel]) memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel], sizeof(RTMPPacket)); } nSize--; if (nSize > 0 && ReadN(r, header, nSize) != nSize) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. 
type: %x", __FUNCTION__, (unsigned int)hbuf[0]); return FALSE; } hSize = nSize + (header - (char *)hbuf); if (nSize >= 3) { //TimeStamp(注意 BigEndian to SmallEndian)(11,7,3字節首部都有) packet->m_nTimeStamp = AMF_DecodeInt24(header); /*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */ //消息長度(11,7字節首部都有) if (nSize >= 6) { packet->m_nBodySize = AMF_DecodeInt24(header + 3); packet->m_nBytesRead = 0; RTMPPacket_Free(packet); //(11,7字節首部都有) if (nSize > 6) { //Msg type ID packet->m_packetType = header[6]; //Msg Stream ID if (nSize == 11) packet->m_nInfoField2 = DecodeInt32LE(header + 7); } } //Extend TimeStamp if (packet->m_nTimeStamp == 0xffffff) { if (ReadN(r, header + nSize, 4) != 4) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp", __FUNCTION__); return FALSE; } packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize); hSize += 4; } } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize); if (packet->m_nBodySize > 0 && packet->m_body == NULL) { if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)) { RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__); return FALSE; } didAlloc = TRUE; packet->m_headerType = (hbuf[0] & 0xc0) >> 6; } nToRead = packet->m_nBodySize - packet->m_nBytesRead; nChunk = r->m_inChunkSize; if (nToRead < nChunk) nChunk = nToRead; /* Does the caller want the raw chunk? */ if (packet->m_chunk) { packet->m_chunk->c_headerSize = hSize; memcpy(packet->m_chunk->c_header, hbuf, hSize); packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead; packet->m_chunk->c_chunkSize = nChunk; } if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. 
len: %lu", __FUNCTION__, packet->m_nBodySize); return FALSE; } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk); packet->m_nBytesRead += nChunk; /* keep the packet as ref for other packets on this channel */ if (!r->m_vecChannelsIn[packet->m_nChannel]) r->m_vecChannelsIn[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket)); //讀取完畢 if (RTMPPacket_IsReady(packet)) { /* make packet's timestamp absolute */ if (!packet->m_hasAbsTimestamp) packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */ r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp; /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */ /* arrives and requests to re-use some info (small packet header) */ r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL; r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0; r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */ } else { packet->m_body = NULL; /* so it won't be erased on free */ } return TRUE; }
在這里要注意的是,接收下來的實際上是塊(Chunk)而不是消息(Message),因為消息(Message)在網絡上傳播的時候,實際上要分割成塊(Chunk)。
這里解析的就是塊(Chunk)
可參考:RTMP規范簡單分析
具體的解析代碼我就不多說了,直接參考RTMP協議規范就可以了,一個字節一個字節的解析就OK了。
7: 建立一個流媒體連接 (NetStream部分 2)
上回說到,有兩個函數尤為重要:
RTMP_ReadPacket()
RTMP_ClientPacket()
而且分析了第一個函數。現在我們再來看看第二個函數吧。第二個函數的主要作用是:處理消息(Message),並做出響應。
先把帶注釋的代碼貼上:
//處理接收到的Chunk int RTMP_ClientPacket(RTMP *r, RTMPPacket *packet) { int bHasMediaPacket = 0; switch (packet->m_packetType) { //RTMP消息類型ID=1,設置塊大小 case 0x01: /* chunk size */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Set Chunk Size (typeID=1)。"); //----------------------------- RTMP_LogPrintf("處理消息 Set Chunk Size (typeID=1)\n"); HandleChangeChunkSize(r, packet); break; //RTMP消息類型ID=3,致謝 case 0x03: /* bytes read report */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__); break; //RTMP消息類型ID=4,用戶控制 case 0x04: /* ctrl */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 User Control (typeID=4)。"); //----------------------------- RTMP_LogPrintf("處理消息 User Control (typeID=4)\n"); HandleCtrl(r, packet); break; //RTMP消息類型ID=5 case 0x05: /* server bw */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Window Acknowledgement Size (typeID=5)。"); //----------------------------- RTMP_LogPrintf("處理消息 Window Acknowledgement Size (typeID=5)\n"); HandleServerBW(r, packet); break; //RTMP消息類型ID=6 case 0x06: /* client bw */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Set Peer Bandwidth (typeID=6)。"); //----------------------------- RTMP_LogPrintf("處理消息 Set Peer Bandwidth (typeID=6)\n"); HandleClientBW(r, packet); break; //RTMP消息類型ID=8,音頻數據 case 0x08: /* audio data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleAudio(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; //RTMP消息類型ID=9,視頻數據 case 0x09: /* video data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleVideo(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; //RTMP消息類型ID=15,AMF3編碼,忽略 case 0x0F: /* flex stream send */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex 
stream send, size %lu bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; //RTMP消息類型ID=16,AMF3編碼,忽略 case 0x10: /* flex shared object */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex shared object, size %lu bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; //RTMP消息類型ID=17,AMF3編碼,忽略 case 0x11: /* flex message */ { RTMP_Log(RTMP_LOGDEBUG, "%s, flex message, size %lu bytes, not fully supported", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ /* some DEBUG code */ #if 0 RTMP_LIB_AMFObject obj; int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1); if(nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__); /*return; */ } obj.Dump(); #endif if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1) bHasMediaPacket = 2; break; } //RTMP消息類型ID=18,AMF0編碼,數據消息 case 0x12: /* metadata (notify) */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %lu bytes", __FUNCTION__, packet->m_nBodySize); //處理元數據,暫時注釋 /* if (HandleMetadata(r, packet->m_body, packet->m_nBodySize)) bHasMediaPacket = 1; break; */ //RTMP消息類型ID=19,AMF0編碼,忽略 case 0x13: RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring", __FUNCTION__); break; //RTMP消息類型ID=20,AMF0編碼,命令消息 //處理命令消息! 
case 0x14: //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 命令 (AMF0編碼) (typeID=20)。"); //----------------------------- /* invoke */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %lu bytes", __FUNCTION__, packet->m_nBodySize); RTMP_LogPrintf("處理命令消息 (typeID=20,AMF0編碼)\n"); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1) bHasMediaPacket = 2; break; //RTMP消息類型ID=22 case 0x16: { /* go through FLV packets and handle metadata packets */ unsigned int pos = 0; uint32_t nTimeStamp = packet->m_nTimeStamp; while (pos + 11 < packet->m_nBodySize) { uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */ if (pos + 11 + dataSize + 4 > packet->m_nBodySize) { RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!"); break; } if (packet->m_body[pos] == 0x12) { HandleMetadata(r, packet->m_body + pos + 11, dataSize); } else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9) { nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4); nTimeStamp |= (packet->m_body[pos + 7] << 24); } pos += (11 + dataSize + 4); } if (!r->m_pausing) r->m_mediaStamp = nTimeStamp; /* FLV tag(s) */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */ bHasMediaPacket = 1; break; } default: RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__, packet->m_packetType); #ifdef _DEBUG RTMP_LogHex(RTMP_LOGDEBUG, (const uint8_t *)packet->m_body, packet->m_nBodySize); #endif } return bHasMediaPacket; }
里面注釋的比較多,可以看出,大體的思路是,根據接收到的消息(Message)類型的不同,做出不同的響應。例如收到的消息類型為0x01,那么就是設置塊(Chunk)大小的協議,那么就調用相應的函數進行處理。
因此,本函數可以說是程序的靈魂,收到的各種命令消息都要經過本函數的判斷決定調用哪個函數進行相應的處理。
在這里注意一下消息類型為0x14的消息,即消息類型ID為20的消息,是AMF0編碼的命令消息。這在RTMP連接中是非常常見的,比如說各種控制命令:播放,暫停,停止等等。我們來仔細看看它的調用。
可以發現它調用了HandleInvoke()函數來處理服務器發來的AMF0編碼的命令,來看看細節:
/* Returns 0 for OK/Failed/error, 1 for 'Stop or Complete' */ static int HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize) { AMFObject obj; AVal method; int txn; int ret = 0, nRes; if (body[0] != 0x02) /* make sure it is a string method name we start with */ { RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", __FUNCTION__); return 0; } nRes = AMF_Decode(&obj, body, nBodySize, FALSE); if (nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return 0; } AMF_Dump(&obj); AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method); txn = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1)); RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); if (AVMATCH(&method, &av__result)) { AVal methodInvoked = {0}; int i; for (i=0; i<r->m_numCalls; i++) { if (r->m_methodCalls[i].num == txn) { methodInvoked = r->m_methodCalls[i].name; AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE); break; } } if (!methodInvoked.av_val) { RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %d without matching request", __FUNCTION__, txn); goto leave; } //---------------- char temp_str[100]; sprintf(temp_str,"接收數據。消息 %s 的 Result",methodInvoked.av_val); r->dlg->AppendCInfo(temp_str); //----------------------------- RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, methodInvoked.av_val); if (AVMATCH(&methodInvoked, &av_connect)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","Result (Connect)"); //----------------------------- if (r->Link.token.av_len) { AMFObjectProperty p; if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p)) { DecodeTEA(&r->Link.token, &p.p_vu.p_aval); SendSecureTokenResponse(r, &p.p_vu.p_aval); } } if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendReleaseStream(r); SendFCPublish(r); } else { //---------------- r->dlg->AppendCInfo("發送數據。消息 Window Acknowledgement Size (typeID=5)。"); //----------------------------- 
RTMP_LogPrintf("發送消息Window Acknowledgement Size(typeID=5)\n"); RTMP_SendServerBW(r); RTMP_SendCtrl(r, 3, 0, 300); } //---------------- r->dlg->AppendCInfo("發送數據。消息 命令 (typeID=20) (CreateStream)。"); //----------------------------- RTMP_LogPrintf("發送命令消息“CreateStream” (typeID=20)\n"); RTMP_SendCreateStream(r); if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) { /* Send the FCSubscribe if live stream or if subscribepath is set */ if (r->Link.subscribepath.av_len) SendFCSubscribe(r, &r->Link.subscribepath); else if (r->Link.lFlags & RTMP_LF_LIVE) SendFCSubscribe(r, &r->Link.playpath); } } else if (AVMATCH(&methodInvoked, &av_createStream)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","Result (CreateStream)"); //----------------------------- r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendPublish(r); } else { if (r->Link.lFlags & RTMP_LF_PLST) SendPlaylist(r); //---------------- r->dlg->AppendCInfo("發送數據。消息 命令 (typeID=20) (Play)。"); //----------------------------- RTMP_LogPrintf("發送命令消息“play” (typeID=20)\n"); SendPlay(r); RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); } } else if (AVMATCH(&methodInvoked, &av_play) || AVMATCH(&methodInvoked, &av_publish)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","Result (Play or Publish)"); //----------------------------- r->m_bPlaying = TRUE; } free(methodInvoked.av_val); } else if (AVMATCH(&method, &av_onBWDone)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","onBWDone"); //----------------------------- if (!r->m_nBWCheckCounter) SendCheckBW(r); } else if (AVMATCH(&method, &av_onFCSubscribe)) { /* SendOnFCSubscribe(); */ } else if (AVMATCH(&method, &av_onFCUnsubscribe)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","onFCUnsubscribe"); //----------------------------- RTMP_Close(r); ret = 1; } else if (AVMATCH(&method, &av_ping)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","Ping"); //----------------------------- 
SendPong(r, txn); } else if (AVMATCH(&method, &av__onbwcheck)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","onBWcheck"); //----------------------------- SendCheckBWResult(r, txn); } else if (AVMATCH(&method, &av__onbwdone)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","onBWdone"); //----------------------------- int i; for (i = 0; i < r->m_numCalls; i++) if (AVMATCH(&r->m_methodCalls[i].name, &av__checkbw)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } else if (AVMATCH(&method, &av__error)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","error"); //----------------------------- RTMP_Log(RTMP_LOGERROR, "rtmp server sent error"); } else if (AVMATCH(&method, &av_close)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","close"); //----------------------------- RTMP_Log(RTMP_LOGERROR, "rtmp server requested close"); RTMP_Close(r); } else if (AVMATCH(&method, &av_onStatus)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","onStatus"); //----------------------------- AMFObject obj2; AVal code, level; AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2); AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code); AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level); RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); if (AVMATCH(&code, &av_NetStream_Failed) || AVMATCH(&code, &av_NetStream_Play_Failed) || AVMATCH(&code, &av_NetStream_Play_StreamNotFound) || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp)) { r->m_stream_id = -1; RTMP_Close(r); RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val); } else if (AVMATCH(&code, &av_NetStream_Play_Start)) { int i; r->m_bPlaying = TRUE; for (i = 0; i < r->m_numCalls; i++) { if (AVMATCH(&r->m_methodCalls[i].name, &av_play)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } } else if (AVMATCH(&code, &av_NetStream_Publish_Start)) { int i; r->m_bPlaying = TRUE; for (i = 0; i < r->m_numCalls; i++) { if 
(AVMATCH(&r->m_methodCalls[i].name, &av_publish)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } } /* Return 1 if this is a Play.Complete or Play.Stop */ else if (AVMATCH(&code, &av_NetStream_Play_Complete) || AVMATCH(&code, &av_NetStream_Play_Stop) || AVMATCH(&code, &av_NetStream_Play_UnpublishNotify)) { RTMP_Close(r); ret = 1; } else if (AVMATCH(&code, &av_NetStream_Seek_Notify)) { r->m_read.flags &= ~RTMP_READ_SEEKING; } else if (AVMATCH(&code, &av_NetStream_Pause_Notify)) { if (r->m_pausing == 1 || r->m_pausing == 2) { RTMP_SendPause(r, FALSE, r->m_pauseStamp); r->m_pausing = 3; } } } else if (AVMATCH(&method, &av_playlist_ready)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","playlist_ready"); //----------------------------- int i; for (i = 0; i < r->m_numCalls; i++) { if (AVMATCH(&r->m_methodCalls[i].name, &av_set_playlist)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } } else { } leave: AMF_Reset(&obj); return ret; } int RTMP_FindFirstMatchingProperty(AMFObject *obj, const AVal *name, AMFObjectProperty * p) { int n; /* this is a small object search to locate the "duration" property */ for (n = 0; n < obj->o_num; n++) { AMFObjectProperty *prop = AMF_GetProp(obj, NULL, n); if (AVMATCH(&prop->p_name, name)) { *p = *prop; return TRUE; } if (prop->p_type == AMF_OBJECT) { if (RTMP_FindFirstMatchingProperty(&prop->p_vu.p_object, name, p)) return TRUE; } } return FALSE; }
該函數主要做了以下幾步:
1.調用AMF_Decode()解碼AMF命令數據
2.調用AMFProp_GetString()獲取具體命令的字符串
3.調用AVMATCH()比較字符串,不同的命令做不同的處理,例如以下幾個:
AVMATCH(&methodInvoked, &av_connect) AVMATCH(&methodInvoked, &av_createStream) AVMATCH(&methodInvoked, &av_play) AVMATCH(&methodInvoked, &av_publish) AVMATCH(&method, &av_onBWDone)
等等,不一一例舉了
具體的處理過程如下所示。在這里說一個“建立網絡流”(createStream)的例子,通常發生在建立網絡連接(NetConnection)之后,播放(Play)之前。
else if (AVMATCH(&methodInvoked, &av_createStream)) { //---------------- r->dlg->AppendMLInfo(20,0,"命令消息","Result (CreateStream)"); //----------------------------- r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendPublish(r); } else { if (r->Link.lFlags & RTMP_LF_PLST) SendPlaylist(r); //---------------- r->dlg->AppendCInfo("發送數據。消息 命令 (typeID=20) (Play)。"); //----------------------------- RTMP_LogPrintf("發送命令消息“play” (typeID=20)\n"); SendPlay(r); RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); } }
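由代碼可見,程序先獲取了stream_id。如果處於發布模式(RTMP_FEATURE_WRITE),則調用SendPublish()發送publish命令;否則依次調用SendPlaylist()(僅當設置了RTMP_LF_PLST標志時)和SendPlay(),分別用於發送播放列表相關的命令以及開始播放流媒體數據,最后再調用RTMP_SendCtrl()發送一個帶有緩沖時長(m_nBufferMS)的控制消息。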
8: 發送消息(Message)
之前寫了一系列的文章介紹RTMPDump各種函數。比如怎么建立網絡連接(NetConnection),怎么建立網絡流(NetStream)之類的,唯獨沒有介紹這些發送或接收的數據,在底層到底是怎么實現的。本文就是要剖析一下其內部的實現。即這些消息(Message)到底是怎么發送和接收的。
先來看看發送消息吧。
發送connect命令使用函數SendConnectPacket()
發送createstream命令使用RTMP_SendCreateStream()
發送releaseStream命令使用SendReleaseStream()
發送publish命令使用SendPublish()
發送deleteStream的命令使用SendDeleteStream()
發送pause命令使用RTMP_SendPause()
不再一一例舉,發現函數命名有兩種規律:RTMP_Send***()或者Send***(),其中*號代表命令的名稱。
SendConnectPacket()這個命令是每次程序開始運行的時候發送的第一個命令消息,內容比較多,包含了很多AMF編碼的內容,在此不多做分析,貼上代碼:
//發送“connect”命令 static int SendConnectPacket(RTMP *r, RTMPPacket *cp) { RTMPPacket packet; char pbuf[4096], *pend = pbuf + sizeof(pbuf); char *enc; if (cp) return RTMP_SendPacket(r, cp, TRUE); packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = 0x14; /* INVOKE */ packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_connect); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_OBJECT; enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app); if (!enc) return FALSE; if (r->Link.protocol & RTMP_FEATURE_WRITE) { enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate); if (!enc) return FALSE; } if (r->Link.flashVer.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer); if (!enc) return FALSE; } if (r->Link.swfUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl); if (!enc) return FALSE; } if (r->Link.tcUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl); if (!enc) return FALSE; } if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) { enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE); if (!enc) return FALSE; enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0); if (!enc) return FALSE; enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs); if (!enc) return FALSE; enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs); if (!enc) return FALSE; enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0); if (!enc) return FALSE; if (r->Link.pageUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl); if (!enc) return FALSE; } } if (r->m_fEncoding != 0.0 || r->m_bSendEncoding) { /* AMF0, AMF3 not fully supported yet */ enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, 
r->m_fEncoding); if (!enc) return FALSE; } if (enc + 3 >= pend) return FALSE; *enc++ = 0; *enc++ = 0; /* end of object - 0x00 0x00 0x09 */ *enc++ = AMF_OBJECT_END; /* add auth string */ if (r->Link.auth.av_len) { enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH); if (!enc) return FALSE; enc = AMF_EncodeString(enc, pend, &r->Link.auth); if (!enc) return FALSE; } if (r->Link.extras.o_num) { int i; for (i = 0; i < r->Link.extras.o_num; i++) { enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend); if (!enc) return FALSE; } } packet.m_nBodySize = enc - packet.m_body; //---------------- r->dlg->AppendMLInfo(20,1,"命令消息","Connect"); //----------------------------- return RTMP_SendPacket(r, &packet, TRUE); }
RTMP_SendCreateStream()命令相對而言比較簡單,代碼如下:
//發送“createstream”命令 int RTMP_SendCreateStream(RTMP *r) { RTMPPacket packet; char pbuf[256], *pend = pbuf + sizeof(pbuf); char *enc; packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; packet.m_packetType = 0x14; /* INVOKE */ packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_createStream); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; /* NULL */ packet.m_nBodySize = enc - packet.m_body; //---------------- r->dlg->AppendMLInfo(20,1,"命令消息","CreateStream"); //----------------------------- return RTMP_SendPacket(r, &packet, TRUE); }
同樣,SendReleaseStream()內容也比較簡單,我對其中部分內容作了注釋:
//發送RealeaseStream命令 static int SendReleaseStream(RTMP *r) { RTMPPacket packet; char pbuf[1024], *pend = pbuf + sizeof(pbuf); char *enc; packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; packet.m_packetType = 0x14; /* INVOKE */ packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; //對“releaseStream”字符串進行AMF編碼 enc = AMF_EncodeString(enc, pend, &av_releaseStream); //對傳輸ID(0)進行AMF編碼? enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); //命令對象 *enc++ = AMF_NULL; //對播放路徑字符串進行AMF編碼 enc = AMF_EncodeString(enc, pend, &r->Link.playpath); if (!enc) return FALSE; packet.m_nBodySize = enc - packet.m_body; //---------------- r->dlg->AppendMLInfo(20,1,"命令消息","ReleaseStream"); //----------------------------- return RTMP_SendPacket(r, &packet, FALSE); }
再來看一個SendPublish()函數,用於發送“publish”命令
//發送Publish命令 static int SendPublish(RTMP *r) { RTMPPacket packet; char pbuf[1024], *pend = pbuf + sizeof(pbuf); char *enc; //塊流ID為4 packet.m_nChannel = 0x04; /* source channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_LARGE; //命令消息,類型20 packet.m_packetType = 0x14; /* INVOKE */ packet.m_nTimeStamp = 0; //流ID packet.m_nInfoField2 = r->m_stream_id; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; //指向Chunk的負載 enc = packet.m_body; //對“publish”字符串進行AMF編碼 enc = AMF_EncodeString(enc, pend, &av_publish); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); //命令對象為空 *enc++ = AMF_NULL; enc = AMF_EncodeString(enc, pend, &r->Link.playpath); if (!enc) return FALSE; /* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */ enc = AMF_EncodeString(enc, pend, &av_live); if (!enc) return FALSE; packet.m_nBodySize = enc - packet.m_body; //---------------- r->dlg->AppendMLInfo(20,1,"命令消息","Pulish"); //----------------------------- return RTMP_SendPacket(r, &packet, TRUE); }
其他的命令不再一一列舉,總體的思路是聲明一個RTMPPacket類型的結構體,然后設置各種屬性值,最后交給RTMP_SendPacket()進行發送。
RTMPPacket類型的結構體定義如下。嚴格來說,一個RTMPPacket對應RTMP協議規范里面的一個消息(Message):當消息負載超過塊大小時,RTMP_SendPacket()會把它拆分成多個塊(Chunk)發送;負載不超過塊大小時,一個RTMPPacket正好對應一個塊。
//Chunk信息 typedef struct RTMPPacket { uint8_t m_headerType;//ChunkMsgHeader的類型(4種) uint8_t m_packetType;//Message type ID(1-7協議控制;8,9音視頻;10以后為AMF編碼消息) uint8_t m_hasAbsTimestamp; /* Timestamp 是絕對值還是相對值? */ int m_nChannel; //塊流ID uint32_t m_nTimeStamp; // Timestamp int32_t m_nInfoField2; /* last 4 bytes in a long header,消息流ID */ uint32_t m_nBodySize; //消息長度 uint32_t m_nBytesRead; RTMPChunk *m_chunk; char *m_body; } RTMPPacket;
下面我們來看看RTMP_SendPacket()吧,各種的RTMPPacket(即各種Chunk)都需要用這個函數進行發送。
//自己編一個數據報發送出去! //非常常用 int RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue) { const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel]; uint32_t last = 0; int nSize; int hSize, cSize; char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c; uint32_t t; char *buffer, *tbuf = NULL, *toff = NULL; int nChunkSize; int tlen; //不是完整ChunkMsgHeader if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE) { /* compress a bit by using the prev packet's attributes */ //獲取ChunkMsgHeader的類型 //前一個Chunk和這個Chunk對比 if (prevPacket->m_nBodySize == packet->m_nBodySize && prevPacket->m_packetType == packet->m_packetType && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM) packet->m_headerType = RTMP_PACKET_SIZE_SMALL; if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp && packet->m_headerType == RTMP_PACKET_SIZE_SMALL) packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM; //上一個packet的TimeStamp last = prevPacket->m_nTimeStamp; } if (packet->m_headerType > 3) /* sanity */ { RTMP_Log(RTMP_LOGERROR, "sanity failed!! 
trying to send header of type: 0x%02x.", (unsigned char)packet->m_headerType); return FALSE; } //chunk包頭大小;packetSize[] = { 12, 8, 4, 1 } nSize = packetSize[packet->m_headerType]; hSize = nSize; cSize = 0; //相對的TimeStamp t = packet->m_nTimeStamp - last; if (packet->m_body) { //Header的Start //m_body是指向負載數據首地址的指針;“-”號用於指針前移 header = packet->m_body - nSize; //Header的End hend = packet->m_body; } else { header = hbuf + 6; hend = hbuf + sizeof(hbuf); } //當ChunkStreamID大於319時 if (packet->m_nChannel > 319) //ChunkBasicHeader是3個字節 cSize = 2; //當ChunkStreamID大於63時 else if (packet->m_nChannel > 63) //ChunkBasicHeader是2個字節 cSize = 1; if (cSize) { //header指針指向ChunkMsgHeader header -= cSize; //hsize加上ChunkBasicHeader的長度 hSize += cSize; } //相對TimeStamp大於0xffffff,此時需要使用ExtendTimeStamp if (nSize > 1 && t >= 0xffffff) { header -= 4; hSize += 4; } hptr = header; //把ChunkBasicHeader的Fmt類型左移6位 c = packet->m_headerType << 6; switch (cSize) { //把ChunkBasicHeader的低6位設置成ChunkStreamID case 0: c |= packet->m_nChannel; break; //同理,但低6位設置成000000 case 1: break; //同理,但低6位設置成000001 case 2: c |= 1; break; } //可以拆分成兩句*hptr=c;hptr++,此時hptr指向第2個字節 *hptr++ = c; //CSize>0,即ChunkBasicHeader大於1字節 if (cSize) { //將要放到第2字節的內容tmp int tmp = packet->m_nChannel - 64; //獲取低位存儲與第2字節 *hptr++ = tmp & 0xff; //ChunkBasicHeader是最大的3字節時 if (cSize == 2) //獲取高位存儲於最后1個字節(注意:排序使用大端序列,和主機相反) *hptr++ = tmp >> 8; } //ChunkMsgHeader。注意一共有4種,包含的字段數不同。 //TimeStamp(3B) if (nSize > 1) { //相對TimeStamp和絕對TimeStamp? hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 
0xffffff : t); } //MessageLength+MessageTypeID(4B) if (nSize > 4) { //MessageLength hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize); //MessageTypeID *hptr++ = packet->m_packetType; } //MessageStreamID(4B) if (nSize > 8) hptr += EncodeInt32LE(hptr, packet->m_nInfoField2); //ExtendedTimeStamp if (nSize > 1 && t >= 0xffffff) hptr = AMF_EncodeInt32(hptr, hend, t); //負載長度,指向負載的指針 nSize = packet->m_nBodySize; buffer = packet->m_body; //Chunk大小,默認128字節 nChunkSize = r->m_outChunkSize; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, nSize); /* send all chunks in one HTTP request */ //使用HTTP if (r->Link.protocol & RTMP_FEATURE_HTTP) { //nSize:Message負載長度;nChunkSize:Chunk長度; //例nSize:307,nChunkSize:128; //可分為(307+128-1)/128=3個 //為什么+nChunkSize-1?因為除法會只取整數部分! int chunks = (nSize+nChunkSize-1) / nChunkSize; //Chunk個數超過一個 if (chunks > 1) { //注意:CSize=1表示ChunkBasicHeader是2字節 //消息分n塊后總的開銷: //n個ChunkBasicHeader,1個ChunkMsgHeader,1個Message負載 //實際中只有第一個Chunk是完整的,剩下的只有ChunkBasicHeader tlen = chunks * (cSize + 1) + nSize + hSize; //分配內存 tbuf = (char *) malloc(tlen); if (!tbuf) return FALSE; toff = tbuf; } //消息的負載+頭 } while (nSize + hSize) { int wrote; //消息負載<Chunk大小(不用分塊) if (nSize < nChunkSize) //Chunk可能小於設定值 nChunkSize = nSize; RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize); RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize); if (tbuf) { //void *memcpy(void *dest, const void *src, int n); //由src指向地址為起始地址的連續n個字節的數據復制到以dest指向地址為起始地址的空間內 memcpy(toff, header, nChunkSize + hSize); toff += nChunkSize + hSize; } else { wrote = WriteN(r, header, nChunkSize + hSize); if (!wrote) return FALSE; } //消息負載長度-Chunk負載長度 nSize -= nChunkSize; //Buffer指針前移1個Chunk負載長度 buffer += nChunkSize; hSize = 0; //如果消息沒有發完 if (nSize > 0) { //ChunkBasicHeader header = buffer - 1; hSize = 1; if (cSize) { header -= cSize; hSize += cSize; } //ChunkBasicHeader第1個字節 *header = (0xc0 | c); //ChunkBasicHeader大於1字節 if (cSize) { int tmp = packet->m_nChannel - 
64; header[1] = tmp & 0xff; if (cSize == 2) header[2] = tmp >> 8; } } } if (tbuf) { // int wrote = WriteN(r, tbuf, toff-tbuf); free(tbuf); tbuf = NULL; if (!wrote) return FALSE; } /* we invoked a remote method */ if (packet->m_packetType == 0x14) { AVal method; char *ptr; ptr = packet->m_body + 1; AMF_DecodeString(ptr, &method); RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val); /* keep it in call queue till result arrives */ if (queue) { int txn; ptr += 3 + method.av_len; txn = (int)AMF_DecodeNumber(ptr); AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); } } if (!r->m_vecChannelsOut[packet->m_nChannel]) r->m_vecChannelsOut[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket)); return TRUE; }
這個函數乍一看好像非常復雜,其實不然,他只是按照RTMP規范將數據編碼成符合規范的塊(Chunk),規范可以參考相關的文檔。
具體怎么編碼成塊(Chunk)就不多分析了,在這里需要注意一個函數:WriteN()。該函數完成了將數據發送出去的功能。
來看一下WriteN()函數:
//發送數據報的時候調用(連接,buffer,長度) static int WriteN(RTMP *r, const char *buffer, int n) { const char *ptr = buffer; #ifdef CRYPTO char *encrypted = 0; char buf[RTMP_BUFFER_CACHE_SIZE]; if (r->Link.rc4keyOut) { if (n > sizeof(buf)) encrypted = (char *)malloc(n); else encrypted = (char *)buf; ptr = encrypted; RC4_encrypt2((RC4_KEY *)r->Link.rc4keyOut, n, buffer, ptr); } #endif while (n > 0) { int nBytes; //因方式的不同而調用不同函數 //如果使用的是HTTP協議進行連接 if (r->Link.protocol & RTMP_FEATURE_HTTP) nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n); else nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n); /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d\n", __FUNCTION__, nBytes); */ //成功發送字節數<0 if (nBytes < 0) { int sockerr = GetSockError(); RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__, sockerr, n); if (sockerr == EINTR && !RTMP_ctrlC) continue; RTMP_Close(r); n = 1; break; } if (nBytes == 0) break; n -= nBytes; ptr += nBytes; } #ifdef CRYPTO if (encrypted && encrypted != buf) free(encrypted); #endif return n == 0; }
該函數中,RTMPSockBuf_Send()完成了數據發送的功能,再來看看這個函數(函數調用真是好多啊。。。。)
//Socket發送(指明套接字,buffer緩沖區,數據長度) //返回所發數據量 int RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len) { int rc; #ifdef _DEBUG fwrite(buf, 1, len, netstackdump); #endif #if defined(CRYPTO) && !defined(NO_SSL) if (sb->sb_ssl) { rc = TLS_write((SSL *)sb->sb_ssl, buf, len); } else #endif { //向一個已連接的套接口發送數據。 //int send( SOCKET s, const char * buf, int len, int flags); //s:一個用於標識已連接套接口的描述字。 //buf:包含待發送數據的緩沖區。 //len:緩沖區中數據的長度。 //flags:調用執行方式。 //rc:所發數據量。 rc = send(sb->sb_socket, buf, len, 0); } return rc; } int RTMPSockBuf_Close(RTMPSockBuf *sb) { #if defined(CRYPTO) && !defined(NO_SSL) if (sb->sb_ssl) { TLS_shutdown((SSL *)sb->sb_ssl); TLS_close((SSL *)sb->sb_ssl); sb->sb_ssl = NULL; } #endif return closesocket(sb->sb_socket); }
到這個函數的時候,發現一層層的調用終於完成了,最后調用了系統Socket的send()函數完成了數據的發送功能。
之前貼過一張圖總結這個過程,可能理解起來要方便一些:RTMPDump源代碼分析 0: 主要函數調用分析
9: 接收消息(Message)(接收視音頻數據)
在這里再研究研究接收消息(Message)的源代碼,接收消息最典型的應用就是接收視音頻數據了,因為視頻和音頻分別都屬於RTMP協議規范中的一種消息。在這里主要分析接收視音頻數據。
RTMPdump中完成視音頻數據的接收(也可以說是視音頻數據的下載)的函數是:RTMP_Read()。
RTMPdump主程序中的Download()函數就是通過調用RTMP_Read()完成數據接收,從而實現下載的。
那么我們馬上開始吧,首先看看RTMP_Read()函數:
//FLV文件頭 static const char flvHeader[] = { 'F', 'L', 'V', 0x01, 0x00, /* 0x04代表有音頻, 0x01代表有視頻 */ 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00 }; #define HEADERBUF (128*1024) int RTMP_Read(RTMP *r, char *buf, int size) { int nRead = 0, total = 0; /* can't continue */ fail: switch (r->m_read.status) { case RTMP_READ_EOF: case RTMP_READ_COMPLETE: return 0; case RTMP_READ_ERROR: /* corrupted stream, resume failed */ SetSockError(EINVAL); return -1; default: break; } /* first time thru */ if (!(r->m_read.flags & RTMP_READ_HEADER)) { if (!(r->m_read.flags & RTMP_READ_RESUME)) { //分配內存,指向buf的首部和尾部 char *mybuf = (char *) malloc(HEADERBUF), *end = mybuf + HEADERBUF; int cnt = 0; //buf指向同一地址 r->m_read.buf = mybuf; r->m_read.buflen = HEADERBUF; //把Flv的首部復制到mybuf指向的內存 //RTMP傳遞的多媒體數據是“砍頭”的FLV文件 memcpy(mybuf, flvHeader, sizeof(flvHeader)); //m_read.buf指針后移flvheader個單位 r->m_read.buf += sizeof(flvHeader); //buf長度增加flvheader長度 r->m_read.buflen -= sizeof(flvHeader); //timestamp=0,不是多媒體數據 while (r->m_read.timestamp == 0) { //讀取一個Packet,到r->m_read.buf //nRead為讀取結果標記 nRead = Read_1_Packet(r, r->m_read.buf, r->m_read.buflen); //有錯誤 if (nRead < 0) { free(mybuf); r->m_read.buf = NULL; r->m_read.buflen = 0; r->m_read.status = nRead; goto fail; } /* buffer overflow, fix buffer and give up */ if (r->m_read.buf < mybuf || r->m_read.buf > end) { mybuf = (char *) realloc(mybuf, cnt + nRead); memcpy(mybuf+cnt, r->m_read.buf, nRead); r->m_read.buf = mybuf+cnt+nRead; break; } // //記錄讀取的字節數 cnt += nRead; //m_read.buf指針后移nRead個單位 r->m_read.buf += nRead; r->m_read.buflen -= nRead; //當dataType=00000101時,即有視頻和音頻時 //說明有多媒體數據了 if (r->m_read.dataType == 5) break; } //讀入數據類型 //注意:mybuf指針位置一直沒動 //mybuf[4]中第 6 位表示是否存在音頻Tag。第 8 位表示是否存在視頻Tag。 mybuf[4] = r->m_read.dataType; //兩個指針之間的差 r->m_read.buflen = r->m_read.buf - mybuf; r->m_read.buf = mybuf; //這句很重要!后面memcopy r->m_read.bufpos = mybuf; } //flags標明已經讀完了文件頭 r->m_read.flags |= RTMP_READ_HEADER; } if ((r->m_read.flags & RTMP_READ_SEEKING) && r->m_read.buf) 
{ /* drop whatever's here */ free(r->m_read.buf); r->m_read.buf = NULL; r->m_read.bufpos = NULL; r->m_read.buflen = 0; } /* If there's leftover data buffered, use it up */ if (r->m_read.buf) { nRead = r->m_read.buflen; if (nRead > size) nRead = size; //m_read.bufpos指向mybuf memcpy(buf, r->m_read.bufpos, nRead); r->m_read.buflen -= nRead; if (!r->m_read.buflen) { free(r->m_read.buf); r->m_read.buf = NULL; r->m_read.bufpos = NULL; } else { r->m_read.bufpos += nRead; } buf += nRead; total += nRead; size -= nRead; } //接着讀 while (size > 0 && (nRead = Read_1_Packet(r, buf, size)) >= 0) { if (!nRead) continue; buf += nRead; total += nRead; size -= nRead; break; } if (nRead < 0) r->m_read.status = nRead; if (size < 0) total += size; return total; }
代碼中關鍵的地方都已經加上了注釋,在此就不重復說明了。有一點要提一下:RTMP傳送的視音頻數據的格式和FLV(Flash Video)格式是一樣的,把接收下來的數據直接存入文件就可以了。但是這些視音頻數據沒有文件頭,是純視音頻數據,因此需要在其前面加上FLV格式的文件頭,這樣得到的數據存成文件后才能被一般的視頻播放器所播放。FLV格式的文件頭是13個字節,如代碼中所示。
RTMP_Read()中實際讀取數據的函數是Read_1_Packet(),它的功能是從網絡上讀取一個RTMPPacket的數據,來看看它的源代碼吧:
/* Read one media packet from the stream and store it in buf as FLV data.
 *
 * Audio (0x08), video (0x09) and metadata (0x12) packets get an 11-byte
 * FLV tag header in front and a 4-byte previous-tag-size behind; type 0x16
 * packets already consist of FLV tags and are copied, with their tag sizes
 * corrected where needed.  In resume mode, packets are ignored until the
 * stored keyframe has been found again.
 *
 * If the packet (plus 4 spare bytes) does not fit into buflen, it is built
 * in a freshly allocated r->m_read.buf instead; the part that fits is
 * copied to buf and r->m_read.bufpos / r->m_read.buflen describe the rest.
 * The full size is still returned in that case.
 *
 * Returns -3 if Play.Close/Stop, -2 if fatal error, -1 if no more media
 * packets, 0 if ignorable error, >0 if there is a media packet
 */
static int
Read_1_Packet(RTMP *r, char *buf, unsigned int buflen)
{
    uint32_t prevTagSize = 0;
    int rtnGetNextMediaPacket = 0, ret = RTMP_READ_EOF;
    RTMPPacket packet = { 0 };
    int recopy = FALSE;
    unsigned int size;
    char *ptr, *pend;
    uint32_t nTimeStamp = 0;
    unsigned int len;

    /* Fetch the next packet.  The loop below runs at most once; it is only
     * a loop so that `break` can be used to bail out. */
    rtnGetNextMediaPacket = RTMP_GetNextMediaPacket(r, &packet);
    while (rtnGetNextMediaPacket) {
        char *packetBody = packet.m_body;
        unsigned int nPacketLen = packet.m_nBodySize;

        /* Return -3 if this was completed nicely with invoke message
         * Play.Stop or Play.Complete
         */
        if (rtnGetNextMediaPacket == 2) {
            RTMP_Log(RTMP_LOGDEBUG,
                     "Got Play.Complete or Play.Stop from server. "
                     "Assuming stream is complete");
            ret = RTMP_READ_COMPLETE;
            break;
        }

        /* Record which media types have been seen: bit 2 (0x04) for audio,
         * bit 0 (0x01) for video. */
        r->m_read.dataType |= (((packet.m_packetType == 0x08) << 2) |
                               (packet.m_packetType == 0x09));

        /* Video message (type 9) that is too small to be useful. */
        if (packet.m_packetType == 0x09 && nPacketLen <= 5) {
            RTMP_Log(RTMP_LOGDEBUG, "ignoring too small video packet: size: %d",
                     nPacketLen);
            ret = RTMP_READ_IGNORE;
            break;
        }
        /* Audio message (type 8) that is too small to be useful. */
        if (packet.m_packetType == 0x08 && nPacketLen <= 1) {
            RTMP_Log(RTMP_LOGDEBUG, "ignoring too small audio packet: size: %d",
                     nPacketLen);
            ret = RTMP_READ_IGNORE;
            break;
        }

        if (r->m_read.flags & RTMP_READ_SEEKING) {
            ret = RTMP_READ_IGNORE;
            break;
        }
#ifdef _DEBUG
        RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms, abs TS: %d",
                 packet.m_packetType, nPacketLen, packet.m_nTimeStamp,
                 packet.m_hasAbsTimestamp);
        if (packet.m_packetType == 0x09)
            RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));
#endif

        if (r->m_read.flags & RTMP_READ_RESUME) {
            /* check the header if we get one */
            /* (packets of this kind all carry timestamp 0) */
            if (packet.m_nTimeStamp == 0) {
                /* Message type 18: data message (AMF0). */
                if (r->m_read.nMetaHeaderSize > 0
                    && packet.m_packetType == 0x12) {
                    /* Decode the metadata and compare it with the stored
                     * meta header. */
                    AMFObject metaObj;
                    int nRes =
                        AMF_Decode(&metaObj, packetBody, nPacketLen, FALSE);
                    if (nRes >= 0) {
                        AVal metastring;
                        AMFProp_GetString(AMF_GetProp(&metaObj, NULL, 0),
                                          &metastring);

                        if (AVMATCH(&metastring, &av_onMetaData)) {
                            /* compare */
                            if ((r->m_read.nMetaHeaderSize != nPacketLen) ||
                                (memcmp
                                 (r->m_read.metaHeader, packetBody,
                                  r->m_read.nMetaHeaderSize) != 0)) {
                                ret = RTMP_READ_ERROR;
                            }
                        }
                        AMF_Reset(&metaObj);
                        if (ret == RTMP_READ_ERROR)
                            break;
                    }
                }

                /* check first keyframe to make sure we got the right position
                 * in the stream! (the first non ignored frame)
                 */
                if (r->m_read.nInitialFrameSize > 0) {
                    /* video or audio data */
                    if (packet.m_packetType == r->m_read.initialFrameType
                        && r->m_read.nInitialFrameSize == nPacketLen) {
                        /* we don't compare the sizes since the packet can
                         * contain several FLV packets, just make sure the
                         * first frame is our keyframe (which we are going
                         * to rewrite)
                         */
                        if (memcmp
                            (r->m_read.initialFrame, packetBody,
                             r->m_read.nInitialFrameSize) == 0) {
                            RTMP_Log(RTMP_LOGDEBUG, "Checked keyframe successfully!");
                            r->m_read.flags |= RTMP_READ_GOTKF;
                            /* ignore it! (what about audio data after it? it is
                             * handled by ignoring all 0ms frames, see below)
                             */
                            ret = RTMP_READ_IGNORE;
                            break;
                        }
                    }

                    /* hande FLV streams, even though the server resends the
                     * keyframe as an extra video packet it is also included
                     * in the first FLV stream chunk and we have to compare
                     * it and filter it out !!
                     */
                    /* Message type 22: aggregate message (a run of FLV tags). */
                    if (packet.m_packetType == 0x16) {
                        /* basically we have to find the keyframe with the
                         * correct TS being nResumeTS
                         */
                        unsigned int pos = 0;
                        uint32_t ts = 0;

                        while (pos + 11 < nPacketLen) {
                            /* size without header (11) and prevTagSize (4) */
                            uint32_t dataSize =
                                AMF_DecodeInt24(packetBody + pos + 1);
                            ts = AMF_DecodeInt24(packetBody + pos + 4);
                            ts |= (packetBody[pos + 7] << 24);

#ifdef _DEBUG
                            RTMP_Log(RTMP_LOGDEBUG,
                                     "keyframe search: FLV Packet: type %02X, dataSize: %d, timeStamp: %d ms",
                                     packetBody[pos], dataSize, ts);
#endif
                            /* ok, is it a keyframe?:
                             * well doesn't work for audio!
                             */
                            if (packetBody[pos /*6928, test 0 */ ] ==
                                r->m_read.initialFrameType
                                /* && (packetBody[11]&0xf0) == 0x10 */ ) {
                                if (ts == r->m_read.nResumeTS) {
                                    RTMP_Log(RTMP_LOGDEBUG,
                                             "Found keyframe with resume-keyframe timestamp!");
                                    if (r->m_read.nInitialFrameSize != dataSize
                                        || memcmp(r->m_read.initialFrame,
                                                  packetBody + pos + 11,
                                                  r->m_read.nInitialFrameSize) != 0) {
                                        RTMP_Log(RTMP_LOGERROR,
                                                 "FLV Stream: Keyframe doesn't match!");
                                        ret = RTMP_READ_ERROR;
                                        break;
                                    }
                                    r->m_read.flags |= RTMP_READ_GOTFLVK;

                                    /* skip this packet?
                                     * check whether skippable:
                                     */
                                    if (pos + 11 + dataSize + 4 > nPacketLen) {
                                        RTMP_Log(RTMP_LOGWARNING,
                                                 "Non skipable packet since it doesn't end with chunk, stream corrupt!");
                                        ret = RTMP_READ_ERROR;
                                        break;
                                    }
                                    packetBody += (pos + 11 + dataSize + 4);
                                    nPacketLen -= (pos + 11 + dataSize + 4);

                                    goto stopKeyframeSearch;

                                } else if (r->m_read.nResumeTS < ts) {
                                    /* the timestamp ts will only increase with
                                     * further packets, wait for seek
                                     */
                                    goto stopKeyframeSearch;
                                }
                            }
                            pos += (11 + dataSize + 4);
                        }
                        if (ts < r->m_read.nResumeTS) {
                            RTMP_Log(RTMP_LOGERROR,
                                     "First packet does not contain keyframe, all "
                                     "timestamps are smaller than the keyframe "
                                     "timestamp; probably the resume seek failed?");
                        }
                    stopKeyframeSearch:
                        ;
                        if (!(r->m_read.flags & RTMP_READ_GOTFLVK)) {
                            RTMP_Log(RTMP_LOGERROR,
                                     "Couldn't find the seeked keyframe in this chunk!");
                            ret = RTMP_READ_IGNORE;
                            break;
                        }
                    }
                }
            }

            if (packet.m_nTimeStamp > 0
                && (r->m_read.flags & (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK))) {
                /* another problem is that the server can actually change from
                 * 09/08 video/audio packets to an FLV stream or vice versa and
                 * our keyframe check will prevent us from going along with the
                 * new stream if we resumed.
                 *
                 * in this case set the 'found keyframe' variables to true.
                 * We assume that if we found one keyframe somewhere and were
                 * already beyond TS > 0 we have written data to the output
                 * which means we can accept all forthcoming data including the
                 * change between 08/09 <-> FLV packets
                 */
                r->m_read.flags |= (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK);
            }

            /* skip till we find our keyframe
             * (seeking might put us somewhere before it)
             */
            if (!(r->m_read.flags & RTMP_READ_GOTKF) &&
                packet.m_packetType != 0x16) {
                RTMP_Log(RTMP_LOGWARNING,
                         "Stream does not start with requested frame, ignoring data... ");
                r->m_read.nIgnoredFrameCounter++;
                if (r->m_read.nIgnoredFrameCounter > MAX_IGNORED_FRAMES)
                    ret = RTMP_READ_ERROR;      /* fatal error, couldn't continue stream */
                else
                    ret = RTMP_READ_IGNORE;
                break;
            }
            /* ok, do the same for FLV streams */
            if (!(r->m_read.flags & RTMP_READ_GOTFLVK) &&
                packet.m_packetType == 0x16) {
                RTMP_Log(RTMP_LOGWARNING,
                         "Stream does not start with requested FLV frame, ignoring data... ");
                r->m_read.nIgnoredFlvFrameCounter++;
                if (r->m_read.nIgnoredFlvFrameCounter > MAX_IGNORED_FRAMES)
                    ret = RTMP_READ_ERROR;
                else
                    ret = RTMP_READ_IGNORE;
                break;
            }

            /* we have to ignore the 0ms frames since these are the first
             * keyframes; we've got these so don't mess around with multiple
             * copies sent by the server to us! (if the keyframe is found at a
             * later position there is only one copy and it will be ignored by
             * the preceding if clause)
             */
            if (!(r->m_read.flags & RTMP_READ_NO_IGNORE) &&
                packet.m_packetType != 0x16) {
                /* exclude type 0x16 (FLV) since it can
                 * contain several FLV packets */
                if (packet.m_nTimeStamp == 0) {
                    ret = RTMP_READ_IGNORE;
                    break;
                } else {
                    /* stop ignoring packets */
                    r->m_read.flags |= RTMP_READ_NO_IGNORE;
                }
            }
        }

        /* calculate packet size and allocate slop buffer if necessary */
        size = nPacketLen +
            ((packet.m_packetType == 0x08
              || packet.m_packetType == 0x09
              || packet.m_packetType == 0x12) ? 11 : 0) +
            (packet.m_packetType != 0x16 ? 4 : 0);

        if (size + 4 > buflen) {
            /* the extra 4 is for the case of an FLV stream without a last
             * prevTagSize (we need extra 4 bytes to append it) */
            r->m_read.buf = (char *) malloc(size + 4);
            if (r->m_read.buf == 0) {
                RTMP_Log(RTMP_LOGERROR, "Couldn't allocate memory!");
                ret = RTMP_READ_ERROR;  /* fatal error */
                break;
            }
            recopy = TRUE;
            ptr = r->m_read.buf;
        } else {
            ptr = buf;
        }
        pend = ptr + size + 4;

        /* use to return timestamp of last processed packet */

        /* audio (0x08), video (0x09) or metadata (0x12) packets :
         * construct 11 byte header then add rtmp packet's data */
        if (packet.m_packetType == 0x08 || packet.m_packetType == 0x09
            || packet.m_packetType == 0x12) {
            nTimeStamp = r->m_read.nResumeTS + packet.m_nTimeStamp;
            prevTagSize = 11 + nPacketLen;

            *ptr = packet.m_packetType;
            ptr++;
            ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);

#if 0
            if(packet.m_packetType == 0x09) { /* video */

                /* H264 fix: */
                if((packetBody[0] & 0x0f) == 7) { /* CodecId = H264 */
                    uint8_t packetType = *(packetBody+1);

                    uint32_t ts = AMF_DecodeInt24(packetBody+2); /* composition time */
                    int32_t cts = (ts+0xff800000)^0xff800000;
                    RTMP_Log(RTMP_LOGDEBUG, "cts  : %d\n", cts);

                    nTimeStamp -= cts;
                    /* get rid of the composition time */
                    CRTMP::EncodeInt24(packetBody+2, 0);
                }
                RTMP_Log(RTMP_LOGDEBUG, "VIDEO: nTimeStamp: 0x%08X (%d)\n", nTimeStamp, nTimeStamp);
            }
#endif

            /* Timestamp: low 24 bits first, then the top 8 bits. */
            ptr = AMF_EncodeInt24(ptr, pend, nTimeStamp);
            *ptr = (char)((nTimeStamp & 0xFF000000) >> 24);
            ptr++;

            /* stream id */
            ptr = AMF_EncodeInt24(ptr, pend, 0);
        }

        memcpy(ptr, packetBody, nPacketLen);
        len = nPacketLen;

        /* correct tagSize and obtain timestamp if we have an FLV stream */
        if (packet.m_packetType == 0x16) {
            unsigned int pos = 0;
            int delta;

            /* grab first timestamp and see if it needs fixing */
//          nTimeStamp = AMF_DecodeInt24(packetBody + 4);
//          nTimeStamp |= (packetBody[7] << 24);
//          delta = packet.m_nTimeStamp - nTimeStamp;
            while (pos + 11 < nPacketLen) {
                /* size without header (11) and without prevTagSize (4) */
                uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);
                nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);
                nTimeStamp |= (packetBody[pos + 7] << 24);

//              if (delta)
//                {
//                  nTimeStamp += delta;
//                  AMF_EncodeInt24(ptr+pos+4, pend, nTimeStamp);
//                  ptr[pos+7] = nTimeStamp>>24;
//                }

                /* set data type */
                r->m_read.dataType |= (((*(packetBody + pos) == 0x08) << 2) |
                                       (*(packetBody + pos) == 0x09));

                if (pos + 11 + dataSize + 4 > nPacketLen) {
                    if (pos + 11 + dataSize > nPacketLen) {
                        RTMP_Log(RTMP_LOGERROR,
                                 "Wrong data size (%lu), stream corrupted, aborting!",
                                 dataSize);
                        ret = RTMP_READ_ERROR;
                        break;
                    }
                    RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");

                    /* we have to append a last tagSize! */
                    prevTagSize = dataSize + 11;
                    AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,
                                    prevTagSize);
                    size += 4;
                    len += 4;
                } else {
                    prevTagSize =
                        AMF_DecodeInt32(packetBody + pos + 11 + dataSize);

#ifdef _DEBUG
                    RTMP_Log(RTMP_LOGDEBUG,
                             "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",
                             (unsigned char)packetBody[pos], dataSize, prevTagSize,
                             nTimeStamp);
#endif

                    if (prevTagSize != (dataSize + 11)) {
#ifdef _DEBUG
                        RTMP_Log(RTMP_LOGWARNING,
                                 "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",
                                 dataSize + 11);
#endif

                        prevTagSize = dataSize + 11;
                        AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,
                                        prevTagSize);
                    }
                }

                pos += prevTagSize + 4; /*(11+dataSize+4); */
            }
        }
        ptr += len;

        if (packet.m_packetType != 0x16) {
            /* FLV tag packets contain their own prevTagSize */
            AMF_EncodeInt32(ptr, pend, prevTagSize);
        }

        /* In non-live this nTimeStamp can contain an absolute TS.
         * Update ext timestamp with this absolute offset in non-live mode
         * otherwise report the relative one
         */
        /* RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, pktTS: %dms, TS: %dms, bLiveStream: %d", packet.m_packetType, nPacketLen, packet.m_nTimeStamp, nTimeStamp, r->Link.lFlags & RTMP_LF_LIVE); */
        r->m_read.timestamp = (r->Link.lFlags & RTMP_LF_LIVE) ? packet.m_nTimeStamp : nTimeStamp;

        ret = size;
        break;
    }

    if (rtnGetNextMediaPacket)
        RTMPPacket_Free(&packet);

    if (recopy) {
        /* The packet was built in r->m_read.buf: hand out what fits into
         * buf now and remember where the remainder starts. */
        len = ret > buflen ? buflen : ret;
        memcpy(buf, r->m_read.buf, len);
        r->m_read.bufpos = r->m_read.buf + len;
        r->m_read.buflen = ret - len;
    }
    return ret;
}
函數功能很多,重要的地方已經加上了注釋,在此不再細分析。Read_1_Packet()里面實現從網絡中讀取視音頻數據的函數是RTMP_GetNextMediaPacket()。下面我們來看看該函數的源代碼:
/*
 * Read packets until RTMP_ClientPacket() reports one that carries media
 * (non-zero result), the connection drops, or a read fails.
 *
 * While r->m_pausing is 3, media packets whose timestamp is not beyond
 * r->m_mediaStamp are skipped; the first later one resets m_pausing to 0.
 *
 * Returns the RTMP_ClientPacket() result for the accepted packet, or 0 when
 * none was obtained.  On success r->m_bPlaying is set; when nothing was
 * obtained because the socket timed out and no pause is in progress, the
 * current media-channel timestamp is stored in r->m_pauseStamp.
 */
int
RTMP_GetNextMediaPacket(RTMP *r, RTMPPacket *packet)
{
    int found = 0;

    while (RTMP_IsConnected(r) && RTMP_ReadPacket(r, packet)) {
        /* Only part of the message has arrived so far; keep reading. */
        if (!RTMPPacket_IsReady(packet))
            continue;

        found = RTMP_ClientPacket(r, packet);
        if (!found) {
            /* Handled internally, nothing to deliver. */
            RTMPPacket_Free(packet);
            continue;
        }

        if (r->m_pausing == 3) {
            if (packet->m_nTimeStamp <= r->m_mediaStamp) {
                found = 0;
#ifdef _DEBUG
                RTMP_Log(RTMP_LOGDEBUG,
                         "Skipped type: %02X, size: %d, TS: %d ms, abs TS: %d, pause: %d ms",
                         packet->m_packetType, packet->m_nBodySize,
                         packet->m_nTimeStamp, packet->m_hasAbsTimestamp,
                         r->m_mediaStamp);
#endif
                continue;
            }
            r->m_pausing = 0;
        }

        /* Got a deliverable media packet. */
        break;
    }

    if (found) {
        r->m_bPlaying = TRUE;
    } else if (r->m_sb.sb_timedout && !r->m_pausing) {
        r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel];
    }

    return found;
}
這里有兩個函數比較重要:RTMP_ReadPacket()以及RTMP_ClientPacket()。這兩個函數中,前一個函數負責從網絡上讀取數據,后一個負責處理數據。這部分與建立RTMP連接的網絡流(NetStream)的時候很相似,參考:RTMPdump(libRTMP) 源代碼分析 6: 建立一個流媒體連接 (NetStream部分 1)
RTMP_ClientPacket()在前文中已經做過分析,在此不再重復敘述。在這里重點分析一下RTMP_ReadPacket(),來看看它的源代碼。
//讀取收下來的Chunk int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet) { //packet 存讀取完后的的數據 //Chunk Header最大值18 uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 }; //header 指向的是從Socket中收下來的數據 char *header = (char *)hbuf; int nSize, hSize, nToRead, nChunk; int didAlloc = FALSE; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket); //收下來的數據存入hbuf if (ReadN(r, (char *)hbuf, 1) == 0) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__); return FALSE; } //塊類型fmt packet->m_headerType = (hbuf[0] & 0xc0) >> 6; //塊流ID(2-63) packet->m_nChannel = (hbuf[0] & 0x3f); header++; //塊流ID第1字節為0時,塊流ID占2個字節 if (packet->m_nChannel == 0) { if (ReadN(r, (char *)&hbuf[1], 1) != 1) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte", __FUNCTION__); return FALSE; } //計算塊流ID(64-319) packet->m_nChannel = hbuf[1]; packet->m_nChannel += 64; header++; } //塊流ID第1字節為0時,塊流ID占3個字節 else if (packet->m_nChannel == 1) { int tmp; if (ReadN(r, (char *)&hbuf[1], 2) != 2) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte", __FUNCTION__); return FALSE; } tmp = (hbuf[2] << 8) + hbuf[1]; //計算塊流ID(64-65599) packet->m_nChannel = tmp + 64; RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel); header += 2; } //ChunkHeader的大小(4種) nSize = packetSize[packet->m_headerType]; if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */ packet->m_hasAbsTimestamp = TRUE; //11字節的完整ChunkMsgHeader的TimeStamp是絕對值 else if (nSize < RTMP_LARGE_HEADER_SIZE) { /* using values from the last message of this channel */ if (r->m_vecChannelsIn[packet->m_nChannel]) memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel], sizeof(RTMPPacket)); } nSize--; if (nSize > 0 && ReadN(r, header, nSize) != nSize) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. 
type: %x", __FUNCTION__, (unsigned int)hbuf[0]); return FALSE; } hSize = nSize + (header - (char *)hbuf); if (nSize >= 3) { //TimeStamp(注意 BigEndian to SmallEndian)(11,7,3字節首部都有) packet->m_nTimeStamp = AMF_DecodeInt24(header); /*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */ //消息長度(11,7字節首部都有) if (nSize >= 6) { packet->m_nBodySize = AMF_DecodeInt24(header + 3); packet->m_nBytesRead = 0; RTMPPacket_Free(packet); //(11,7字節首部都有) if (nSize > 6) { //Msg type ID packet->m_packetType = header[6]; //Msg Stream ID if (nSize == 11) packet->m_nInfoField2 = DecodeInt32LE(header + 7); } } //Extend TimeStamp if (packet->m_nTimeStamp == 0xffffff) { if (ReadN(r, header + nSize, 4) != 4) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp", __FUNCTION__); return FALSE; } packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize); hSize += 4; } } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize); if (packet->m_nBodySize > 0 && packet->m_body == NULL) { if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)) { RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__); return FALSE; } didAlloc = TRUE; packet->m_headerType = (hbuf[0] & 0xc0) >> 6; } nToRead = packet->m_nBodySize - packet->m_nBytesRead; nChunk = r->m_inChunkSize; if (nToRead < nChunk) nChunk = nToRead; /* Does the caller want the raw chunk? */ if (packet->m_chunk) { packet->m_chunk->c_headerSize = hSize; memcpy(packet->m_chunk->c_header, hbuf, hSize); packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead; packet->m_chunk->c_chunkSize = nChunk; } if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. 
len: %lu", __FUNCTION__, packet->m_nBodySize); return FALSE; } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk); packet->m_nBytesRead += nChunk; /* keep the packet as ref for other packets on this channel */ if (!r->m_vecChannelsIn[packet->m_nChannel]) r->m_vecChannelsIn[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket)); //讀取完畢 if (RTMPPacket_IsReady(packet)) { /* make packet's timestamp absolute */ if (!packet->m_hasAbsTimestamp) packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */ r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp; /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */ /* arrives and requests to re-use some info (small packet header) */ r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL; r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0; r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */ } else { packet->m_body = NULL; /* so it won't be erased on free */ } return TRUE; }
函數代碼看似很多,但是並不是很復雜,可以理解為在從事“簡單重復性勞動”(和搬磚差不多)。基本上是一個字節一個字節的讀取,然后按照RTMP協議規范進行解析。具體如何解析可以參考RTMP協議規范。
在RTMP_ReadPacket()函數里完成從Socket中讀取數據的函數是ReadN(),繼續看看它的源代碼:
//從HTTP或SOCKET中讀取數據 static int ReadN(RTMP *r, char *buffer, int n) { int nOriginalSize = n; int avail; char *ptr; r->m_sb.sb_timedout = FALSE; #ifdef _DEBUG memset(buffer, 0, n); #endif ptr = buffer; while (n > 0) { int nBytes = 0, nRead; if (r->Link.protocol & RTMP_FEATURE_HTTP) { while (!r->m_resplen) { if (r->m_sb.sb_size < 144) { if (!r->m_unackd) HTTP_Post(r, RTMPT_IDLE, "", 1); if (RTMPSockBuf_Fill(&r->m_sb) < 1) { if (!r->m_sb.sb_timedout) RTMP_Close(r); return 0; } } HTTP_read(r, 0); } if (r->m_resplen && !r->m_sb.sb_size) RTMPSockBuf_Fill(&r->m_sb); avail = r->m_sb.sb_size; if (avail > r->m_resplen) avail = r->m_resplen; } else { avail = r->m_sb.sb_size; if (avail == 0) { if (RTMPSockBuf_Fill(&r->m_sb) < 1) { if (!r->m_sb.sb_timedout) RTMP_Close(r); return 0; } avail = r->m_sb.sb_size; } } nRead = ((n < avail) ? n : avail); if (nRead > 0) { memcpy(ptr, r->m_sb.sb_start, nRead); r->m_sb.sb_start += nRead; r->m_sb.sb_size -= nRead; nBytes = nRead; r->m_nBytesIn += nRead; if (r->m_bSendCounter && r->m_nBytesIn > r->m_nBytesInSent + r->m_nClientBW / 2) SendBytesReceived(r); } /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */ #ifdef _DEBUG fwrite(ptr, 1, nBytes, netstackdump_read); #endif if (nBytes == 0) { RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__); /*goto again; */ RTMP_Close(r); break; } if (r->Link.protocol & RTMP_FEATURE_HTTP) r->m_resplen -= nBytes; #ifdef CRYPTO if (r->Link.rc4keyIn) { RC4_encrypt((RC4_KEY *)r->Link.rc4keyIn, nBytes, ptr); } #endif n -= nBytes; ptr += nBytes; } return nOriginalSize - n; }
ReadN()中實現從Socket中接收數據的函數是RTMPSockBuf_Fill(),看看代碼吧(又是層層調用)。
//調用Socket編程中的recv()函數,接收數據 int RTMPSockBuf_Fill(RTMPSockBuf *sb) { int nBytes; if (!sb->sb_size) sb->sb_start = sb->sb_buf; while (1) { //緩沖區長度:總長-未處理字節-已處理字節 //|-----已處理--------|-----未處理--------|---------緩沖區----------| //sb_buf sb_start sb_size nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf); #if defined(CRYPTO) && !defined(NO_SSL) if (sb->sb_ssl) { nBytes = TLS_read((SSL *)sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes); } else #endif { //int recv( SOCKET s, char * buf, int len, int flags); //s:一個標識已連接套接口的描述字。 //buf:用於接收數據的緩沖區。 //len:緩沖區長度。 //flags:指定調用方式。 //從sb_start(待處理的下一字節) + sb_size()還未處理的字節開始buffer為空,可以存儲 nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0); } if (nBytes != -1) { //未處理的字節又多了 sb->sb_size += nBytes; } else { int sockerr = GetSockError(); RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)", __FUNCTION__, nBytes, sockerr, strerror(sockerr)); if (sockerr == EINTR && !RTMP_ctrlC) continue; if (sockerr == EWOULDBLOCK || sockerr == EAGAIN) { sb->sb_timedout = TRUE; nBytes = 0; } } break; } return nBytes; }
從RTMPSockBuf_Fill()代碼中可以看出,調用了系統Socket的recv()函數接收RTMP連接傳輸過來的數據。
10: 處理各種消息(Message)
已經連續寫了一系列的博客了,其實大部分內容都是去年搞RTMP研究的時候積累的經驗,回顧一下過去的知識,其實 RTMPdump(libRTMP)主要的功能也都分析的差不多了,現在感覺還需要一些查漏補缺。主要就是它是如何處理各種消息(Message)的這方面還沒有研究的特明白,在此需要詳細研究一下。
再來看一下RTMPdump(libRTMP)的“靈魂”函數RTMP_ClientPacket(),主要完成了各種消息的處理。
//處理接收到的數據 int RTMP_ClientPacket(RTMP *r, RTMPPacket *packet) { int bHasMediaPacket = 0; switch (packet->m_packetType) { //RTMP消息類型ID=1,設置塊大小 case 0x01: /* chunk size */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Set Chunk Size (typeID=1)。"); //----------------------------- RTMP_LogPrintf("處理消息 Set Chunk Size (typeID=1)\n"); HandleChangeChunkSize(r, packet); break; //RTMP消息類型ID=3,致謝 case 0x03: /* bytes read report */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__); break; //RTMP消息類型ID=4,用戶控制 case 0x04: /* ctrl */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 User Control (typeID=4)。"); //----------------------------- RTMP_LogPrintf("處理消息 User Control (typeID=4)\n"); HandleCtrl(r, packet); break; //RTMP消息類型ID=5 case 0x05: /* server bw */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Window Acknowledgement Size (typeID=5)。"); //----------------------------- RTMP_LogPrintf("處理消息 Window Acknowledgement Size (typeID=5)\n"); HandleServerBW(r, packet); break; //RTMP消息類型ID=6 case 0x06: /* client bw */ //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 Set Peer Bandwidth (typeID=6)。"); //----------------------------- RTMP_LogPrintf("處理消息 Set Peer Bandwidth (typeID=6)\n"); HandleClientBW(r, packet); break; //RTMP消息類型ID=8,音頻數據 case 0x08: /* audio data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleAudio(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; //RTMP消息類型ID=9,視頻數據 case 0x09: /* video data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleVideo(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; //RTMP消息類型ID=15,AMF3編碼,忽略 case 0x0F: /* flex stream send */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex stream 
send, size %lu bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; //RTMP消息類型ID=16,AMF3編碼,忽略 case 0x10: /* flex shared object */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex shared object, size %lu bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; //RTMP消息類型ID=17,AMF3編碼,忽略 case 0x11: /* flex message */ { RTMP_Log(RTMP_LOGDEBUG, "%s, flex message, size %lu bytes, not fully supported", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ /* some DEBUG code */ #if 0 RTMP_LIB_AMFObject obj; int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1); if(nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__); /*return; */ } obj.Dump(); #endif if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1) bHasMediaPacket = 2; break; } //RTMP消息類型ID=18,AMF0編碼,數據消息 case 0x12: /* metadata (notify) */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %lu bytes", __FUNCTION__, packet->m_nBodySize); //處理元數據,暫時注釋 /* if (HandleMetadata(r, packet->m_body, packet->m_nBodySize)) bHasMediaPacket = 1; break; */ //RTMP消息類型ID=19,AMF0編碼,忽略 case 0x13: RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring", __FUNCTION__); break; //RTMP消息類型ID=20,AMF0編碼,命令消息 //處理命令消息! 
case 0x14: //---------------- r->dlg->AppendCInfo("處理收到的數據。消息 命令 (AMF0編碼) (typeID=20)。"); //----------------------------- /* invoke */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %lu bytes", __FUNCTION__, packet->m_nBodySize); RTMP_LogPrintf("處理命令消息 (typeID=20,AMF0編碼)\n"); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1) bHasMediaPacket = 2; break; //RTMP消息類型ID=22 case 0x16: { /* go through FLV packets and handle metadata packets */ unsigned int pos = 0; uint32_t nTimeStamp = packet->m_nTimeStamp; while (pos + 11 < packet->m_nBodySize) { uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */ if (pos + 11 + dataSize + 4 > packet->m_nBodySize) { RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!"); break; } if (packet->m_body[pos] == 0x12) { HandleMetadata(r, packet->m_body + pos + 11, dataSize); } else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9) { nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4); nTimeStamp |= (packet->m_body[pos + 7] << 24); } pos += (11 + dataSize + 4); } if (!r->m_pausing) r->m_mediaStamp = nTimeStamp; /* FLV tag(s) */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */ bHasMediaPacket = 1; break; } default: RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__, packet->m_packetType); #ifdef _DEBUG RTMP_LogHex(RTMP_LOGDEBUG, (const uint8_t *)packet->m_body, packet->m_nBodySize); #endif } return bHasMediaPacket; }
前文已經分析過當消息類型ID為0x14(20)的時候,即AMF0編碼的命令消息的時候,會調用HandleInvoke()進行處理。
這里就不再對這種類型ID的消息進行分析了,分析一下其他類型的消息,畢竟從發起一個RTMP連接到接收視音頻數據這個過程中是要處理很多消息的。
參考:RTMP流媒體播放過程
下面我們按照消息ID從小到大的順序,看看接收到的各種消息都是如何處理的。
消息類型ID是0x01的消息功能是“設置塊(Chunk)大小”,處理函數是HandleChangeChunkSize(),可見函數內容很簡單。
/*
 * Handle a Set Chunk Size message (type ID 0x01): adopt the chunk size the
 * peer will use for everything it sends from now on.
 *
 * r      - session whose incoming chunk size (m_inChunkSize) is updated.
 * packet - the received message; its body holds a 4-byte big-endian size.
 *
 * Messages with fewer than 4 body bytes are ignored. A size of 0, or one with
 * the top bit set, is rejected and the previous chunk size is kept: the RTMP
 * specification only allows 1..0x7FFFFFFF, and RTMP_ReadPacket() uses
 * m_inChunkSize as the number of body bytes to read per chunk, so a
 * non-positive value would stall or break packet reassembly.
 */
static void HandleChangeChunkSize(RTMP *r, const RTMPPacket *packet)
{
    unsigned int requested;

    if (packet->m_nBodySize < 4)
        return;

    requested = AMF_DecodeInt32(packet->m_body);
    if (requested < 1 || requested > 0x7FFFFFFF) {
        RTMP_Log(RTMP_LOGWARNING, "%s, ignoring invalid chunk size %u", __FUNCTION__, requested);
        return;
    }

    r->m_inChunkSize = requested;
    RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r->m_inChunkSize);
}
消息類型ID是0x03的消息功能是“致謝”,沒有處理函數。
消息類型ID是0x04的消息功能是“用戶控制(UserControl)”,處理函數是HandleCtrl(),這類的消息出現的頻率非常高,函數體如下所示。具體用戶控制消息的作用這里就不多說了,有相應的文檔可以參考。
注:該函數中間有一段很長的英文注釋,英語好的大神可以看一看
//處理用戶控制(UserControl)消息。用戶控制消息是服務器端發出的。 static void HandleCtrl(RTMP *r, const RTMPPacket *packet) { short nType = -1; unsigned int tmp; if (packet->m_body && packet->m_nBodySize >= 2) //事件類型(2B) nType = AMF_DecodeInt16(packet->m_body); RTMP_Log(RTMP_LOGDEBUG, "%s, received ctrl. type: %d, len: %d", __FUNCTION__, nType, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ if (packet->m_nBodySize >= 6) { //不同事件類型做不同處理 switch (nType) { //流開始 case 0: //流ID tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream Begin %d", __FUNCTION__, tmp); break; //流結束 case 1: //流ID tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream EOF %d", __FUNCTION__, tmp); if (r->m_pausing == 1) r->m_pausing = 2; break; //流枯竭 case 2: //流ID tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream Dry %d", __FUNCTION__, tmp); break; //是錄制流 case 4: tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream IsRecorded %d", __FUNCTION__, tmp); break; //Ping客戶端 case 6: /* server ping. reply with pong. */ tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Ping %d", __FUNCTION__, tmp); RTMP_SendCtrl(r, 0x07, tmp, 0); break; /* FMS 3.5 servers send the following two controls to let the client * know when the server has sent a complete buffer. I.e., when the * server has sent an amount of data equal to m_nBufferMS in duration. * The server meters its output so that data arrives at the client * in realtime and no faster. * * The rtmpdump program tries to set m_nBufferMS as large as * possible, to force the server to send data as fast as possible. * In practice, the server appears to cap this at about 1 hour's * worth of data. After the server has sent a complete buffer, and * sends this BufferEmpty message, it will wait until the play * duration of that buffer has passed before sending a new buffer. * The BufferReady message will be sent when the new buffer starts. 
* (There is no BufferReady message for the very first buffer; * presumably the Stream Begin message is sufficient for that * purpose.) * * If the network speed is much faster than the data bitrate, then * there may be long delays between the end of one buffer and the * start of the next. * * Since usually the network allows data to be sent at * faster than realtime, and rtmpdump wants to download the data * as fast as possible, we use this RTMP_LF_BUFX hack: when we * get the BufferEmpty message, we send a Pause followed by an * Unpause. This causes the server to send the next buffer immediately * instead of waiting for the full duration to elapse. (That's * also the purpose of the ToggleStream function, which rtmpdump * calls if we get a read timeout.) * * Media player apps don't need this hack since they are just * going to play the data in realtime anyway. It also doesn't work * for live streams since they obviously can only be sent in * realtime. And it's all moot if the network speed is actually * slower than the media bitrate. */ case 31: tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream BufferEmpty %d", __FUNCTION__, tmp); if (!(r->Link.lFlags & RTMP_LF_BUFX)) break; if (!r->m_pausing) { r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel]; RTMP_SendPause(r, TRUE, r->m_pauseStamp); r->m_pausing = 1; } else if (r->m_pausing == 2) { RTMP_SendPause(r, FALSE, r->m_pauseStamp); r->m_pausing = 3; } break; case 32: tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream BufferReady %d", __FUNCTION__, tmp); break; default: tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream xx %d", __FUNCTION__, tmp); break; } } if (nType == 0x1A) { RTMP_Log(RTMP_LOGDEBUG, "%s, SWFVerification ping received: ", __FUNCTION__); if (packet->m_nBodySize > 2 && packet->m_body[2] > 0x01) { RTMP_Log(RTMP_LOGERROR, "%s: SWFVerification Type %d request not supported! 
Patches welcome...", __FUNCTION__, packet->m_body[2]); } #ifdef CRYPTO /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ /* respond with HMAC SHA256 of decompressed SWF, key is the 30byte player key, also the last 30 bytes of the server handshake are applied */ else if (r->Link.SWFSize) { RTMP_SendCtrl(r, 0x1B, 0, 0); } else { RTMP_Log(RTMP_LOGERROR, "%s: Ignoring SWFVerification request, use --swfVfy!", __FUNCTION__); } #else RTMP_Log(RTMP_LOGERROR, "%s: Ignoring SWFVerification request, no CRYPTO support!", __FUNCTION__); #endif } }
消息類型ID是0x05的消息功能是“窗口致謝大小(Window Acknowledgement Size,翻譯的真是挺別扭)”,處理函數是HandleServerBW()。在這里注意一下,該消息在Adobe官方公開的文檔中叫“Window Acknowledgement Size”,但是在Adobe公開協議規范之前,破解RTMP協議的組織一直管該協議叫“ServerBW”,只是個稱呼,倒是也無所謂~處理代碼很簡單:
/*
 * Handle a Window Acknowledgement Size message (type ID 0x05, historically
 * called "server BW"): store the 4-byte big-endian value in r->m_nServerBW.
 *
 * r      - session to update.
 * packet - the received message.
 *
 * A message without a body, or with fewer than 4 body bytes, is ignored so
 * that the decode never reads past the received data. This matches the
 * length checks done by HandleChangeChunkSize() and HandleCtrl().
 */
static void HandleServerBW(RTMP *r, const RTMPPacket *packet)
{
    if (!packet->m_body || packet->m_nBodySize < 4) {
        RTMP_Log(RTMP_LOGWARNING, "%s: message too short (%u bytes), ignoring", __FUNCTION__, (unsigned int)packet->m_nBodySize);
        return;
    }

    r->m_nServerBW = AMF_DecodeInt32(packet->m_body);
    RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r->m_nServerBW);
}
消息類型ID是0x06的消息功能是“設置對等端帶寬(Set Peer Bandwidth)”,處理函數是HandleClientBW()。與上一種消息一樣,該消息在Adobe官方公開的文檔中叫“Set Peer Bandwidth”,但是在Adobe公開協議規范之前,破解RTMP協議的組織一直管該協議叫“ClientBW”。處理函數也不復雜:
/*
 * Handle a Set Peer Bandwidth message (type ID 0x06, historically called
 * "client BW").
 *
 * The first 4 body bytes (big-endian) are stored in r->m_nClientBW. When a
 * fifth byte is present it is stored in r->m_nClientBW2 (the limit type in
 * the RTMP specification); otherwise m_nClientBW2 is set to -1.
 *
 * r      - session to update.
 * packet - the received message.
 *
 * A message without a body, or with fewer than 4 body bytes, is ignored so
 * that the decode never reads past the received data.
 */
static void HandleClientBW(RTMP *r, const RTMPPacket *packet)
{
    if (!packet->m_body || packet->m_nBodySize < 4) {
        RTMP_Log(RTMP_LOGWARNING, "%s: message too short (%u bytes), ignoring", __FUNCTION__, (unsigned int)packet->m_nBodySize);
        return;
    }

    r->m_nClientBW = AMF_DecodeInt32(packet->m_body);

    if (packet->m_nBodySize > 4)
        r->m_nClientBW2 = packet->m_body[4];
    else
        r->m_nClientBW2 = -1;

    RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r->m_nClientBW, r->m_nClientBW2);
}
消息類型ID是0x08的消息用於傳輸音頻數據,在這里不處理。
消息類型ID是0x09的消息用於傳輸視頻數據,在這里不處理。
消息類型ID是0x0F-0x11的消息是AMF3編碼的消息(0x0F:數據消息,0x10:共享對象,0x11:命令消息)。
消息類型ID是0x12-0x14的消息是AMF0編碼的消息(0x12:數據消息,0x13:共享對象,0x14:命令消息)。
注:消息類型ID是0x14的消息很重要,用於傳輸AMF0編碼的命令,已經做過分析。