Live555 Analysis (Part 3): The Client


The live555 client flow is: create the task scheduler, create the usage environment, process the user-supplied parameter (the RTSP URL), create an RTSPClient instance, send DESCRIBE, send SETUP, send PLAY, enter the event loop to receive data, and finally send TEARDOWN to close the session.

This can be abstracted into three interface functions: rtspOpen, rtspRead, and rtspClose.
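
  A minimal sketch of that wrapper interface, assuming the rtsp_object_t handle used throughout the snippets below (this is wrapper code around live555, not live555's own API):

/* Hypothetical wrapper interface around the live555 client classes.
 * rtsp_object_t and these three functions are the wrapper's own API,
 * inferred from the snippets below; they are not part of live555. */
typedef struct rtsp_object_t rtsp_object_t;

/* Create scheduler + environment, then DESCRIBE / SETUP / PLAY. */
int rtspOpen(rtsp_object_t *p_obj, int tcpConnect);

/* Run one pass of the event loop and deliver received frames. */
int rtspRead(rtsp_object_t *p_obj);

/* Send TEARDOWN and release everything created by rtspOpen(). */
int rtspClose(rtsp_object_t *p_obj);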

Let's start with the rtspOpen path.

int rtspOpen(rtsp_object_t *p_obj, int tcpConnect)
{
    ... ...
    TRACE1_DEC("BasicTaskScheduler::createNew !!!\n");
    if( ( p_sys->scheduler = BasicTaskScheduler::createNew() ) == NULL )
    {
        TRACE1_DEC("BasicTaskScheduler::createNew failed\n");
        goto error;
    }
    if( !( p_sys->env = BasicUsageEnvironment::createNew(*p_sys->scheduler) ) )
    {
        TRACE1_DEC("BasicUsageEnvironment::createNew failed\n");
        goto error;
    }
    if( ( i_return = Connect( p_obj ) ) != RTSP_SUCCESS )
    {
        TRACE1_DEC( "Failed to connect with %s\n", p_obj->rtspURL );
        goto error;
    }
    if( p_sys->p_sdp == NULL )
    {
        TRACE1_DEC( "Failed to retrieve the RTSP Session Description\n" );
        goto error;
    }
    if( ( i_return = SessionsSetup( p_obj ) ) != RTSP_SUCCESS )
    {
        TRACE1_DEC( "Nothing to play for rtsp://%s\n", p_obj->rtspURL );
        goto error;
    }
    if( ( i_return = Play( p_obj ) ) != RTSP_SUCCESS )
        goto error;
    ... ...
}

1> BasicTaskScheduler::createNew()

2> BasicUsageEnvironment::createNew()

3> Connect

static int Connect( rtsp_object_t *p_demux )
{
    ... ...
    sprintf( appName, "LibRTSP%d", p_demux->id );
    if( ( p_sys->rtsp = RTSPClient::createNew( *p_sys->env, 1, appName, i_http_port ) ) == NULL )
    {
        TRACE1_DEC( "RTSPClient::createNew failed (%s)\n", p_sys->env->getResultMsg() );
        i_ret = RTSP_ERROR;
        goto connect_error;
    }
    psz_options = p_sys->rtsp->sendOptionsCmd( p_demux->rtspURL, psz_user, psz_pwd );
    if( psz_options == NULL )
        TRACE1_DEC( "RTSP OPTIONS command error!!\n" );
    delete [] psz_options;
    p_sdp = p_sys->rtsp->describeURL( p_demux->rtspURL );
    ... ...
}

  Connect() does three things: it creates the RTSPClient instance, sends the OPTIONS request, and sends the DESCRIBE request (describeURL).

  sendOptionsCmd() first calls openConnectionFromURL() to establish the TCP connection, then builds and sends the request:

 

OPTIONS rtsp://120.90.0.50:8552/h264_ch2 RTSP/1.0
CSeq: 493
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

 

  The server's reply:

RTSP/1.0 200 OK
CSeq: 493
Date: Mon, May 26 2014 13:27:07 GMT
Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE

  describeURL() likewise calls openConnectionFromURL() to establish the TCP connection first (which is why either the OPTIONS request or the DESCRIBE request may be sent first), then builds and sends:

DESCRIBE rtsp://120.90.0.50:8552/h264_ch2 RTSP/1.0
CSeq: 494
Accept: application/sdp
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

  The server's reply:

Received DESCRIBE response: 
RTSP/1.0 200 OK
CSeq: 494
Date: Mon, May 26 2014 13:27:07 GMT
Content-Base: rtsp://192.168.103.51:8552/h264_ch2/
Content-Type: application/sdp
Content-Length: 509

Need to read 509 extra bytes
Read 509 extra bytes: v=0
o=- 1401092685794152 1 IN IP4 192.168.103.51
s=RTSP/RTP stream from NETRA
i=h264_ch2
t=0 0
a=tool:LIVE555 Streaming Media v2008.04.02
a=type:broadcast
a=control:*
a=range:npt=0-
a=x-qt-text-nam:RTSP/RTP stream from NETRA
a=x-qt-text-inf:h264_ch2
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=000042;sprop-parameter-sets=h264
a=control:track1
m=audio 0 RTP/AVP 96
c=IN IP4 0.0.0.0
a=rtpmap:96 PCMU/48000/2
a=control:track2

4> SessionsSetup

static int SessionsSetup( rtsp_object_t *p_demux )
{
     ... ... 
        //    unsigned const thresh             = 1000000;
        if( !( p_sys->ms = MediaSession::createNew( *p_sys->env, p_sys->p_sdp ) ) )
        {
                TRACE1_DEC( "Could not create the RTSP Session: %s\n", p_sys->env->getResultMsg() );
                return RTSP_ERROR;
        }    

        /* Initialise each media subsession */
        iter = new MediaSubsessionIterator( *p_sys->ms );
        while( ( sub = iter->next() ) != NULL )
        {
               ... ...
                bInit = sub->initiate();

                if( !bInit )
                {
                        TRACE1_DEC( "RTP subsession '%s/%s' failed (%s)\n",
                        sub->mediumName(), sub->codecName(), p_sys->env->getResultMsg() );
                }
                else
                {
              ... ...
                        /* Issue the SETUP */
                        if( p_sys->rtsp )
                        {
                                if( !p_sys->rtsp->setupMediaSubsession( *sub, False, b_rtsp_tcp, False ) )
                                {
                                        /* if we get an unsupported transport error, toggle TCP
                                        * use and try again */
                                        if( !strstr(p_sys->env->getResultMsg(),"461 Unsupported Transport")
                                                || !p_sys->rtsp->setupMediaSubsession( *sub, False, !b_rtsp_tcp, False ) )
                                        {
                                                TRACE1_DEC( "SETUP of '%s/%s' failed %s\n", sub->mediumName(), sub->codecName(), p_sys->env->getResultMsg() );
                                                continue;
                                        }
                                }
                        }

               ... ...
                        /* Value taken from mplayer */
                        if( !strcmp( sub->mediumName(), "audio" ) )
                        {
                                if( !strcmp( sub->codecName(), "MP4A-LATM" ) )
                                {
                                       ... ...
                                }
                                else if( !strcmp( sub->codecName(), "PCMA" )  || !strcmp( sub->codecName(), "PCMU" ))
                                {
                                        tk->fmt.i_extra = 0;
                                        tk->fmt.i_codec = RTSP_CODEC_PCMU;
                                }
                        } 
                        else if( !strcmp( sub->mediumName(), "video" ) )
                        {
                                if( !strcmp( sub->codecName(), "H264" ) )
                                {
                                       ... ...
                                }
                                else if( !strcmp( sub->codecName(), "MP4V-ES" ) )
                                {
                                        ... ...
                                }                
                                else if( !strcmp( sub->codecName(), "JPEG" ) )
                                {
                                        tk->fmt.i_codec = RTSP_CODEC_MJPG;
                                }                
                        }  
               ... ...         
                }
        }
     ... ...
}

  This function does four things: it creates the MediaSession instance, creates the MediaSubsessionIterator instance, initializes each MediaSubsession, and sends the SETUP request.

  When the MediaSession instance is created, initializeWithSDP() is called to parse the SDP: fSessionName from the "s=" line, fSessionDescription from the "i=" line, connectionEndpointName from the "c=" line, fMediaSessionType from the "a=type:" line, and so on. It also creates a MediaSubsession instance for each "m=" block and appends it to the fSubsessionsHead list. From the SDP above there are two MediaSubsessions: one video and one audio. A rough sketch of this line dispatch follows.
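
  As an illustration of what that parsing amounts to, here is a minimal, hypothetical dispatch on SDP line prefixes (not live555's actual implementation; field names mirror the description above):

#include <string>
#include <vector>

// Minimal sketch of SDP session-level parsing, in the spirit of
// MediaSession::initializeWithSDP(). Illustrative only.
struct SdpSession {
    std::string sessionName;              // from "s="
    std::string sessionDescription;       // from "i="
    std::string connectionEndpoint;       // from "c="
    std::string mediaSessionType;         // from "a=type:"
    std::vector<std::string> mediaBlocks; // one entry per "m=" line
};

static void parseSdpLine(SdpSession& s, const std::string& line) {
    if      (line.compare(0, 2, "s=") == 0) s.sessionName        = line.substr(2);
    else if (line.compare(0, 2, "i=") == 0) s.sessionDescription = line.substr(2);
    else if (line.compare(0, 2, "c=") == 0) s.connectionEndpoint = line.substr(2);
    else if (line.compare(0, 7, "a=type:") == 0) s.mediaSessionType = line.substr(7);
    else if (line.compare(0, 2, "m=") == 0) s.mediaBlocks.push_back(line.substr(2));
}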

  When the MediaSubsessionIterator instance is created, its reset() method assigns fOurSession.fSubsessionsHead to fNextPtr, i.e. fNextPtr points at the head of the subsession list. The while loop above therefore runs twice, once for the video subsession and once for the audio one.
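
  The iteration itself is just the standard live555 idiom:

// Walking the subsessions of a MediaSession (live555 API usage sketch).
// 'session' is the MediaSession* created from the SDP above.
MediaSubsessionIterator iter(*session);   // reset(): fNextPtr = head of list
MediaSubsession* sub;
while ((sub = iter.next()) != NULL) {     // next(): return fNextPtr, advance it
    // For the SDP above this yields the "video" subsession, then "audio".
}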

  initiate() first checks fSourceFilterAddr to decide whether the stream is SSM or ASM, and calls the corresponding Groupsock constructor to create fRTPSocket and fRTCPSocket (see the sketch below). It then inspects the protocol name fProtocolName (from the SDP "m=" line) to determine whether the transport is plain UDP or RTP; we only follow the RTP case here. For RTP it selects the fRTPSource subclass according to the codec name fCodecName (from the SDP "a=rtpmap:" line); in our case the H264 and PCMU RTPSource instances are created. Finally it creates the RTCPInstance fRTCPInstance.
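
  The SSM/ASM decision comes down to which Groupsock constructor is invoked; roughly (a sketch of the logic inside initiate(), with the surrounding variables elided):

// Sketch of the SSM vs. ASM choice inside MediaSubsession::initiate().
// Both Groupsock constructors shown here exist in live555.
if (fSourceFilterAddr.s_addr != 0) {
    // SSM: bind to the (source, group) pair, so only packets from the
    // announced source address are accepted.
    fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtpPort);
} else {
    // ASM (or plain unicast): bind to the group address and port only.
    fRTPSocket = new Groupsock(env(), tempAddr, rtpPort, 255 /*ttl*/);
}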

  setupMediaSubsession() mainly sends the SETUP request. From the SDP we know RTP is the transport protocol. The streamUsingTCP parameter passed down from rtspOpen chooses whether RTP is carried over UDP or over TCP. TCP transport implies unicast; with UDP, connectionEndpointName together with the forceMulticastOnUnspecified parameter determines multicast versus unicast. Our server only supports unicast and False is passed in, so unicast is used here. The SETUP request is then built and sent:

SETUP rtsp://192.168.103.51:8552/h264_ch2/track1 RTSP/1.0
CSeq: 495
Transport: RTP/AVP;unicast;client_port=33482-33483
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

   The server's reply:

RTSP/1.0 200 OK
CSeq: 495
Date: Mon, May 26 2014 13:27:07 GMT
Transport: RTP/AVP;unicast;destination=14.214.248.17;source=192.168.103.51;client_port=33482-33483;server_port=6970-6971
Session: 151

  Finally, if TCP transport is used, setStreamSocket() → RTPInterface::setStreamSocket() → addStreamSocket() adds the RTSP socket fInputSocketNum to the fTCPStreams list. With UDP transport, if the multicast address is empty the server address is stored in fDests; otherwise the client joins the multicast group.

        ... ...
        if (streamUsingTCP) {
            // Tell the subsession to receive RTP (and send/receive RTCP)
            // over the RTSP stream:
            if (subsession.rtpSource() != NULL)
                subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId);
            if (subsession.rtcpInstance() != NULL)
                subsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);
        } else {
            // Normal case.
            // Set the RTP and RTCP sockets' destination address and port
            // from the information in the SETUP response:
            subsession.setDestinations(fServerAddress);
        }
        ... ...

5> Play

static int Play( rtsp_object_t *p_demux )
{
    ... ...
    if( p_sys->rtsp )
    {    
        /* The PLAY */
        if( !p_sys->rtsp->playMediaSession( *p_sys->ms, p_sys->i_npt_start, -1, 1 ) )
        {
            TRACE1_DEC( "RTSP PLAY failed %s\n", p_sys->env->getResultMsg() );
            return RTSP_ERROR;
        }        
    }
    ... ...
    return RTSP_SUCCESS;
}

  playMediaSession() simply sends the PLAY request:

PLAY rtsp://120.90.0.50:8552/h264_ch2/ RTSP/1.0
CSeq: 497
Session: 151
Range: npt=0.000-
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

 The server's reply:

RTSP/1.0 200 OK
CSeq: 497
Date: Mon, May 26 2014 13:27:07 GMT
Range: npt=0.000-
Session: 151
RTP-Info: url=rtsp://192.168.103.51:8552/h264_ch2/track1;seq=63842;rtptime=1242931431,url=rtsp://192.168.103.51:8552/h264_ch2/track2;seq=432;rtptime=3179210581
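
  The RTP-Info header tells the client, per track, the first RTP sequence number (seq) and the RTP timestamp (rtptime) that corresponds to the npt start point in the Range header. A hypothetical parser for one entry could look like this (illustrative only, not live555 code):

#include <cstdio>

// Hypothetical parser for one RTP-Info entry such as
//   url=rtsp://192.168.103.51:8552/h264_ch2/track1;seq=63842;rtptime=1242931431
// seq is the first RTP sequence number of the track; rtptime is the RTP
// timestamp corresponding to the announced npt start.
static int parseRtpInfoEntry(const char* entry,
                             char url[256], unsigned* seq, unsigned* rtptime)
{
    return sscanf(entry, "url=%255[^;];seq=%u;rtptime=%u",
                  url, seq, rtptime) == 3 ? 0 : -1;
}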

Next, let's look at the rtspRead path:

int rtspRead(rtsp_object_t *p_obj)
{ 
      ... ...
        if(p_sys != NULL)
        {
                /* First warn we want to read data */
                p_sys->event = 0;    
                for( i = 0; i < p_sys->i_track; i++ )
                {
                        live_track_t *tk = p_sys->track[i];
                        if( tk->waiting == 0 )
                        {
                                tk->waiting = 1;
                                tk->sub->readSource()->getNextFrame( tk->p_buffer, tk->i_buffer,
                                        StreamRead, tk, StreamClose, tk );
                        }        
                }               

                /* Create a task that will be called if we wait more than 300ms */
                task = p_sys->scheduler->scheduleDelayedTask( 300000, TaskInterrupt, p_obj );        

                /* Do the read */
                p_sys->scheduler->doEventLoop( &p_sys->event );

                /* remove the task */
                p_sys->scheduler->unscheduleDelayedTask( task );    

                ret = p_sys->b_error ? RTSP_ERROR : RTSP_SUCCESS;
        }

        return ret;
}
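
  The 300 ms timeout works because doEventLoop(&p_sys->event) returns as soon as the event flag becomes non-zero; TaskInterrupt() presumably just sets that flag. A sketch, assuming the rtsp_object_t layout used above:

// Sketch of the watchdog callback scheduled above. If no frame arrives
// within 300 ms, it fires and sets the event flag, which makes
// doEventLoop(&p_sys->event) return. (Assumed implementation.)
static void TaskInterrupt(void* p_private)
{
    rtsp_object_t* p_obj = (rtsp_object_t*)p_private;

    // Any non-zero value stops the event loop:
    p_obj->p_sys->event = 0xff;
}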

  To follow this function we first need to know where fReadSource, the value returned by readSource(), gets assigned. That happens in the initiate() function we saw earlier:

      
        ... ...
        } else if (strcmp(fCodecName, "H264") == 0) {
            fReadSource = fRTPSource
                = H264VideoRTPSource::createNew(env(), fRTPSocket,
                                                fRTPPayloadFormat,
                                                fRTPTimestampFrequency);
        } else if (strcmp(fCodecName, "JPEG") == 0) { // motion JPEG
            ... ...
        } else if (strcmp(fCodecName, "PCMU") == 0    // PCM u-law audio
                   || strcmp(fCodecName, "GSM") == 0  // GSM audio
                   || strcmp(fCodecName, "PCMA") == 0 // PCM a-law audio
                   || strcmp(fCodecName, "L16") == 0  // 16-bit linear audio
                   || strcmp(fCodecName, "MP1S") == 0 // MPEG-1 System Stream
                   || strcmp(fCodecName, "MP2P") == 0 // MPEG-2 Program Stream
                   || strcmp(fCodecName, "L8") == 0   // 8-bit linear audio
                   || strcmp(fCodecName, "G726-16") == 0 // G.726, 16 kbps
                   || strcmp(fCodecName, "G726-24") == 0 // G.726, 24 kbps
                   || strcmp(fCodecName, "G726-32") == 0 // G.726, 32 kbps
                   || strcmp(fCodecName, "G726-40") == 0 // G.726, 40 kbps
                   || strcmp(fCodecName, "SPEEX") == 0   // SPEEX audio
                   ) {
            createSimpleRTPSource = True;
            useSpecialRTPoffset = 0;
        } else if (useSpecialRTPoffset >= 0) {
            ... ...
        }

        if (createSimpleRTPSource) {
            char* mimeType = new char[strlen(mediumName()) + strlen(codecName()) + 2];
            sprintf(mimeType, "%s/%s", mediumName(), codecName());
            fReadSource = fRTPSource
                = SimpleRTPSource::createNew(env(), fRTPSocket, fRTPPayloadFormat,
                                             fRTPTimestampFrequency, mimeType,
                                             (unsigned)useSpecialRTPoffset,
                                             doNormalMBitRule);
            delete[] mimeType;
        }
    }

    For H.264, getNextFrame() is the one defined in FramedSource::getNextFrame():

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                afterGettingFunc* afterGettingFunc,
                void* afterGettingClientData,
                onCloseFunc* onCloseFunc,
                void* onCloseClientData) 
{
    // Make sure we're not already being read:
    if (fIsCurrentlyAwaitingData) {
        envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
        exit(1);
    }

    fTo = to;
    fMaxSize = maxSize;
    fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
    fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
    fAfterGettingFunc = afterGettingFunc;
    fAfterGettingClientData = afterGettingClientData;
    fOnCloseFunc = onCloseFunc;
    fOnCloseClientData = onCloseClientData;
    fIsCurrentlyAwaitingData = True;

    doGetNextFrame();
}

  doGetNextFrame() is defined in MultiFramedRTPSource::doGetNextFrame():

void MultiFramedRTPSource::doGetNextFrame() 
{
    if (!fAreDoingNetworkReads) {
        // Turn on background read handling of incoming packets:
        fAreDoingNetworkReads = True;
        TaskScheduler::BackgroundHandlerProc* handler
            = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
        fRTPInterface.startNetworkReading(handler);
    }

    fSavedTo = fTo;
    fSavedMaxSize = fMaxSize;
    fFrameSize = 0; // for now
    fNeedDelivery = True;
    
    doGetNextFrame1();
}

  doGetNextFrame1() is defined in MultiFramedRTPSource::doGetNextFrame1():

void MultiFramedRTPSource::doGetNextFrame1() 
{
    while (fNeedDelivery) {
        // If we already have packet data available, then deliver it now.
        Boolean packetLossPrecededThis;
        BufferedPacket* nextPacket = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
        if (nextPacket == NULL) break;

        fNeedDelivery = False;

        if (nextPacket->useCount() == 0) {
            // Before using the packet, check whether it has a special header
            // that needs to be processed:
            unsigned specialHeaderSize;
            if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
                // Something's wrong with the header; reject the packet:
                fReorderingBuffer->releaseUsedPacket(nextPacket);
                fNeedDelivery = True;
                break;
            }
            nextPacket->skip(specialHeaderSize);
        }

        // Check whether we're part of a multi-packet frame, and whether
        // there was packet loss that would render this packet unusable:
        if (fCurrentPacketBeginsFrame) {
            if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
                // We didn't get all of the previous frame.
                // Forget any data that we used from it:
                fTo = fSavedTo; fMaxSize = fSavedMaxSize;
                fFrameSize = 0;
            }
            fPacketLossInFragmentedFrame = False;
        } else if (packetLossPrecededThis) {
            // We're in a multi-packet frame, with preceding packet loss
            fPacketLossInFragmentedFrame = True;
        }
        if (fPacketLossInFragmentedFrame) {
            // This packet is unusable; reject it:
            fReorderingBuffer->releaseUsedPacket(nextPacket);
            fNeedDelivery = True;
            break;
        }

        // The packet is usable. Deliver all or part of it to our caller:
        unsigned frameSize;
        nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
                        fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
                        fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
                        fCurPacketMarkerBit);
        fFrameSize += frameSize;

        if (!nextPacket->hasUsableData()) {
            // We're completely done with this packet now
            fReorderingBuffer->releaseUsedPacket(nextPacket);
        }

        if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) {
            // We have all the data that the client wants.
            if (fNumTruncatedBytes > 0) {
                envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
                       << fSavedMaxSize << ").  "<< fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
            }
            // Call our own 'after getting' function, so that the downstream object can consume the data:
            if (fReorderingBuffer->isEmpty()) {
                // Common case optimization: There are no more queued incoming packets, so this code will not get
                // executed again without having first returned to the event loop.  Call our 'after getting' function
                // directly, because there's no risk of a long chain of recursion (and thus stack overflow):
                afterGetting(this);
            } else {
                // Special case: Call our 'after getting' function via the event loop.
                nextTask() = envir().taskScheduler().scheduleDelayedTask(0,  (TaskFunc*)FramedSource::afterGetting, this);
            }
        } else {
            // This packet contained fragmented data, and does not complete
            // the data that the client wants.  Keep getting data:
            fTo += frameSize; fMaxSize -= frameSize;
            fNeedDelivery = True;
        }
    }
}

   Finally, FramedSource::afterGetting(FramedSource* source):

void FramedSource::afterGetting(FramedSource* source) 
{
    source->fIsCurrentlyAwaitingData = False;
    // indicates that we can be read again
    // Note that this needs to be done here, in case the "fAfterFunc"
    // called below tries to read another frame (which it usually will)

    if (source->fAfterGettingFunc != NULL) {
        (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize, 
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
    }
}

  The fAfterGettingFunc pointer is set in FramedSource::getNextFrame() to the afterGettingFunc argument, and that argument is the StreamRead() callback that rtspRead() passed to getNextFrame(). That is how one complete frame reaches the application; see the sketch below.
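
  The StreamRead() callback must match the FramedSource::afterGettingFunc signature; its body below is an assumed sketch based on how rtspRead() uses tk->waiting and the event flag:

// Sketch of the StreamRead callback handed to getNextFrame() in rtspRead().
// The signature matches FramedSource::afterGettingFunc; the body is assumed.
static void StreamRead(void* p_private, unsigned i_size,
                       unsigned i_truncated_bytes,
                       struct timeval pts,
                       unsigned /*durationInMicroseconds*/)
{
    live_track_t* tk = (live_track_t*)p_private;

    // tk->p_buffer now holds one complete frame of i_size bytes;
    // hand it to the consumer here (decoder queue, file writer, ...).

    tk->waiting = 0;  // let rtspRead() re-arm getNextFrame() on this track

    // Setting the event flag makes doEventLoop(&p_sys->event) return,
    // so rtspRead() completes; exact member access is assumed, e.g.:
    // tk->p_sys->event = 0xff;
}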

     In MultiFramedRTPSource::doGetNextFrame() we came across fRTPInterface.startNetworkReading(handler). What exactly does this function do?

void RTPInterface::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) 
{
    // Normal case: Arrange to read UDP packets:
    envir().taskScheduler().turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);

    // Also, receive RTP over TCP, on each of our TCP connections:
    fReadHandlerProc = handlerProc;
    for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {
        // Get a socket descriptor for "streams->fStreamSocketNum":
        SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
        if (socketDescriptor == NULL) {
            socketDescriptor = new SocketDescriptor(envir(), streams->fStreamSocketNum);
            socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum), socketDescriptor);
        }

        // Tell it about our subChannel:
        socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
    }
}

  This function does two things: it registers the UDP socket read handler MultiFramedRTPSource::networkReadHandler() with the task scheduler, and it registers the TCP read handler SocketDescriptor::tcpReadHandler() for every RTP-over-TCP stream. In both cases the data ultimately ends up in MultiFramedRTPSource::networkReadHandler(), which drives the frame-assembly logic above.
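
  For the TCP case, the RTSP connection carries interleaved frames as defined in RFC 2326 §10.12: a '$' byte, a one-byte channel id, a two-byte big-endian length, then the payload. SocketDescriptor::tcpReadHandler() parses this framing incrementally across reads; a simplified one-shot sketch (illustrative, not live555's code):

#include <stdint.h>
#include <stddef.h>

// Minimal sketch of RFC 2326 §10.12 interleaved-frame header parsing.
// Returns 0 and fills channelId/payloadLen if buf starts with a complete
// 4-byte frame header, -1 otherwise. The payload follows at buf + 4 and
// is dispatched to the RTPInterface registered for that channel id.
static int parseInterleavedHeader(const uint8_t* buf, size_t len,
                                  uint8_t* channelId, uint16_t* payloadLen)
{
    if (len < 4 || buf[0] != '$') return -1;
    *channelId  = buf[1];                             // RTP or RTCP channel
    *payloadLen = (uint16_t)((buf[2] << 8) | buf[3]); // big-endian length
    return 0;
}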

