Live555不僅實現了RTSP服務器端,還實現了RTSP客戶端,我們通過testRTSPClient.cpp這個程序來看一下,Live555的RTSP客戶端與服務器端建立RTSP連接的過程。
首先來看一下main函數:
1 char eventLoopWatchVariable = 0; // watched by doEventLoop() below: setting it non-zero makes the event loop return 2 3 int main(int argc, char** argv) { 4 // Begin by setting up our usage environment: 5 TaskScheduler* scheduler = BasicTaskScheduler::createNew(); 6 UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler); 7 8 // We need at least one "rtsp://" URL argument: 9 if (argc < 2) { 10 usage(*env, argv[0]); 11 return 1; 12 } 13 14 // There are argc-1 URLs: argv[1] through argv[argc-1]. Open and start streaming each one: 15 for (int i = 1; i <= argc-1; ++i) { 16 openURL(*env, argv[0], argv[i]); 17 } 18 19 // All subsequent activity takes place within the event loop: 20 env->taskScheduler().doEventLoop(&eventLoopWatchVariable); 21 // This function call does not return, unless, at some point in time, "eventLoopWatchVariable" gets set to something non-zero. 22 23 return 0; 24 25 // If you choose to continue the application past this point (i.e., if you comment out the "return 0;" statement above), 26 // and if you don't intend to do anything more with the "TaskScheduler" and "UsageEnvironment" objects, 27 // then you can also reclaim the (small) memory used by these objects by uncommenting the following code: 28 /* 29 env->reclaim(); env = NULL; 30 delete scheduler; scheduler = NULL; 31 */ 32 }
和testOnDemandRTSPServer.cpp一樣,首先也是創建TaskScheduler對象和UsageEnvironment對象,然后調用openURL函數去請求某個媒體資源,參數是該媒體資源的RTSP地址,最后使程序進入主循環。
1 void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL) { 2 // Begin by creating a "RTSPClient" object. Note that there is a separate "RTSPClient" object for each stream that we wish 3 // to receive (even if more than one stream uses the same "rtsp://" URL). 4 RTSPClient* rtspClient = ourRTSPClient::createNew(env, rtspURL, RTSP_CLIENT_VERBOSITY_LEVEL, progName); 5 if (rtspClient == NULL) { 6 env << "Failed to create a RTSP client for URL \"" << rtspURL << "\": " << env.getResultMsg() << "\n"; 7 return; 8 } 9 10 ++rtspClientCount; 11 12 // Next, send a RTSP "DESCRIBE" command, to get a SDP description for the stream. 13 // Note that this command - like all RTSP commands - is sent asynchronously; we do not block, waiting for a response. 14 // Instead, the following function call returns immediately, and we handle the RTSP response later, from within the event loop: 15 rtspClient->sendDescribeCommand(continueAfterDESCRIBE); // send the DESCRIBE command, passing continueAfterDESCRIBE as the response handler 16 }
openURL函數很簡單,創建一個RTSPClient對象,一個RTSPClient對象代表一個RTSP客戶端。然后調用sendDescribeCommand函數發送DESCRIBE命令,回調函數是continueAfterDESCRIBE函數,在收到RTSP服務器端對DESCRIBE命令的回復時調用。
1 void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) { 2 do { 3 UsageEnvironment& env = rtspClient->envir(); // alias 4 StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias 5 6 if (resultCode != 0) { // a non-zero result code indicates an error 7 env << *rtspClient << "Failed to get a SDP description: " << resultString << "\n"; 8 delete[] resultString; 9 break; 10 } 11 // resultString is the SDP description string returned by the server 12 char* const sdpDescription = resultString; 13 env << *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n"; 14 15 // Create a media session object from this SDP description: 16 scs.session = MediaSession::createNew(env, sdpDescription); // create a MediaSession object from the SDP description 17 delete[] sdpDescription; // because we don't need it anymore 18 if (scs.session == NULL) { 19 env << *rtspClient << "Failed to create a MediaSession object from the SDP description: " << env.getResultMsg() << "\n"; 20 break; 21 } else if (!scs.session->hasSubsessions()) { 22 env << *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n"; 23 break; 24 } 25 26 // Then, create and set up our data source objects for the session. We do this by iterating over the session's 'subsessions', 27 // calling "MediaSubsession::initiate()", and then sending a RTSP "SETUP" command, on each one. 28 // (Each 'subsession' will have its own data source.) 29 scs.iter = new MediaSubsessionIterator(*scs.session); 30 setupNextSubsession(rtspClient); // start sending SETUP commands to set up a connection for each of the server's ServerMediaSubsessions 31 return; 32 } while (0); 33 34 // An unrecoverable error occurred with this stream. 35 shutdownStream(rtspClient); 36 }
客戶端收到服務器端對DESCRIBE命令的回復,取得SDP信息后,客戶端創建一個MediaSession對象。MediaSession和ServerMediaSession是相對應的概念,MediaSession表示客戶端請求服務器端某個媒體資源的會話,類似地,客戶端還存在與ServerMediaSubsession相對應的MediaSubsession,表示MediaSession的子會話,創建MediaSession的同時也創建了包含的MediaSubsession對象。然后客戶端對服務器端的每個ServerMediaSubsession發送SETUP命令請求建立連接。
1 void setupNextSubsession(RTSPClient* rtspClient) { 2 UsageEnvironment& env = rtspClient->envir(); // alias 3 StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias 4 5 scs.subsession = scs.iter->next(); 6 if (scs.subsession != NULL) { 7 if (!scs.subsession->initiate()) { // call initiate() to initialize the MediaSubsession object 8 env << *rtspClient << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n"; 9 setupNextSubsession(rtspClient); // give up on this subsession; go to the next one 10 } else { 11 env << *rtspClient << "Initiated the \"" << *scs.subsession << "\" subsession ("; 12 if (scs.subsession->rtcpIsMuxed()) { 13 env << "client port " << scs.subsession->clientPortNum(); 14 } else { 15 env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1; 16 } 17 env << ")\n"; 18 // send the SETUP command 19 // Continue setting up this subsession, by sending a RTSP "SETUP" command: 20 rtspClient->sendSetupCommand(*scs.subsession, continueAfterSETUP, False, REQUEST_STREAMING_OVER_TCP); 21 } 22 return; 23 } 24 // all ServerMediaSubsessions have been set up successfully; now send the PLAY command 25 // We've finished setting up all of the subsessions. Now, send a RTSP "PLAY" command to start the streaming: 26 if (scs.session->absStartTime() != NULL) { 27 // Special case: The stream is indexed by 'absolute' time, so send an appropriate "PLAY" command: 28 rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY, scs.session->absStartTime(), scs.session->absEndTime()); 29 } else { 30 scs.duration = scs.session->playEndTime() - scs.session->playStartTime(); 31 rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY); 32 } 33 } 34 35 void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) { 36 do { 37 UsageEnvironment& env = rtspClient->envir(); // alias 38 StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias 39 40 if (resultCode != 0) { 41 env << *rtspClient << "Failed to set up the \"" << *scs.subsession << "\" subsession: " << resultString << "\n"; 42 break; 43 } 44 45 env << *rtspClient << "Set up the \"" << *scs.subsession << "\" subsession ("; 46 if (scs.subsession->rtcpIsMuxed()) { 47 env << "client port " << scs.subsession->clientPortNum(); 48 } else { 49 env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1; 50 } 51 env << ")\n"; 52 53 // Having successfully setup the subsession, create a data sink for it, and call "startPlaying()" on it. 54 // (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later, 55 // after we've sent a RTSP "PLAY" command.) 56 // create a MediaSink object for each MediaSubsession, to request and store the incoming data 57 scs.subsession->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url()); 58 // perhaps use your own custom "MediaSink" subclass instead 59 if (scs.subsession->sink == NULL) { 60 env << *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession 61 << "\" subsession: " << env.getResultMsg() << "\n"; 62 break; 63 } 64 65 env << *rtspClient << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n"; 66 scs.subsession->miscPtr = rtspClient; // a hack to let subsession handle functions get the "RTSPClient" from the subsession 67 scs.subsession->sink->startPlaying(*(scs.subsession->readSource()), 68 subsessionAfterPlaying, scs.subsession); // call the MediaSink's startPlaying() to get ready to receive data 69 // Also set a handler to be called if a RTCP "BYE" arrives for this subsession: 70 if (scs.subsession->rtcpInstance() != NULL) { 71 scs.subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession); 72 } 73 } while (0); 74 delete[] resultString; 75 76 // Set up the next subsession, if any: proceed to set up the next ServerMediaSubsession 77 setupNextSubsession(rtspClient); 78 }
setupNextSubsession函數中首先調用MediaSubsession的initiate函數初始化MediaSubsession,然后對ServerMediaSubsession發送SETUP命令,收到回復后回調continueAfterSETUP函數。在continueAfterSETUP函數中,為MediaSubsession創建MediaSink對象來請求和保存服務器端發送的數據,然后調用MediaSink::startPlaying函數開始准備播放對應的ServerMediaSubsession,最后調用setupNextSubsession函數與下一個ServerMediaSubsession建立連接,在setupNextSubsession函數中,會檢查是否與所有的ServerMediaSubsession都建立了連接,是則發送PLAY命令請求開始傳送數據,收到回復則調用continueAfterPLAY函數。
在客戶端發送PLAY命令之前,我們先看一下MediaSubsession::initiate函數的內容:
1 Boolean MediaSubsession::initiate(int useSpecialRTPoffset) { 2 if (fReadSource != NULL) return True; // has already been initiated 3 4 do { 5 if (fCodecName == NULL) { 6 env().setResultMsg("Codec is unspecified"); 7 break; 8 } 9 // create the client sockets, an RTP socket and an RTCP socket, ready to receive data from the server 10 // Create RTP and RTCP 'Groupsocks' on which to receive incoming data. 11 // (Groupsocks will work even for unicast addresses) 12 struct in_addr tempAddr; 13 tempAddr.s_addr = connectionEndpointAddress(); 14 // This could get changed later, as a result of a RTSP "SETUP" 15 // use the specified RTP and RTCP ports; the RTP port must be even, and the RTCP port must be (RTP port + 1) 16 if (fClientPortNum != 0 && (honorSDPPortChoice || IsMulticastAddress(tempAddr.s_addr))) { 17 // The sockets' port numbers were specified for us. Use these: 18 Boolean const protocolIsRTP = strcmp(fProtocolName, "RTP") == 0; 19 if (protocolIsRTP && !fMultiplexRTCPWithRTP) { 20 fClientPortNum = fClientPortNum&~1; 21 // use an even-numbered port for RTP, and the next (odd-numbered) port for RTCP 22 } 23 if (isSSM()) { 24 fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, fClientPortNum); 25 } else { 26 fRTPSocket = new Groupsock(env(), tempAddr, fClientPortNum, 255); 27 } 28 if (fRTPSocket == NULL) { 29 env().setResultMsg("Failed to create RTP socket"); 30 break; 31 } 32 33 if (protocolIsRTP) { 34 if (fMultiplexRTCPWithRTP) { 35 // Use the RTP 'groupsock' object for RTCP as well: 36 fRTCPSocket = fRTPSocket; 37 } else { 38 // Set our RTCP port to be the RTP port + 1: 39 portNumBits const rtcpPortNum = fClientPortNum|1; 40 if (isSSM()) { 41 fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum); 42 } else { 43 fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255); 44 } 45 } 46 } 47 } else {
// pick ephemeral (random) RTP and RTCP port numbers 48 // Port numbers were not specified in advance, so we use ephemeral port numbers. 49 // Create sockets until we get a port-number pair (even: RTP; even+1: RTCP). 50 // (However, if we're multiplexing RTCP with RTP, then we create only one socket, 51 // and the port number can be even or odd.) 52 // We need to make sure that we don't keep trying to use the same bad port numbers over 53 // and over again, so we store bad sockets in a table, and delete them all when we're done. 54 HashTable* socketHashTable = HashTable::create(ONE_WORD_HASH_KEYS); 55 if (socketHashTable == NULL) break; 56 Boolean success = False; 57 NoReuse dummy(env()); 58 // ensures that our new ephemeral port number won't be one that's already in use 59 60 while (1) { 61 // Create a new socket: 62 if (isSSM()) { 63 fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, 0); 64 } else { 65 fRTPSocket = new Groupsock(env(), tempAddr, 0, 255); 66 } 67 if (fRTPSocket == NULL) { 68 env().setResultMsg("MediaSession::initiate(): unable to create RTP and RTCP sockets"); 69 break; 70 } 71 72 // Get the client port number: 73 Port clientPort(0); 74 if (!getSourcePort(env(), fRTPSocket->socketNum(), clientPort)) { 75 break; 76 } 77 fClientPortNum = ntohs(clientPort.num()); 78 79 if (fMultiplexRTCPWithRTP) { 80 // Use this RTP 'groupsock' object for RTCP as well: 81 fRTCPSocket = fRTPSocket; 82 success = True; 83 break; 84 } 85 86 // To be usable for RTP, the client port number must be even: 87 if ((fClientPortNum&1) != 0) { // it's odd 88 // Record this socket in our table, and keep trying: 89 unsigned key = (unsigned)fClientPortNum; 90 Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket); 91 delete existing; // in case it wasn't NULL 92 continue; 93 } 94 95 // Make sure we can use the next (i.e., odd) port number, for RTCP: 96 portNumBits rtcpPortNum = fClientPortNum|1; 97 if (isSSM()) { 98 fRTCPSocket = new Groupsock(env(), tempAddr,
fSourceFilterAddr, rtcpPortNum); 99 } else { 100 fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255); 101 } 102 if (fRTCPSocket != NULL && fRTCPSocket->socketNum() >= 0) { 103 // Success! Use these two sockets. 104 success = True; 105 break; 106 } else { 107 // We couldn't create the RTCP socket (perhaps that port number's already in use elsewhere?). 108 delete fRTCPSocket; fRTCPSocket = NULL; 109 110 // Record the first socket in our table, and keep trying: 111 unsigned key = (unsigned)fClientPortNum; 112 Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket); 113 delete existing; // in case it wasn't NULL 114 continue; 115 } 116 } 117 118 // Clean up the socket hash table (and contents): 119 Groupsock* oldGS; 120 while ((oldGS = (Groupsock*)socketHashTable->RemoveNext()) != NULL) { 121 delete oldGS; 122 } 123 delete socketHashTable; 124 125 if (!success) break; // a fatal error occurred trying to create the RTP and RTCP sockets; we can't continue 126 } 127 128 // Try to use a big receive buffer for RTP - at least 0.1 second of 129 // specified bandwidth and at least 50 KB 130 unsigned rtpBufSize = fBandwidth * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes 131 if (rtpBufSize < 50 * 1024) 132 rtpBufSize = 50 * 1024; 133 increaseReceiveBufferTo(env(), fRTPSocket->socketNum(), rtpBufSize); 134 135 if (isSSM() && fRTCPSocket != NULL) { 136 // Special case for RTCP SSM: Send RTCP packets back to the source via unicast: 137 fRTCPSocket->changeDestinationParameters(fSourceFilterAddr,0,~0); 138 } 139 // create the FramedSource object used to request data 140 // Create "fRTPSource" and "fReadSource": 141 if (!createSourceObjects(useSpecialRTPoffset)) break; 142 143 if (fReadSource == NULL) { 144 env().setResultMsg("Failed to create read source"); 145 break; 146 } 147 // create the RTCPInstance object 148 // Finally, create our RTCP instance. (It starts running automatically) 149 if (fRTPSource != NULL && fRTCPSocket != NULL) { 150 // If bandwidth is specified, use it and add 5% for RTCP overhead. 151 // Otherwise make a guess at 500 kbps. 152 unsigned totSessionBandwidth 153 = fBandwidth ? fBandwidth + fBandwidth / 20 : 500; 154 fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket, 155 totSessionBandwidth, 156 (unsigned char const*) 157 fParent.CNAME(), 158 NULL /* we're a client */, 159 fRTPSource); 160 if (fRTCPInstance == NULL) { 161 env().setResultMsg("Failed to create RTCP instance"); 162 break; 163 } 164 } 165 166 return True; 167 } while (0); 168 169 deInitiate(); 170 fClientPortNum = 0; 171 return False; 172 }
在MediaSubsession::initiate函數中,首先創建了兩個客戶端socket分別用於接收RTP數據和RTCP數據;然后創建FramedSource對象用來從服務器端請求數據,FramedSource對象在createSourceObjects函數中被創建,createSourceObjects根據ServerMediaSubsession資源的不同格式創建不同的FramedSource,我們還是以H264視頻為例,則創建的是H264VideoRTPSource對象;最后還創建了RTCPInstance對象。
接下來,我們繼續看客戶端收到PLAY命令回復后調用continueAfterPLAY函數:
// Handler for the response to the RTSP "PLAY" command: on success, optionally schedules a timer for the stream's expected duration, then logs that playback started.
1 void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) { 2 Boolean success = False; 3 4 do { 5 UsageEnvironment& env = rtspClient->envir(); // alias 6 StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias 7 8 if (resultCode != 0) { 9 env << *rtspClient << "Failed to start playing session: " << resultString << "\n"; 10 break; 11 } 12 13 // Set a timer to be handled at the end of the stream's expected duration (if the stream does not already signal its end 14 // using a RTCP "BYE"). This is optional. If, instead, you want to keep the stream active - e.g., so you can later 15 // 'seek' back within it and do another RTSP "PLAY" - then you can omit this code. 16 // (Alternatively, if you don't want to receive the entire stream, you could set this timer for some shorter value.) 17 if (scs.duration > 0) { 18 unsigned const delaySlop = 2; // number of seconds extra to delay, after the stream's expected duration. (This is optional.) 19 scs.duration += delaySlop; 20 unsigned uSecsToDelay = (unsigned)(scs.duration*1000000); 21 scs.streamTimerTask = env.taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)streamTimerHandler, rtspClient); 22 } 23 24 env << *rtspClient << "Started playing session"; 25 if (scs.duration > 0) { 26 env << " (for up to " << scs.duration << " seconds)"; 27 } 28 env << "...\n"; 29 30 success = True; 31 } while (0); 32 delete[] resultString; 33 34 if (!success) { 35 // An unrecoverable error occurred with this stream. 36 shutdownStream(rtspClient); 37 } 38 }
continueAfterPLAY函數的內容很簡單:如果流有預期的播放時長,就先設置一個在該時長結束後觸發的定時任務,然後打印出“Started playing session”。在服務器端收到PLAY命令后,就開始向客戶端發送RTP數據包和RTCP數據包,而客戶端在MediaSink::startPlaying函數中就開始等待接收來自服務器端的視頻數據。
在continueAfterSETUP函數中創建的MediaSink是DummySink對象,DummySink是MediaSink的子類,這個例子中客戶端沒有利用收到的視頻數據,所以叫做DummySink。
客戶端調用MediaSink::startPlaying函數開始接收服務器端的數據,這個函數和之前介紹服務器端建立RTSP連接過程時是同一個函數。
1 Boolean MediaSink::startPlaying(MediaSource& source, 2 afterPlayingFunc* afterFunc, 3 void* afterClientData) { 4 // Make sure we're not already being played: 5 if (fSource != NULL) { 6 envir().setResultMsg("This sink is already being played"); 7 return False; 8 } 9 10 // Make sure our source is compatible: 11 if (!sourceIsCompatibleWithUs(source)) { 12 envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!"); 13 return False; 14 } 15 fSource = (FramedSource*)&source; // here fSource is the H264VideoRTPSource object created earlier 16 17 fAfterFunc = afterFunc; 18 fAfterClientData = afterClientData; 19 return continuePlaying(); 20 }
在MediaSink::startPlaying函數中又調用了DummySink::continuePlaying函數。
// DummySink's continuation step: keeps the receive loop going by asking fSource for the next frame.
1 Boolean DummySink::continuePlaying() { 2 if (fSource == NULL) return False; // sanity check (should not happen) 3 4 // Request the next frame of data from our input source. "afterGettingFrame()" will get called later, when it arrives: 5 fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE, 6 afterGettingFrame, this, 7 onSourceClosure, this); 8 return True; 9 }
在DummySink::continuePlaying函數中通過H264VideoRTPSource對象請求服務器端的數據,H264VideoRTPSource是MultiFramedRTPSource的子類,請求成功后回調DummySink::afterGettingFrame函數。在FramedSource::getNextFrame函數中,調用了MultiFramedRTPSource::doGetNextFrame函數:
1 void MultiFramedRTPSource::doGetNextFrame() { 2 if (!fAreDoingNetworkReads) { 3 // Turn on background read handling of incoming packets: 4 fAreDoingNetworkReads = True; 5 TaskScheduler::BackgroundHandlerProc* handler 6 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler; 7 fRTPInterface.startNetworkReading(handler); // read network data via the RTPInterface object; on the server side, the RTPInterface object is used to send network data
// once data has been read, networkReadHandler is called back to process it
8 } 9 10 fSavedTo = fTo; // the data that is read is delivered into fTo 11 fSavedMaxSize = fMaxSize; 12 fFrameSize = 0; // for now 13 fNeedDelivery = True; 14 doGetNextFrame1(); 15 } 16 17 void MultiFramedRTPSource::doGetNextFrame1() { 18 while (fNeedDelivery) { 19 // If we already have packet data available, then deliver it now. 20 Boolean packetLossPrecededThis; 21 BufferedPacket* nextPacket 22 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis); 23 if (nextPacket == NULL) break; 24 25 fNeedDelivery = False; 26 27 if (nextPacket->useCount() == 0) { 28 // Before using the packet, check whether it has a special header 29 // that needs to be processed: 30 unsigned specialHeaderSize; 31 if (!processSpecialHeader(nextPacket, specialHeaderSize)) { 32 // Something's wrong with the header; reject the packet: 33 fReorderingBuffer->releaseUsedPacket(nextPacket); 34 fNeedDelivery = True; 35 break; 36 } 37 nextPacket->skip(specialHeaderSize); 38 } 39 40 // Check whether we're part of a multi-packet frame, and whether 41 // there was packet loss that would render this packet unusable: 42 if (fCurrentPacketBeginsFrame) { 43 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) { 44 // We didn't get all of the previous frame. 45 // Forget any data that we used from it: 46 fTo = fSavedTo; fMaxSize = fSavedMaxSize; 47 fFrameSize = 0; 48 } 49 fPacketLossInFragmentedFrame = False; 50 } else if (packetLossPrecededThis) { 51 // We're in a multi-packet frame, with preceding packet loss 52 fPacketLossInFragmentedFrame = True; 53 } 54 if (fPacketLossInFragmentedFrame) { 55 // This packet is unusable; reject it: 56 fReorderingBuffer->releaseUsedPacket(nextPacket); 57 fNeedDelivery = True; 58 break; 59 } 60 61 // The packet is usable. Deliver all or part of it to our caller: 62 unsigned frameSize; 63 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes, 64 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp, 65 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP, 66 fCurPacketMarkerBit); 67 fFrameSize += frameSize; 68 69 if (!nextPacket->hasUsableData()) { 70 // We're completely done with this packet now 71 fReorderingBuffer->releaseUsedPacket(nextPacket); 72 } 73 74 if (fCurrentPacketCompletesFrame) { // a complete frame of data has been read successfully 75 // We have all the data that the client wants. 76 if (fNumTruncatedBytes > 0) { 77 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size (" 78 << fSavedMaxSize << "). " 79 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n"; 80 } 81 // Call our own 'after getting' function, so that the downstream object can consume the data: 82 if (fReorderingBuffer->isEmpty()) { 83 // Common case optimization: There are no more queued incoming packets, so this code will not get 84 // executed again without having first returned to the event loop. Call our 'after getting' function 85 // directly, because there's no risk of a long chain of recursion (and thus stack overflow): 86 afterGetting(this); 87 } else { 88 // Special case: Call our 'after getting' function via the event loop. 89 nextTask() = envir().taskScheduler().scheduleDelayedTask(0, 90 (TaskFunc*)FramedSource::afterGetting, this); 91 } 92 } else { 93 // This packet contained fragmented data, and does not complete 94 // the data that the client wants. Keep getting data: 95 fTo += frameSize; fMaxSize -= frameSize; 96 fNeedDelivery = True; 97 } 98 } 99 }
在doGetNextFrame1函數中,若成功讀取到一個完整的幀,則調用FramedSource::afterGetting函數,進一步回調了DummySink::afterGettingFrame函數。
// Per-frame callback: the static function forwards to the member function, which (optionally) prints information about the received frame and then requests the next one.
1 void DummySink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, 2 struct timeval presentationTime, unsigned durationInMicroseconds) { 3 DummySink* sink = (DummySink*)clientData; 4 sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); 5 } 6 7 // If you don't want to see debugging output for each received frame, then comment out the following line: 8 #define DEBUG_PRINT_EACH_RECEIVED_FRAME 1 9 10 void DummySink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, 11 struct timeval presentationTime, unsigned /*durationInMicroseconds*/) { 12 // We've just received a frame of data. (Optionally) print out information about it: 13 #ifdef DEBUG_PRINT_EACH_RECEIVED_FRAME 14 if (fStreamId != NULL) envir() << "Stream \"" << fStreamId << "\"; "; 15 envir() << fSubsession.mediumName() << "/" << fSubsession.codecName() << ":\tReceived " << frameSize << " bytes"; 16 if (numTruncatedBytes > 0) envir() << " (with " << numTruncatedBytes << " bytes truncated)"; 17 char uSecsStr[6+1]; // used to output the 'microseconds' part of the presentation time 18 sprintf(uSecsStr, "%06u", (unsigned)presentationTime.tv_usec); 19 envir() << ".\tPresentation time: " << (int)presentationTime.tv_sec << "." << uSecsStr; 20 if (fSubsession.rtpSource() != NULL && !fSubsession.rtpSource()->hasBeenSynchronizedUsingRTCP()) { 21 envir() << "!"; // mark the debugging output to indicate that this presentation time is not RTCP-synchronized 22 } 23 #ifdef DEBUG_PRINT_NPT 24 envir() << "\tNPT: " << fSubsession.getNormalPlayTime(presentationTime); 25 #endif 26 envir() << "\n"; 27 #endif 28 29 // Then continue, to request the next frame of data: 30 continuePlaying(); 31 }
在DummySink::afterGettingFrame函數中只是簡單地打印出了某個MediaSubsession接收到了多少字節的數據,然后接着利用FramedSource去讀取數據。可以看出,在RTSP客戶端,Live555也是在MediaSink和FramedSource之間形成了一個循環,不停地從服務器端讀取數據。