In the previous post we walked through the main function of testOnDemandRTSPServer.cpp: it creates an RTSPServer, creates ServerMediaSession objects, and then waits for RTSP clients to connect. Next we analyze in detail how Live555 establishes an RTSP connection. First, a quick review of how an RTSP session is negotiated:
1. (Optional)
RTSP client —> RTSP server: OPTIONS, asking which methods the server supports
RTSP server —> RTSP client: OPTIONS reply, listing the methods the server supports
2. (Optional)
RTSP client —> RTSP server: DESCRIBE, requesting a description of a media resource (represented in Live555 by a ServerMediaSession)
RTSP server —> RTSP client: DESCRIBE reply, returning the resource's description (the SDP)
3. (Required)
RTSP client —> RTSP server: SETUP, requesting that a transport be set up for a media resource
RTSP server —> RTSP client: SETUP reply, returning the result of the setup
4. (Required)
RTSP client —> RTSP server: PLAY, requesting playback of the media resource
RTSP server —> RTSP client: PLAY reply, returning the result
-------------------- the RTSP server then sends RTP packets (carrying the media data) to the RTSP client --------------------
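To make the handshake concrete, here is roughly what the DESCRIBE step looks like on the wire (the header values are illustrative, not captured output):

C->S:  DESCRIBE rtsp://127.0.0.1:8554/test.264 RTSP/1.0
       CSeq: 2
       Accept: application/sdp

S->C:  RTSP/1.0 200 OK
       CSeq: 2
       Content-Base: rtsp://127.0.0.1:8554/test.264/
       Content-Type: application/sdp
       Content-Length: 460

       (SDP body follows)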
We start from RTSPServer::incomingConnectionHandlerRTSP, which in turn calls RTSPServer::incomingConnectionHandler. That function accepts the client's TCP connection and then calls RTSPServer::createNewClientConnection to create an RTSPClientConnection instance, which represents one RTSP connection with a client.
RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fIsActive(True),
    fClientInputSocket(clientSocket), fClientOutputSocket(clientSocket), fClientAddr(clientAddr),
    fRecursionCount(0), fOurSessionCookie(NULL) {
  // Add ourself to our 'client connections' table:
  // (register this RTSPClientConnection instance with the RTSPServer's table)
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().setBackgroundHandling(fClientInputSocket, SOCKET_READABLE|SOCKET_EXCEPTION,
                                                (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
}
In the RTSPClientConnection constructor, the object adds itself to the RTSPServer's connection table, registers the client socket with the socket set, and installs incomingRequestHandler as the corresponding callback; the server then waits for the client to send a command. When a command arrives, RTSPClientConnection::incomingRequestHandler is called back to handle it.
RTSPClientConnection::incomingRequestHandler calls RTSPClientConnection::incomingRequestHandler1, which reads data from the client socket into the RTSPClientConnection::fRequestBuffer array and then calls RTSPClientConnection::handleRequestBytes to process what was read. handleRequestBytes (which is fairly long) parses the bytes, extracts the command name and related fields, dispatches to a different handler for each command, stores the result in the fResponseBuffer array, and sends it to the client. Here we assume the client skips the OPTIONS command and sends DESCRIBE directly to establish a connection, in which case handleRequestBytes calls RTSPClientConnection::handleCmd_DESCRIBE, shown below. First, a word about urlPreSuffix and urlSuffix: suppose the client requests the RTSP URL rtsp://127.0.0.1:8554/test1/test2/test.264. urlPreSuffix is the part after ip:port (excluding the leading "/") up to, but not including, the last "/", i.e. test1/test2; urlSuffix is the part after the last "/" (excluding it), i.e. test.264.
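As a quick illustration of this splitting rule, here is a standalone sketch (hypothetical helper code, not Live555's actual parsing):

// Sketch: how a request path maps to urlPreSuffix/urlSuffix.
#include <cstdio>
#include <cstring>

int main() {
  // Everything after "rtsp://ip:port/":
  const char* resource = "test1/test2/test.264";

  char urlPreSuffix[256] = "";
  const char* urlSuffix;
  const char* lastSlash = strrchr(resource, '/');
  if (lastSlash == NULL) {
    urlSuffix = resource;               // no '/': the whole string is the suffix
  } else {
    size_t preLen = lastSlash - resource;
    memcpy(urlPreSuffix, resource, preLen);
    urlPreSuffix[preLen] = '\0';
    urlSuffix = lastSlash + 1;
  }
  printf("urlPreSuffix=\"%s\" urlSuffix=\"%s\"\n", urlPreSuffix, urlSuffix);
  // => urlPreSuffix="test1/test2" urlSuffix="test.264"
  return 0;
}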
void RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
  char* sdpDescription = NULL;
  char* rtspURL = NULL;
  do {
    char urlTotalSuffix[RTSP_PARAM_STRING_MAX];
    if (strlen(urlPreSuffix) + strlen(urlSuffix) + 2 > sizeof urlTotalSuffix) {
      handleCmd_bad();
      break;
    }
    urlTotalSuffix[0] = '\0'; // concatenate urlPreSuffix and urlSuffix into urlTotalSuffix
    if (urlPreSuffix[0] != '\0') {
      strcat(urlTotalSuffix, urlPreSuffix);
      strcat(urlTotalSuffix, "/");
    }
    strcat(urlTotalSuffix, urlSuffix);

    if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;

    // We should really check that the request contains an "Accept:" #####
    // for "application/sdp", because that's what we're sending back #####

    // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix":
    ServerMediaSession* session = fOurServer.lookupServerMediaSession(urlTotalSuffix); // look up the matching ServerMediaSession in the RTSPServer
    if (session == NULL) {
      handleCmd_notFound();
      break;
    }

    // Then, assemble a SDP description for this session:
    sdpDescription = session->generateSDPDescription(); // generate the SDP description string
    if (sdpDescription == NULL) {
      // This usually means that a file name that was specified for a
      // "ServerMediaSubsession" does not exist.
      setRTSPResponse("404 File Not Found, Or In Incorrect Format");
      break;
    }
    unsigned sdpDescriptionSize = strlen(sdpDescription);

    // Also, generate our RTSP URL, for the "Content-Base:" header
    // (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).
    rtspURL = fOurServer.rtspURL(session, fClientInputSocket);

    snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, // build the response
             "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
             "%s"
             "Content-Base: %s/\r\n"
             "Content-Type: application/sdp\r\n"
             "Content-Length: %d\r\n\r\n"
             "%s",
             fCurrentCSeq,
             dateHeader(),
             rtspURL,
             sdpDescriptionSize,
             sdpDescription);
  } while (0);

  delete[] sdpDescription;
  delete[] rtspURL;
}
handleCmd_DESCRIBE mainly calls ServerMediaSession::generateSDPDescription to produce the SDP; a ServerMediaSession's SDP is assembled from the SDP of each of its ServerMediaSubsessions, and the result is sent back to the client. Let's look at generateSDPDescription.
char* ServerMediaSession::generateSDPDescription() {
  AddressString ipAddressStr(ourIPAddress(envir()));
  unsigned ipAddressStrSize = strlen(ipAddressStr.val());

  // For a SSM sessions, we need a "a=source-filter: incl ..." line also:
  char* sourceFilterLine;
  if (fIsSSM) {
    char const* const sourceFilterFmt =
      "a=source-filter: incl IN IP4 * %s\r\n"
      "a=rtcp-unicast: reflection\r\n";
    unsigned const sourceFilterFmtSize = strlen(sourceFilterFmt) + ipAddressStrSize + 1;

    sourceFilterLine = new char[sourceFilterFmtSize];
    sprintf(sourceFilterLine, sourceFilterFmt, ipAddressStr.val());
  } else {
    sourceFilterLine = strDup("");
  }

  char* rangeLine = NULL; // for now
  char* sdp = NULL; // for now

  do {
    // Count the lengths of each subsession's media-level SDP lines.
    // (We do this first, because the call to "subsession->sdpLines()"
    // causes correct subsession 'duration()'s to be calculated later.)
    // First call each ServerMediaSubsession's sdpLines() to compute the SDP's total length:
    unsigned sdpLength = 0;
    ServerMediaSubsession* subsession;
    for (subsession = fSubsessionsHead; subsession != NULL;
         subsession = subsession->fNext) {
      char const* sdpLines = subsession->sdpLines();
      if (sdpLines == NULL) continue; // the media's not available
      sdpLength += strlen(sdpLines);
    }
    if (sdpLength == 0) break; // the session has no usable subsessions

    // Unless subsessions have differing durations, we also have a "a=range:" line:
    // The ServerMediaSession's duration() drives the "a=range:" line, which determines whether the
    // resource supports fast-forward, rewind, and seeking to arbitrary positions; the session's
    // duration is derived from the durations of its ServerMediaSubsessions.
    // The default ServerMediaSubsession::duration() returns 0; Live555 implements it only for some
    // file formats (e.g., MatroskaFileServerMediaSubsession parses an MKV file's playing time).
    float dur = duration();
    if (dur == 0.0) {
      rangeLine = strDup("a=range:npt=0-\r\n");
    } else if (dur > 0.0) {
      char buf[100];
      sprintf(buf, "a=range:npt=0-%.3f\r\n", dur);
      rangeLine = strDup(buf);
    } else { // subsessions have differing durations, so "a=range:" lines go there
      rangeLine = strDup("");
    }

    char const* const sdpPrefixFmt =
      "v=0\r\n"
      "o=- %ld%06ld %d IN IP4 %s\r\n"
      "s=%s\r\n"
      "i=%s\r\n"
      "t=0 0\r\n"
      "a=tool:%s%s\r\n"
      "a=type:broadcast\r\n"
      "a=control:*\r\n"
      "%s"
      "%s"
      "a=x-qt-text-nam:%s\r\n"
      "a=x-qt-text-inf:%s\r\n"
      "%s";
    sdpLength += strlen(sdpPrefixFmt)
      + 20 + 6 + 20 + ipAddressStrSize
      + strlen(fDescriptionSDPString)
      + strlen(fInfoSDPString)
      + strlen(libNameStr) + strlen(libVersionStr)
      + strlen(sourceFilterLine)
      + strlen(rangeLine)
      + strlen(fDescriptionSDPString)
      + strlen(fInfoSDPString)
      + strlen(fMiscSDPLines);
    sdpLength += 1000; // in case the length of the "subsession->sdpLines()" calls below change
    sdp = new char[sdpLength];
    if (sdp == NULL) break;

    // Generate the SDP prefix (session-level lines):
    snprintf(sdp, sdpLength, sdpPrefixFmt,
             fCreationTime.tv_sec, fCreationTime.tv_usec, // o= <session id>
             1, // o= <version> // (needs to change if params are modified)
             ipAddressStr.val(), // o= <address>
             fDescriptionSDPString, // s= <description>
             fInfoSDPString, // i= <info>
             libNameStr, libVersionStr, // a=tool:
             sourceFilterLine, // a=source-filter: incl (if a SSM session)
             rangeLine, // a=range: line
             fDescriptionSDPString, // a=x-qt-text-nam: line
             fInfoSDPString, // a=x-qt-text-inf: line
             fMiscSDPLines); // miscellaneous session SDP lines (if any)

    // Then, add the (media-level) lines for each subsession:
    // Call each ServerMediaSubsession's sdpLines() again; this time each subsession's SDP
    // lines are actually appended to the ServerMediaSession's SDP:
    char* mediaSDP = sdp;
    for (subsession = fSubsessionsHead; subsession != NULL;
         subsession = subsession->fNext) {
      unsigned mediaSDPLength = strlen(mediaSDP);
      mediaSDP += mediaSDPLength; // advance the pointer
      sdpLength -= mediaSDPLength;
      if (sdpLength <= 1) break; // the SDP has somehow become too long

      char const* sdpLines = subsession->sdpLines();
      if (sdpLines != NULL) snprintf(mediaSDP, sdpLength, "%s", sdpLines);
    }
  } while (0);

  delete[] rangeLine; delete[] sourceFilterLine;
  return sdp;
}
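For our example stream, the session-level prefix built here comes out roughly like the following (all field values are illustrative; the o=, s=, i=, and a=tool: values depend on the actual stream and library version):

v=0
o=- 1447412345123456 1 IN IP4 192.168.1.10
s=Session streamed by "testOnDemandRTSPServer"
i=test.264
t=0 0
a=tool:LIVE555 Streaming Media v2016.01.12
a=type:broadcast
a=control:*
a=range:npt=0-
a=x-qt-text-nam:Session streamed by "testOnDemandRTSPServer"
a=x-qt-text-inf:test.264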
At this point the server has sent the requested SDP to the client and waits for the next command (SETUP). Before analyzing how the server handles SETUP, let's dig further into how the server obtains the SDP in the first place. As generateSDPDescription shows, the work is done by each ServerMediaSubsession's sdpLines function, whose default implementation lives in the OnDemandServerMediaSubsession class. Here is OnDemandServerMediaSubsession::sdpLines:
char const* OnDemandServerMediaSubsession::sdpLines() {
  if (fSDPLines == NULL) {
    // We need to construct a set of SDP lines that describe this
    // subsession (as a unicast stream). To do so, we first create
    // dummy (unused) source and "RTPSink" objects,
    // whose parameters we use for the SDP lines:
    // (in other words, to derive this ServerMediaSubsession's SDP we create throwaway
    // FramedSource and RTPSink objects; this is not real playback)
    unsigned estBitrate;
    // Create the FramedSource used to obtain the frame data.
    // (The call actually resolves to the subclass H264VideoFileServerMediaSubsession's
    // createNewStreamSource(), which creates a ByteStreamFileSource, a FramedSource subclass.)
    FramedSource* inputSource = createNewStreamSource(0, estBitrate);
    if (inputSource == NULL) return NULL; // file not found

    struct in_addr dummyAddr;
    dummyAddr.s_addr = 0;
    Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
    unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
    // Create the RTPSink used to hold RTP packets.
    // (The call actually resolves to the subclass H264VideoFileServerMediaSubsession's
    // createNewRTPSink(), which creates an H264VideoRTPSink, an RTPSink subclass.)
    RTPSink* dummyRTPSink
      = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
    if (dummyRTPSink != NULL && dummyRTPSink->estimatedBitrate() > 0) estBitrate = dummyRTPSink->estimatedBitrate();

    setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate); // derive the subsession's SDP lines through the RTPSink
    Medium::close(dummyRTPSink);
    closeStreamSource(inputSource);
  }

  return fSDPLines;
}
We now turn to OnDemandServerMediaSubsession::setSDPLinesFromRTPSink, in which the freshly created FramedSource and RTPSink objects "play" a stretch of the file in order to produce the SDP. At this point it helps to sketch the overall flow of how a Live555 RTSPServer streams a media resource: the RTSPServer uses an RTPSink to build and hold RTP packets; the RTPSink repeatedly requests frame data from a FramedSource; once the FramedSource has obtained a frame, it invokes a callback handing the data to the RTPSink; and in that callback the RTPSink sends the data to the client (it could just as well write it to a local file, i.e. recording).
void OnDemandServerMediaSubsession
::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
  if (rtpSink == NULL) return;

  // Query the RTPSink for the various pieces of information about this ServerMediaSubsession;
  // the most involved one is obtaining the auxSDPLine:
  char const* mediaType = rtpSink->sdpMediaType();
  unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
  AddressString ipAddressStr(fServerAddressForSDP);
  char* rtpmapLine = rtpSink->rtpmapLine();
  char const* rtcpmuxLine = fMultiplexRTCPWithRTP ? "a=rtcp-mux\r\n" : "";
  char const* rangeLine = rangeSDPLine();
  char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
  if (auxSDPLine == NULL) auxSDPLine = "";

  char const* const sdpFmt =
    "m=%s %u RTP/AVP %d\r\n"
    "c=IN IP4 %s\r\n"
    "b=AS:%u\r\n"
    "%s"
    "%s"
    "%s"
    "%s"
    "a=control:%s\r\n";
  unsigned sdpFmtSize = strlen(sdpFmt)
    + strlen(mediaType) + 5 /* max short len */ + 3 /* max char len */
    + strlen(ipAddressStr.val())
    + 20 /* max int len */
    + strlen(rtpmapLine)
    + strlen(rtcpmuxLine)
    + strlen(rangeLine)
    + strlen(auxSDPLine)
    + strlen(trackId());
  char* sdpLines = new char[sdpFmtSize];
  sprintf(sdpLines, sdpFmt,
          mediaType, // m= <media>
          fPortNumForSDP, // m= <port>
          rtpPayloadType, // m= <fmt list>
          ipAddressStr.val(), // c= address
          estBitrate, // b=AS:<bandwidth>
          rtpmapLine, // a=rtpmap:... (if present)
          rtcpmuxLine, // a=rtcp-mux:... (if present)
          rangeLine, // a=range:... (if present)
          auxSDPLine, // optional extra SDP line
          trackId()); // a=control:<track-id>
  delete[] (char*)rangeLine; delete[] rtpmapLine;

  fSDPLines = strDup(sdpLines);
  delete[] sdpLines;
}
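Under the assumptions of our running example (an H.264 file, dynamic payload type 96, track id "track1"), the media-level section assembled here looks roughly like this (address, bandwidth, and fmtp parameters are illustrative):

m=video 0 RTP/AVP 96
c=IN IP4 192.168.1.10
b=AS:500
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkA...,aM4wpIA=
a=control:track1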
setSDPLinesFromRTPSink obtains various pieces of information through the RTPSink object; the most complex part is obtaining the auxSDPLine. That function is overridden in the H264VideoFileServerMediaSubsession class, and since the media resource we're analyzing is a .264 file, let's look at H264VideoFileServerMediaSubsession::getAuxSDPLine:
char const* H264VideoFileServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource) {
  if (fAuxSDPLine != NULL) return fAuxSDPLine; // it's already been set up (for a previous client)

  if (fDummyRTPSink == NULL) { // we're not already setting it up for another, concurrent stream
    // Note: For H264 video files, the 'config' information ("profile-level-id" and "sprop-parameter-sets") isn't known
    // until we start reading the file. This means that "rtpSink"s "auxSDPLine()" will be NULL initially,
    // and we need to start reading data from our file until this changes.
    fDummyRTPSink = rtpSink;

    // Start reading the file:
    // (call the RTPSink's startPlaying(); for file-based ServerMediaSubsessions, Live555's
    // approach is to play a stretch of the file just to extract the SDP information)
    fDummyRTPSink->startPlaying(*inputSource, afterPlayingDummy, this);

    // Check whether the sink's 'auxSDPLine()' is ready:
    checkForAuxSDPLine(this);
  }

  envir().taskScheduler().doEventLoop(&fDoneFlag); // fDoneFlag is initially 0, so we loop here until the SDP info has been extracted

  return fAuxSDPLine;
}
This function calls the RTPSink's startPlaying to start reading data, and calls H264VideoFileServerMediaSubsession::checkForAuxSDPLine to check whether the SDP information has been extracted from the data read so far. Here is checkForAuxSDPLine:
static void checkForAuxSDPLine(void* clientData) {
  H264VideoFileServerMediaSubsession* subsess = (H264VideoFileServerMediaSubsession*)clientData;
  subsess->checkForAuxSDPLine1();
}

void H264VideoFileServerMediaSubsession::checkForAuxSDPLine1() {
  char const* dasl;

  if (fAuxSDPLine != NULL) { // the SDP info has already been extracted
    // Signal the event loop that we're done:
    setDoneFlag(); // make the program leave the waiting loop
  } else if (fDummyRTPSink != NULL && (dasl = fDummyRTPSink->auxSDPLine()) != NULL) {
    // Not extracted yet; the RTPSink's auxSDPLine() has just produced it:
    fAuxSDPLine = strDup(dasl);
    fDummyRTPSink = NULL;

    // Signal the event loop that we're done:
    setDoneFlag(); // SDP info obtained; leave the waiting loop
  } else if (!fDoneFlag) { // still no SDP info; run checkForAuxSDPLine() again a little later
    // try again after a brief delay:
    int uSecsToDelay = 100000; // 100 ms
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay,
                                                             (TaskFunc*)checkForAuxSDPLine, this);
  }
}
When the check above finds that the SDP information is not ready yet, it calls H264VideoRTPSink::auxSDPLine to try to derive it again. Here is auxSDPLine:
char const* H264VideoRTPSink::auxSDPLine() {
  // Generate a new "a=fmtp:" line each time, using our SPS and PPS (if we have them),
  // otherwise parameters from our framer source (in case they've changed since the last time that
  // we were called):
  H264or5VideoStreamFramer* framerSource = NULL;
  u_int8_t* vpsDummy = NULL; unsigned vpsDummySize = 0;
  u_int8_t* sps = fSPS; unsigned spsSize = fSPSSize;
  u_int8_t* pps = fPPS; unsigned ppsSize = fPPSSize;
  if (sps == NULL || pps == NULL) {
    // We need to get SPS and PPS from our framer source:
    // (fOurFragmenter is created once startPlaying() has been called)
    if (fOurFragmenter == NULL) return NULL; // we don't yet have a fragmenter (and therefore not a source)
    framerSource = (H264or5VideoStreamFramer*)(fOurFragmenter->inputSource());
    if (framerSource == NULL) return NULL; // we don't yet have a source

    framerSource->getVPSandSPSandPPS(vpsDummy, vpsDummySize, sps, spsSize, pps, ppsSize);
    if (sps == NULL || pps == NULL) return NULL; // our source isn't ready
  }

  // Data has been read from the file successfully; now derive the SDP info from it.
  // Set up the "a=fmtp:" SDP line for this stream:
  u_int8_t* spsWEB = new u_int8_t[spsSize]; // "WEB" means "Without Emulation Bytes"
  unsigned spsWEBSize = removeH264or5EmulationBytes(spsWEB, spsSize, sps, spsSize);
  if (spsWEBSize < 4) { // Bad SPS size => assume our source isn't ready
    delete[] spsWEB;
    return NULL;
  }
  u_int32_t profileLevelId = (spsWEB[1]<<16) | (spsWEB[2]<<8) | spsWEB[3];
  delete[] spsWEB;

  char* sps_base64 = base64Encode((char*)sps, spsSize);
  char* pps_base64 = base64Encode((char*)pps, ppsSize);

  char const* fmtpFmt =
    "a=fmtp:%d packetization-mode=1"
    ";profile-level-id=%06X"
    ";sprop-parameter-sets=%s,%s\r\n";
  unsigned fmtpFmtSize = strlen(fmtpFmt)
    + 3 /* max char len */
    + 6 /* 3 bytes in hex */
    + strlen(sps_base64) + strlen(pps_base64);
  char* fmtp = new char[fmtpFmtSize];
  sprintf(fmtp, fmtpFmt,
          rtpPayloadType(),
          profileLevelId,
          sps_base64, pps_base64);

  delete[] sps_base64;
  delete[] pps_base64;

  delete[] fFmtpSDPLine; fFmtpSDPLine = fmtp;
  return fFmtpSDPLine;
}
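As a worked example of the profileLevelId computation above: if the SPS, after emulation-prevention bytes are removed, begins 0x67 0x42 0xC0 0x1E (a hypothetical Baseline-profile, level-3.0 SPS), then spsWEB[1], spsWEB[2], spsWEB[3] are 0x42, 0xC0, 0x1E, so profileLevelId = (0x42<<16) | (0xC0<<8) | 0x1E = 0x42C01E, and the resulting line is a=fmtp:96 packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=&lt;base64 SPS&gt;,&lt;base64 PPS&gt;.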
At this point the RTSPServer has successfully handled the client's DESCRIBE command and returned the SDP to the client. The client then sends one SETUP command per ServerMediaSubsession to set up a connection for it, and on receipt the server calls RTSPClientSession::handleCmd_SETUP. The RTSPClientSession class is what the server uses to maintain one session with a client; SETUP, PLAY, PAUSE, TEARDOWN, and the other commands are all handled in RTSPClientSession. Like RTSPClientConnection, it is a class nested inside RTSPServer. Here is the class:
// The state of an individual client session (using one or more sequential TCP connections) handled by a RTSP server:
class RTSPClientSession {
protected:
  RTSPClientSession(RTSPServer& ourServer, u_int32_t sessionId);
  virtual ~RTSPClientSession();

  friend class RTSPServer;
  friend class RTSPClientConnection;
  // Make the handler functions for each command virtual, to allow subclasses to redefine them:
  virtual void handleCmd_SETUP(RTSPClientConnection* ourClientConnection, // handles the SETUP command
                               char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr);
  virtual void handleCmd_withinSession(RTSPClientConnection* ourClientConnection,
                                       char const* cmdName,
                                       char const* urlPreSuffix, char const* urlSuffix,
                                       char const* fullRequestStr);
  virtual void handleCmd_TEARDOWN(RTSPClientConnection* ourClientConnection, // handles the TEARDOWN command (ends the session)
                                  ServerMediaSubsession* subsession);
  virtual void handleCmd_PLAY(RTSPClientConnection* ourClientConnection, // handles the PLAY command
                              ServerMediaSubsession* subsession, char const* fullRequestStr);
  virtual void handleCmd_PAUSE(RTSPClientConnection* ourClientConnection, // handles the PAUSE command
                               ServerMediaSubsession* subsession);
  virtual void handleCmd_GET_PARAMETER(RTSPClientConnection* ourClientConnection,
                                       ServerMediaSubsession* subsession, char const* fullRequestStr);
  virtual void handleCmd_SET_PARAMETER(RTSPClientConnection* ourClientConnection,
                                       ServerMediaSubsession* subsession, char const* fullRequestStr);
protected:
  UsageEnvironment& envir() { return fOurServer.envir(); }
  void reclaimStreamStates();
  Boolean isMulticast() const { return fIsMulticast; }
  void noteLiveness();
  static void noteClientLiveness(RTSPClientSession* clientSession); // client 'liveness' (keep-alive) handling
  static void livenessTimeoutTask(RTSPClientSession* clientSession);

  // Shortcuts for setting up a RTSP response (prior to sending it):
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr) { ourClientConnection->setRTSPResponse(responseStr); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId) { ourClientConnection->setRTSPResponse(responseStr, sessionId); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, contentStr); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, sessionId, contentStr); }

protected:
  RTSPServer& fOurServer;
  u_int32_t fOurSessionId;
  ServerMediaSession* fOurServerMediaSession;
  Boolean fIsMulticast, fStreamAfterSETUP;
  unsigned char fTCPStreamIdCount; // used for (optional) RTP/TCP
  Boolean usesTCPTransport() const { return fTCPStreamIdCount > 0; }
  TaskToken fLivenessCheckTask;
  unsigned fNumStreamStates; // number of streamState entries
  struct streamState { // pairs a requested ServerMediaSubsession with its StreamState object
    ServerMediaSubsession* subsession;
    void* streamToken; // streamToken points to a StreamState object
  } * fStreamStates; // fStreamStates is an array of streamState
};
/* As its name suggests, StreamState holds the server-side state of streaming one
   ServerMediaSubsession (including serverRTPPort, serverRTCPPort, rtpSink, mediaSource, etc.).
   When a client SETUPs a ServerMediaSubsession, the server creates a StreamState object along
   with the related server-side sockets, RTPSink, and FramedSource, so everything is ready for
   the upcoming playback. When a ServerMediaSubsession is created (see testOnDemandRTSPServer.cpp's
   main function), a reuseFirstSource flag is passed in. If reuseFirstSource is true, all clients
   requesting that ServerMediaSubsession share a single StreamState object, i.e. the server uses
   the same RTP port, RTCP port, RTPSink, and FramedSource to serve them all (one-to-many, saving
   server resources). If reuseFirstSource is false, the server creates a separate StreamState
   object for each request of the ServerMediaSubsession (many-to-many, consuming more server
   resources). */
class StreamState { // represents one streaming instance of a ServerMediaSubsession, plus its state
public:
  StreamState(OnDemandServerMediaSubsession& master,
              Port const& serverRTPPort, Port const& serverRTCPPort,
              RTPSink* rtpSink, BasicUDPSink* udpSink,
              unsigned totalBW, FramedSource* mediaSource,
              Groupsock* rtpGS, Groupsock* rtcpGS);
  virtual ~StreamState();

  void startPlaying(Destinations* destinations, // on receiving PLAY, the server calls each StreamState's startPlaying() to start streaming a ServerMediaSubsession
                    TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
                    ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
                    void* serverRequestAlternativeByteHandlerClientData);
  void pause();
  void endPlaying(Destinations* destinations); // stop streaming
  void reclaim();

  unsigned& referenceCount() { return fReferenceCount; } // number of clients referencing this stream

  Port const& serverRTPPort() const { return fServerRTPPort; }
  Port const& serverRTCPPort() const { return fServerRTCPPort; }

  RTPSink* rtpSink() const { return fRTPSink; }

  float streamDuration() const { return fStreamDuration; }

  FramedSource* mediaSource() const { return fMediaSource; }
  float& startNPT() { return fStartNPT; }

private:
  OnDemandServerMediaSubsession& fMaster;
  Boolean fAreCurrentlyPlaying;
  unsigned fReferenceCount;

  Port fServerRTPPort, fServerRTCPPort;

  RTPSink* fRTPSink;
  BasicUDPSink* fUDPSink;

  float fStreamDuration;
  unsigned fTotalBW;
  RTCPInstance* fRTCPInstance;

  FramedSource* fMediaSource;
  float fStartNPT; // initial 'normal play time'; reset after each seek

  Groupsock* fRTPgs;
  Groupsock* fRTCPgs;
};
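For reference, the flag is chosen in testOnDemandRTSPServer.cpp roughly like this (a shortened sketch; env and rtspServer come from the demo's setup code):

// Sketch based on testOnDemandRTSPServer.cpp: reuseFirstSource decides whether all
// clients of this subsession share one StreamState or each get their own.
Boolean reuseFirstSource = False;

ServerMediaSession* sms
  = ServerMediaSession::createNew(*env, "h264ESVideoTest", "h264ESVideoTest",
                                  "Session streamed by \"testOnDemandRTSPServer\"");
sms->addSubsession(H264VideoFileServerMediaSubsession
                   ::createNew(*env, "test.264", reuseFirstSource));
rtspServer->addServerMediaSession(sms);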
Next, in handleCmd_SETUP, the server first finds the requested ServerMediaSession, then the requested ServerMediaSubsession within it, extracts some client parameters from the request (e.g., the client's RTP and RTCP ports), and finally calls OnDemandServerMediaSubsession::getStreamParameters to create the RTP and RTCP transport. Here is getStreamParameters:
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId, netAddressBits clientAddress, Port const& clientRTPPort,
                      Port const& clientRTCPPort, int tcpSocketNum, unsigned char rtpChannelId,
                      unsigned char rtcpChannelId, netAddressBits& destinationAddress, u_int8_t& /*destinationTTL*/,
                      Boolean& isMulticast, Port& serverRTPPort, Port& serverRTCPPort, void*& streamToken) {
  if (destinationAddress == 0) destinationAddress = clientAddress;
  struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
  isMulticast = False;

  if (fLastStreamToken != NULL && fReuseFirstSource) { // if fReuseFirstSource is true, reuse the StreamState created earlier
    // Special case: Rather than creating a new 'StreamState',
    // we reuse the one that we've already created:
    serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
    serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
    ++((StreamState*)fLastStreamToken)->referenceCount();
    streamToken = fLastStreamToken;
  } else { // no StreamState exists yet for this ServerMediaSubsession, or fReuseFirstSource is false
    // Normal case: Create a new media source:
    unsigned streamBitrate;
    FramedSource* mediaSource // create the FramedSource; the call resolves to the subclass's createNewStreamSource(), which for H264VideoFileServerMediaSubsession creates a ByteStreamFileSource
      = createNewStreamSource(clientSessionId, streamBitrate);

    // Create 'groupsock' and 'sink' objects for the destination,
    // using previously unused server port numbers:
    RTPSink* rtpSink = NULL;
    BasicUDPSink* udpSink = NULL;
    Groupsock* rtpGroupsock = NULL;
    Groupsock* rtcpGroupsock = NULL;

    if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
      portNumBits serverPortNum;
      if (clientRTCPPort.num() == 0) {
        // We're streaming raw UDP (not RTP). Create a single groupsock:
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
          if (rtpGroupsock->socketNum() >= 0) break; // success
        }

        udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
      } else { // create two server-side sockets, rtpGroupsock and rtcpGroupsock, for carrying RTP and RTCP packets
        // Normal case: We're streaming RTP (over UDP or TCP). Create a pair of
        // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
        // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
          if (rtpGroupsock->socketNum() < 0) {
            delete rtpGroupsock;
            continue; // try again
          }

          if (fMultiplexRTCPWithRTP) {
            // Use the RTP 'groupsock' object for RTCP as well:
            serverRTCPPort = serverRTPPort;
            rtcpGroupsock = rtpGroupsock;
          } else {
            // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
            serverRTCPPort = ++serverPortNum;
            rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
            if (rtcpGroupsock->socketNum() < 0) {
              delete rtpGroupsock;
              delete rtcpGroupsock;
              continue; // try again
            }
          }

          break; // success
        }

        unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
        // create the RTPSink; the call resolves to the subclass's createNewRTPSink(), which for
        // H264VideoFileServerMediaSubsession creates an H264VideoRTPSink
        rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
        if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
      }

      // Turn off the destinations for each groupsock. They'll get set later
      // (unless TCP is used instead):
      if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
      if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

      if (rtpGroupsock != NULL) {
        // Try to use a big send buffer for RTP - at least 0.1 second of
        // specified bandwidth and at least 50 KB
        unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
        if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
        increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
      }
    }

    // Set up the state of the stream. The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
                        streamBitrate, mediaSource,
                        rtpGroupsock, rtcpGroupsock);
  }

  // Record these destinations as being for this client session id:
  Destinations* destinations;
  if (tcpSocketNum < 0) { // UDP
    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
  } else { // TCP
    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
  }
  fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}
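To see what getStreamParameters negotiates, a SETUP exchange looks roughly like this (ports and session id are illustrative): the client_port pair maps to clientRTPPort/clientRTCPPort above, and the server_port pair is the serverRTPPort/serverRTCPPort picked by the port-probing loop:

C->S:  SETUP rtsp://127.0.0.1:8554/test.264/track1 RTSP/1.0
       CSeq: 3
       Transport: RTP/AVP;unicast;client_port=50000-50001

S->C:  RTSP/1.0 200 OK
       CSeq: 3
       Transport: RTP/AVP;unicast;destination=127.0.0.1;source=127.0.0.1;client_port=50000-50001;server_port=6970-6971
       Session: 1A2B3C4D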
After the steps above, the server is ready to send RTP and RTCP packets to the client, and waits for the client's PLAY command to start transmitting. On receiving PLAY, the server calls RTSPClientSession::handleCmd_PLAY. That function first extracts the Scale header, which expresses the playback speed the client wants (normal, fast-forward, rewind), then the Range header, which expresses the playback interval the client wants; it passes these to ServerMediaSubsession::setStreamScale and ServerMediaSubsession::seekStream respectively, and finally calls ServerMediaSubsession::startStream to begin sending data. The call actually lands in OnDemandServerMediaSubsession::startStream:
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken,
                                                TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
                                                unsigned short& rtpSeqNum, unsigned& rtpTimestamp,
                                                ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
                                                void* serverRequestAlternativeByteHandlerClientData) {
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations
    = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); // look up the destination client's address
  if (streamState != NULL) {
    streamState->startPlaying(destinations,
                              rtcpRRHandler, rtcpRRHandlerClientData,
                              serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData); // call StreamState::startPlaying() to start streaming

    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL) {
      rtpSeqNum = rtpSink->currentSeqNo();
      rtpTimestamp = rtpSink->presetNextTimestamp();
    }
  }
}
OnDemandServerMediaSubsession::startStream mainly calls StreamState::startPlaying. Here is that function:
void StreamState
::startPlaying(Destinations* dests,
               TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
               ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
               void* serverRequestAlternativeByteHandlerClientData) {
  if (dests == NULL) return;

  if (fRTCPInstance == NULL && fRTPSink != NULL) {
    // Create (and start) a 'RTCP instance' for this RTP sink:
    fRTCPInstance
      = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                                fTotalBW, (unsigned char*)fMaster.fCNAME,
                                fRTPSink, NULL /* we're a server */);
    // Note: This starts RTCP running automatically
  }

  if (dests->isTCP) { // carry the RTP and RTCP packets over TCP
    // Change RTP and RTCP to use the TCP socket instead of UDP:
    if (fRTPSink != NULL) {
      fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
      RTPInterface
        ::setServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum,
                                                 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
      // So that we continue to handle RTSP commands from the client
    }
    if (fRTCPInstance != NULL) {
      fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
      fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                                          rtcpRRHandler, rtcpRRHandlerClientData);
    }
  } else { // carry the RTP and RTCP packets over UDP
    // Tell the RTP and RTCP 'groupsocks' about this destination
    // (in case they don't already have it):
    if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
    if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
    if (fRTCPInstance != NULL) {
      fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                                          rtcpRRHandler, rtcpRRHandlerClientData);
    }
  }

  if (fRTCPInstance != NULL) {
    // Hack: Send an initial RTCP "SR" packet, before the initial RTP packet, so that receivers will (likely) be able to
    // get RTCP-synchronized presentation times immediately:
    fRTCPInstance->sendReport();
  }

  if (!fAreCurrentlyPlaying && fMediaSource != NULL) { // call RTPSink::startPlaying() to start sending data
    if (fRTPSink != NULL) {
      fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    } else if (fUDPSink != NULL) {
      fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    }
  }
}
StreamState::startPlaying adds the client to the destination list and then calls RTPSink::startPlaying, which actually resolves to MediaSink::startPlaying:
Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc,
                                void* afterClientData) {
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source;

  fAfterFunc = afterFunc; // once the callbacks are set, call continuePlaying() to start playing
  fAfterClientData = afterClientData;
  return continuePlaying();
}

/* Our media resource here is a .264 file, handled by H264VideoFileServerMediaSubsession, which
   creates an H264VideoRTPSink. H264VideoRTPSink is a subclass of H264or5VideoRTPSink, so the
   call above actually lands in H264or5VideoRTPSink::continuePlaying(): */
Boolean H264or5VideoRTPSink::continuePlaying() {
  // First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now: (create an H264or5Fragmenter object)
  if (fOurFragmenter == NULL) {
    fOurFragmenter = new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,
                                           ourMaxPacketSize() - 12/*RTP hdr size*/);
  } else {
    fOurFragmenter->reassignInputSource(fSource);
  }
  fSource = fOurFragmenter; // note: fSource now points at the H264or5Fragmenter, a FramedFilter subclass
  // FramedFilter is itself a FramedSource subclass; as the name suggests, a FramedFilter "filters"
  // the data delivered by an upstream FramedSource, and its output can in turn feed another
  // FramedFilter for further "filtering", much like Java's decorated IO streams (the decorator pattern).

  // Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying(); // call MultiFramedRTPSink's continuePlaying()
}
Boolean MultiFramedRTPSink::continuePlaying() {
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType<<16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr);

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole for the timestamp

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0;
  packFrame(); // call packFrame()
}

void MultiFramedRTPSink::packFrame() {
  // Get the next frame.

  // First, see if we have an overflow frame that was too big for the last pkt
  if (fOutBuf->haveOverflowData()) {
    // Use this frame before reading a new one from the source
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();

    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // Normal case: we need to read a new frame from the source
    if (fSource == NULL) return;

    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(), // call FramedSource::getNextFrame() to fetch frame data into fOutBuf
                          afterGettingFrame, this, ourHandleClosure, this); // afterwards, MultiFramedRTPSink::afterGettingFrame() is called back
    // (for H264VideoFileServerMediaSubsession, it is the H264or5Fragmenter that eventually
    // calls back MultiFramedRTPSink::afterGettingFrame())
  }
}

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                                afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
                                onCloseFunc* onCloseFunc, void* onCloseClientData) {
  // Make sure we're not already being read:
  if (fIsCurrentlyAwaitingData) {
    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
    envir().internalError();
  }

  fTo = to;
  fMaxSize = maxSize;
  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
  fAfterGettingFunc = afterGettingFunc;
  fAfterGettingClientData = afterGettingClientData;
  fOnCloseFunc = onCloseFunc;
  fOnCloseClientData = onCloseClientData;
  fIsCurrentlyAwaitingData = True;

  doGetNextFrame(); // call doGetNextFrame()
}
For H264VideoFileServerMediaSubsession, the FramedSource created earlier is a ByteStreamFileSource object, which has since been wrapped in an H264or5Fragmenter, so the call lands in H264or5Fragmenter::doGetNextFrame:
void H264or5Fragmenter::doGetNextFrame() {
  if (fNumValidDataBytes == 1) {
    // We have no NAL unit data currently in the buffer. Read a new one:
    // (here we in turn call ByteStreamFileSource's getNextFrame(), with
    // H264or5Fragmenter::afterGettingFrame set as the callback)
    fInputSource->getNextFrame(&fInputBuffer[1], fInputBufferSize - 1,
                               afterGettingFrame, this,
                               FramedSource::handleClosure, this);
  } else {
    // We have NAL unit data in the buffer. There are three cases to consider:
    // 1. There is a new NAL unit in the buffer, and it's small enough to deliver
    //    to the RTP sink (as is).
    // 2. There is a new NAL unit in the buffer, but it's too large to deliver to
    //    the RTP sink in its entirety. Deliver the first fragment of this data,
    //    as a FU packet, with one extra preceding header byte (for the "FU header").
    // 3. There is a NAL unit in the buffer, and we've already delivered some
    //    fragment(s) of this. Deliver the next fragment of this data,
    //    as a FU packet, with two (H.264) or three (H.265) extra preceding header bytes
    //    (for the "NAL header" and the "FU header").

    if (fMaxSize < fMaxOutputPacketSize) { // shouldn't happen
      envir() << "H264or5Fragmenter::doGetNextFrame(): fMaxSize ("
              << fMaxSize << ") is smaller than expected\n";
    } else {
      fMaxSize = fMaxOutputPacketSize;
    }

    fLastFragmentCompletedNALUnit = True; // by default
    if (fCurDataOffset == 1) { // case 1 or 2
      if (fNumValidDataBytes - 1 <= fMaxSize) { // case 1
        memmove(fTo, &fInputBuffer[1], fNumValidDataBytes - 1);
        fFrameSize = fNumValidDataBytes - 1;
        fCurDataOffset = fNumValidDataBytes;
      } else { // case 2
        // We need to send the NAL unit data as FU packets. Deliver the first
        // packet now. Note that we add "NAL header" and "FU header" bytes to the front
        // of the packet (overwriting the existing "NAL header").
        if (fHNumber == 264) {
          fInputBuffer[0] = (fInputBuffer[1] & 0xE0) | 28; // FU indicator
          fInputBuffer[1] = 0x80 | (fInputBuffer[1] & 0x1F); // FU header (with S bit)
        } else { // 265
          u_int8_t nal_unit_type = (fInputBuffer[1]&0x7E)>>1;
          fInputBuffer[0] = (fInputBuffer[1] & 0x81) | (49<<1); // Payload header (1st byte)
          fInputBuffer[1] = fInputBuffer[2]; // Payload header (2nd byte)
          fInputBuffer[2] = 0x80 | nal_unit_type; // FU header (with S bit)
        }
        memmove(fTo, fInputBuffer, fMaxSize);
        fFrameSize = fMaxSize;
        fCurDataOffset += fMaxSize - 1;
        fLastFragmentCompletedNALUnit = False;
      }
    } else { // case 3
      // We are sending this NAL unit data as FU packets. We've already sent the
      // first packet (fragment). Now, send the next fragment. Note that we add
      // "NAL header" and "FU header" bytes to the front. (We reuse these bytes that
      // we already sent for the first fragment, but clear the S bit, and add the E
      // bit if this is the last fragment.)
      unsigned numExtraHeaderBytes;
      if (fHNumber == 264) {
        fInputBuffer[fCurDataOffset-2] = fInputBuffer[0]; // FU indicator
        fInputBuffer[fCurDataOffset-1] = fInputBuffer[1]&~0x80; // FU header (no S bit)
        numExtraHeaderBytes = 2;
      } else { // 265
        fInputBuffer[fCurDataOffset-3] = fInputBuffer[0]; // Payload header (1st byte)
        fInputBuffer[fCurDataOffset-2] = fInputBuffer[1]; // Payload header (2nd byte)
        fInputBuffer[fCurDataOffset-1] = fInputBuffer[2]&~0x80; // FU header (no S bit)
        numExtraHeaderBytes = 3;
      }
      unsigned numBytesToSend = numExtraHeaderBytes + (fNumValidDataBytes - fCurDataOffset);
      if (numBytesToSend > fMaxSize) {
        // We can't send all of the remaining data this time:
        numBytesToSend = fMaxSize;
        fLastFragmentCompletedNALUnit = False;
      } else {
        // This is the last fragment:
        fInputBuffer[fCurDataOffset-1] |= 0x40; // set the E bit in the FU header
        fNumTruncatedBytes = fSaveNumTruncatedBytes;
      }
      memmove(fTo, &fInputBuffer[fCurDataOffset-numExtraHeaderBytes], numBytesToSend);
      fFrameSize = numBytesToSend;
      fCurDataOffset += numBytesToSend - numExtraHeaderBytes;
    }

    if (fCurDataOffset >= fNumValidDataBytes) {
      // We're done with this data. Reset the pointers for receiving new data:
      fNumValidDataBytes = fCurDataOffset = 1;
    }

    // Complete delivery to the client:
    FramedSource::afterGetting(this); // calls back MultiFramedRTPSink::afterGettingFrame()
  }
}
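To make the FU-A header arithmetic above concrete, here is a standalone sketch (not Live555 code) for a hypothetical H.264 IDR NAL unit whose header byte is 0x65:

// Sketch: the H.264 FU indicator/header bytes produced by the logic above.
#include <cstdio>
#include <cstdint>

int main() {
  uint8_t nalHeader = 0x65; // forbidden_zero_bit=0, NRI=3, nal_unit_type=5 (IDR slice)

  uint8_t fuIndicator   = (nalHeader & 0xE0) | 28;     // keep NRI bits, type=28 (FU-A) => 0x7C
  uint8_t fuHeaderFirst = 0x80 | (nalHeader & 0x1F);   // S bit + original type        => 0x85
  uint8_t fuHeaderMid   = fuHeaderFirst & ~0x80;       // S bit cleared                => 0x05
  uint8_t fuHeaderLast  = fuHeaderMid | 0x40;          // E bit set                    => 0x45

  printf("FU indicator: 0x%02X\n", fuIndicator);
  printf("FU header (first/middle/last): 0x%02X 0x%02X 0x%02X\n",
         fuHeaderFirst, fuHeaderMid, fuHeaderLast);
  return 0;
}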
void ByteStreamFileSource::doGetNextFrame() {
  if (feof(fFid) || ferror(fFid) || (fLimitNumBytesToStream && fNumBytesToStream == 0)) {
    handleClosure();
    return;
  }

#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  doReadFromFile(); // read the file
#else
  if (!fHaveStartedReading) {
    // Await readable data from the file:
    envir().taskScheduler().turnOnBackgroundReadHandling(fileno(fFid),
                                                         (TaskScheduler::BackgroundHandlerProc*)&fileReadableHandler, this);
    fHaveStartedReading = True;
  }
#endif
}

void ByteStreamFileSource::doReadFromFile() {
  // Try to read as many bytes as will fit in the buffer provided (or "fPreferredFrameSize" if less)
  if (fLimitNumBytesToStream && fNumBytesToStream < (u_int64_t)fMaxSize) {
    fMaxSize = (unsigned)fNumBytesToStream;
  }
  if (fPreferredFrameSize > 0 && fPreferredFrameSize < fMaxSize) {
    fMaxSize = fPreferredFrameSize;
  }
#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  fFrameSize = fread(fTo, 1, fMaxSize, fFid); // read the data with fread()
#else
  if (fFidIsSeekable) {
    fFrameSize = fread(fTo, 1, fMaxSize, fFid);
  } else {
    // For non-seekable files (e.g., pipes), call "read()" rather than "fread()", to ensure that the read doesn't block:
    fFrameSize = read(fileno(fFid), fTo, fMaxSize);
  }
#endif
  if (fFrameSize == 0) {
    handleClosure();
    return;
  }
  fNumBytesToStream -= fFrameSize;

  // Set the 'presentation time':
  if (fPlayTimePerFrame > 0 && fPreferredFrameSize > 0) {
    if (fPresentationTime.tv_sec == 0 && fPresentationTime.tv_usec == 0) {
      // This is the first frame, so use the current time:
      gettimeofday(&fPresentationTime, NULL);
    } else {
      // Increment by the play time of the previous data:
      unsigned uSeconds = fPresentationTime.tv_usec + fLastPlayTime;
      fPresentationTime.tv_sec += uSeconds/1000000;
      fPresentationTime.tv_usec = uSeconds%1000000;
    }

    // Remember the play time of this data:
    fLastPlayTime = (fPlayTimePerFrame*fFrameSize)/fPreferredFrameSize;
    fDurationInMicroseconds = fLastPlayTime;
  } else {
    // We don't know a specific play time duration for this data,
    // so just record the current time as being the 'presentation time':
    gettimeofday(&fPresentationTime, NULL);
  }

  // After the data has been read, call FramedSource::afterGetting().
  // Inform the reader that he has data:
#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  // To avoid possible infinite recursion, we need to return to the event loop to do this:
  nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
                                                           (TaskFunc*)FramedSource::afterGetting, this);
#else
  // Because the file read was done from the event loop, we can call the
  // 'after getting' function directly, without risk of infinite recursion:
  FramedSource::afterGetting(this);
#endif
}

void FramedSource::afterGetting(FramedSource* source) {
  source->fIsCurrentlyAwaitingData = False;
  // indicates that we can be read again
  // Note that this needs to be done here, in case the "fAfterFunc"
  // called below tries to read another frame (which it usually will)

  if (source->fAfterGettingFunc != NULL) {
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize, source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
  }
}
FramedSource::afterGetting invokes fAfterGettingFunc; for the ByteStreamFileSource object, fAfterGettingFunc was set earlier to H264or5Fragmenter::afterGettingFrame:
void H264or5Fragmenter::afterGettingFrame(void* clientData, unsigned frameSize,
                                          unsigned numTruncatedBytes,
                                          struct timeval presentationTime,
                                          unsigned durationInMicroseconds) {
  H264or5Fragmenter* fragmenter = (H264or5Fragmenter*)clientData;
  fragmenter->afterGettingFrame1(frameSize, numTruncatedBytes, presentationTime,
                                 durationInMicroseconds);
}

void H264or5Fragmenter::afterGettingFrame1(unsigned frameSize,
                                           unsigned numTruncatedBytes,
                                           struct timeval presentationTime,
                                           unsigned durationInMicroseconds) {
  fNumValidDataBytes += frameSize;
  fSaveNumTruncatedBytes = numTruncatedBytes;
  fPresentationTime = presentationTime;
  fDurationInMicroseconds = durationInMicroseconds;

  // Deliver data to the client:
  doGetNextFrame();
}
H264or5Fragmenter::afterGettingFrame1 calls H264or5Fragmenter::doGetNextFrame again, and once the frame data read in satisfies one of the delivery cases, doGetNextFrame calls back MultiFramedRTPSink::afterGettingFrame.
void MultiFramedRTPSink
::afterGettingFrame(void* clientData, unsigned numBytesRead,
                    unsigned numTruncatedBytes,
                    struct timeval presentationTime,
                    unsigned durationInMicroseconds) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
  sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
                           presentationTime, durationInMicroseconds);
}

void MultiFramedRTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
                     struct timeval presentationTime,
                     unsigned durationInMicroseconds) {
  if (fIsFirstPacket) {
    // Record the fact that we're starting to play now:
    gettimeofday(&fNextSendTime, NULL);
  }

  fMostRecentPresentationTime = presentationTime;
  if (fInitialPresentationTime.tv_sec == 0 && fInitialPresentationTime.tv_usec == 0) {
    fInitialPresentationTime = presentationTime;
  }

  if (numTruncatedBytes > 0) { // data that exceeded the buffer size and had to be dropped
    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
            << bufferSize << "). "
            << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
            << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is "
            << OutPacketBuffer::maxSize << ".)\n";
  }
  unsigned curFragmentationOffset = fCurFragmentationOffset;
  unsigned numFrameBytesToUse = frameSize;
  unsigned overflowBytes = 0;

  // If we have already packed one or more frames into this packet,
  // check whether this new frame is eligible to be packed after them.
  // (This is independent of whether the packet has enough room for this
  // new frame; that check comes later.)
  if (fNumFramesUsedSoFar > 0) { // this RTP packet already contains some frames
    if ((fPreviousFrameEndedFragmentation
         && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) {
      // no further frames may be added to this RTP packet (e.g., an earlier frame marked the end);
      // Save away this frame for next time:
      numFrameBytesToUse = 0;
      fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                               presentationTime, durationInMicroseconds);
    }
  }
  fPreviousFrameEndedFragmentation = False;

  if (numFrameBytesToUse > 0) { // this frame may be added to the RTP packet, but check whether its size exceeds the packet's remaining space
    // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) { // the frame exceeds the packet's remaining space
      // Don't use this frame now; instead, save it as overflow data, and
      // send it in the next packet instead. However, if the frame is too
      // big to fit in a packet by itself, then we need to fragment it (and
      // use some of it in this packet, if the payload format permits this.)
      if (isTooBigForAPacket(frameSize)
          && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // send part of this frame now
        // We need to fragment this frame, and use some of it now:
        overflowBytes = computeOverflowForNewFrame(frameSize);
        numFrameBytesToUse -= overflowBytes;
        fCurFragmentationOffset += numFrameBytesToUse;
      } else {
        // We don't use any of this frame now: (the frame is not added)
        overflowBytes = frameSize;
        numFrameBytesToUse = 0;
      }
      fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, // record the overflow's position and size so the next packet can be adjusted
                               overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) {
      // This is the last fragment of a frame that was fragmented over
      // more than one packet. Do any special handling for this case:
      fCurFragmentationOffset = 0;
      fPreviousFrameEndedFragmentation = True;
    }
  }

  if (numFrameBytesToUse == 0 && frameSize > 0) { // enough data has been gathered; send the RTP packet
    // Send our packet now, because we have filled it up:
    sendPacketIfNecessary();
  } else {
    // Use this frame in our outgoing packet:
    unsigned char* frameStart = fOutBuf->curPtr();
    fOutBuf->increment(numFrameBytesToUse);
    // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes

    // Here's where any payload format specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, frameStart,
                           numFrameBytesToUse, presentationTime,
                           overflowBytes);

    ++fNumFramesUsedSoFar;

    // Update the time at which the next packet should be sent, based
    // on the duration of the frame that we just packed into it.
    // However, if this frame has overflow data remaining, then don't
    // count its duration yet.
    if (overflowBytes == 0) {
      fNextSendTime.tv_usec += durationInMicroseconds;
      fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
      fNextSendTime.tv_usec %= 1000000;
    }

    // Send our packet now if (i) it's already at our preferred size, or
    // (ii) (heuristic) another frame of the same size as the one we just
    //      read would overflow the packet, or
    // (iii) it contains the last fragment of a fragmented frame, and we
    //      don't allow anything else to follow this or
    // (iv) one frame per packet is allowed:
    if (fOutBuf->isPreferredSize()
        || fOutBuf->wouldOverflow(numFrameBytesToUse)
        || (fPreviousFrameEndedFragmentation &&
            !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
                                           frameSize) ) {
      // The packet is ready to be sent now
      sendPacketIfNecessary();
    } else {
      // There's room for more frames; try getting another:
      packFrame(); // call packFrame() to read more data
    }
  }
}
After MultiFramedRTPSink has packed as many frames as it can, it calls sendPacketIfNecessary to send the packet to the client:
void MultiFramedRTPSink::sendPacketIfNecessary() {
  if (fNumFramesUsedSoFar > 0) {
    // Send the packet:
#ifdef TEST_LOSS
    if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
      if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) { // send the RTP packet through the RTPInterface
        // if failure handler has been specified, call it
        if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData);
      }
    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize()
      - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;

    ++fSeqNo; // for next time
  }

  if (fOutBuf->haveOverflowData() // there is unsent frame data, so adjust the packet
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
    // Efficiency hack: Reset the packet start pointer to just in front of
    // the overflow data (allowing for the RTP header and special headers),
    // so that we probably don't have to "memmove()" the overflow data
    // into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize()
      - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }
  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;

  if (fNoFramesLeft) {
    // We're done:
    onSourceClosure();
  } else {
    // We have more frames left to send. Figure out when the next frame
    // is due to start playing, then make sure that we wait this long before
    // sending the next packet.
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
      uSecondsToGo = 0;
    }

    // Delay this amount of time: (schedule the next RTP packet send)
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);
  }
}

// The following is called after each delay between packet sends:
void MultiFramedRTPSink::sendNext(void* firstArg) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
  sink->buildAndSendPacket(False); // loop back into buildAndSendPacket()
}
The chain of calls above is fairly tangled, so the flow is summarized below to show it more clearly.
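A text sketch of the call chain, reconstructed from the walkthrough above (it stands in for the diagram):

MultiFramedRTPSink::continuePlaying()
 -> buildAndSendPacket()                                  // writes the RTP header
     -> packFrame()                                       // packs frame(s) into the packet
         -> H264or5Fragmenter::doGetNextFrame()           // fSource is the fragmenter
             -> ByteStreamFileSource::doGetNextFrame()    // -> doReadFromFile()
                 -> FramedSource::afterGetting()
                     -> H264or5Fragmenter::afterGettingFrame1()
                         -> H264or5Fragmenter::doGetNextFrame()  // now has NAL data
                             -> FramedSource::afterGetting()
                                 -> MultiFramedRTPSink::afterGettingFrame1()
                                     -> packFrame()               // room left: pack more
                                     -> sendPacketIfNecessary()   // or send the packet
                                         -> scheduleDelayedTask(sendNext)
                                             -> buildAndSendPacket()  // and the loop repeats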
The server reads data through the RTPSink; the RTPSink in turn reads through the FramedSource; once the data has been read, it is handed back to the RTPSink for processing; and after processing it, the RTPSink asks the FramedSource for more. This cycle between the RTPSink and the FramedSource is the overall flow by which Live555 reads and sends data.
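As a minimal illustration of this pull model, here is a self-contained sketch with hypothetical Source/Sink classes (not Live555's API; the real library returns to the event loop between steps rather than recursing synchronously):

// Sketch: the sink repeatedly pulls from the source; the source delivers via a callback.
#include <cstdio>
#include <functional>

class Source {
public:
  // The sink asks for one frame and passes a callback to run when the frame is ready:
  void getNextFrame(std::function<void(int)> afterGetting) {
    static int frameNo = 0;
    if (frameNo < 3) afterGetting(frameNo++); // "deliver" a frame; stop after three
  }
};

class Sink {
public:
  explicit Sink(Source& s) : fSource(s) {}
  void startPlaying() { continuePlaying(); }
private:
  void continuePlaying() {
    // Ask the source for data; afterGettingFrame() is the completion callback:
    fSource.getNextFrame([this](int frame) { afterGettingFrame(frame); });
  }
  void afterGettingFrame(int frame) {
    printf("send RTP packet for frame %d\n", frame); // "send" the packet
    continuePlaying();                               // then request the next frame
  }
  Source& fSource;
};

int main() {
  Source source;
  Sink sink(source);
  sink.startPlaying(); // prints three frames, mirroring the RTPSink <-> FramedSource cycle
  return 0;
}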
That is the full path from establishing the RTSP connection to sending RTP data (using an H.264 file as the example); the remaining steps, from stopping the data flow to tearing down the connection, are not covered here.