Live555 分析(一):類介紹


  從程序的結構來看,live項目包括了四個基本庫、程序入口類(在mediaServer中)和一些測試代碼(在testProgs中)。

  四個基本靜態庫是UsageEnvironmentBasicUsageEnvironmentgroupsockliveMedia

UsageEnvironment:

  包括抽象類UsageEnvironment和抽象類TaskScheduler,這兩個類用於事件調度,其中包括實現了對事件的異步讀取、對事件句柄的設置及對錯誤信息的輸出等;該庫中還有一個HashTable,這是一個通用的HashTable,在整個項目中都可以使用它,當然該HashTable也是一個抽象類。

BasicUsageEnvironment:

  主要是對UsageEnvironment中對應類的實現。 

  BasicUsageEnvironment和UsageEnvironment中的類都是用於整個系統的基礎功能類.比如UsageEnvironment代表了整個系統運行的環境,它提供了錯誤記錄和錯誤報告的功能,無論哪一個類要輸出錯誤,就需要保存UsageEnvironment的指針.而TaskScheduler則提供了任務調度功能.整個程序的運行發動機就是它,它調度任務,執行任務(任務就是一個函數).TaskScheduler由於在全局中只有一個,所以保存在了UsageEnvironment中.而所有的類又都保存了UsageEnvironment的指針,所以誰想把自己的任務加入調度中,那是很容易的.在此還看到一個結論:整個live555(服務端)只有一個線程.

  BasicUsageEnvironment和UsageEnvironment中的類初始化:

  // Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  env = BasicUsageEnvironment::createNew(*scheduler); 

  在服務端中的main()函數,可見其創建一個RTSPServer類實例后,和在客服端的main函數連接成功,開始播放后,即進入相同的一個函數:

  env->taskScheduler().doEventLoop(); // does not return

  在函數定義在BasicTaskScheduler0.cpp中:

void BasicTaskScheduler0::doEventLoop(char* watchVariable) {
    // Repeatedly loop, handling readble sockets and timed events:
    while (1) {
        if (watchVariable != NULL && *watchVariable != 0)
            break;
        SingleStep();
    }
}

  BasicTaskScheduler0從TaskScheduler派生,所以還是一個任務調度對象,所以依然說明任務調度對象是整個程序的發動機。循環中每次走一步:SingleStep():

void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) 
{
        fd_set readSet = fReadSet; // make a copy for this select() call
        
        //計算select socket們時的超時時間。
        DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();
        struct timeval tv_timeToDelay;
        
        tv_timeToDelay.tv_sec = timeToDelay.seconds();
        tv_timeToDelay.tv_usec = timeToDelay.useconds();
        
        // Very large "tv_sec" values cause select() to fail.
        // Don't make it any larger than 1 million seconds (11.5 days)
        const long MAX_TV_SEC = MILLION;
        if (tv_timeToDelay.tv_sec > MAX_TV_SEC) 
        {
                tv_timeToDelay.tv_sec = MAX_TV_SEC;
        }
        
        // Also check our "maxDelayTime" parameter (if it's > 0):
        if (maxDelayTime > 0
                && (tv_timeToDelay.tv_sec > (long)maxDelayTime/MILLION 
                || (tv_timeToDelay.tv_sec == (long)maxDelayTime/MILLION 
                && tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) 
        {
                tv_timeToDelay.tv_sec = maxDelayTime/MILLION;
                tv_timeToDelay.tv_usec = maxDelayTime%MILLION;
        }

        //先執行socket的select操作,以確定哪些socket任務(handler)需要執行。
        int selectResult = select(fMaxNumSockets, &readSet, NULL, NULL, &tv_timeToDelay);
        
        if (selectResult < 0) 
        {
#if defined(__WIN32__) || defined(_WIN32)
                int err = WSAGetLastError();
                // For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if
                // it was called with no entries set in "readSet".  If this happens, ignore it:
                if (err == WSAEINVAL && readSet.fd_count == 0) 
                {
                        err = 0;
                        // To stop this from happening again, create a dummy readable socket:
                        int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0);
                        FD_SET((unsigned)dummySocketNum, &fReadSet);
                }
                if (err != 0) 
                {
#else
                        if (errno != EINTR && errno != EAGAIN) 
                        {
#endif
                                // Unexpected error - treat this as fatal:
#if !defined(_WIN32_WCE)
                                perror("BasicTaskScheduler::SingleStep(): select() fails");
#endif
                                exit(0);
                        }
                }
          
          // Handle any delayed event that may have come due:
          //執行一個最迫切的延遲任務。
          fDelayQueue.handleAlarm();
          
          // Call the handler function for one readable socket:
          HandlerIterator iter(*fReadHandlers);
          HandlerDescriptor* handler;
          
          // To ensure forward progress through the handlers, begin past the last
          // socket number that we handled:
        if (fLastHandledSocketNum >= 0) 
        {
            //找到上次執行的socket handler的下一個
                while ((handler = iter.next()) != NULL) 
                {
                        if (handler->socketNum == fLastHandledSocketNum) break;
                }
                if (handler == NULL) 
                {
                        fLastHandledSocketNum = -1;
                        iter.reset(); // start from the beginning instead
                }
        }
        
         //從找到的handler開始,找一個可以執行的handler,不論其狀態是可讀,可寫,還是出錯,執行之。
        while ((handler = iter.next()) != NULL) 
        {
                if (FD_ISSET(handler->socketNum, &readSet) &&
                        FD_ISSET(handler->socketNum, &fReadSet) /* sanity check */ &&
                        handler->handlerProc != NULL) 
                {
                        fLastHandledSocketNum = handler->socketNum;
                        // Note: we set "fLastHandledSocketNum" before calling the handler,
                        // in case the handler calls "doEventLoop()" reentrantly.
                        (*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
                        break;
                }
        }
        
        //如果尋找完了依然沒有執行任何handle,則從頭再找。
        if (handler == NULL && fLastHandledSocketNum >= 0) 
        {
                // We didn't call a handler, but we didn't get to check all of them,
                // so try again from the beginning:
                iter.reset();
                while ((handler = iter.next()) != NULL) 
                {
                        if (FD_ISSET(handler->socketNum, &readSet) &&
                                FD_ISSET(handler->socketNum, &fReadSet) /* sanity check */ &&
                                handler->handlerProc != NULL) 
                        {
                                fLastHandledSocketNum = handler->socketNum;
                                // Note: we set "fLastHandledSocketNum" before calling the handler,
                                // in case the handler calls "doEventLoop()" reentrantly.
                                (*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
                                break;
                        }
                }
                
                //依然沒有找到可執行的handler。
                if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler
        }
}

總結為以下四步:
  1> 為所有需要操作的socket執行select。
  2> 找到第一個應執行的延遲任務並執行之
  3> 找出第一個應執行的socket任務(handler)並執行之

  可見,每一步中只執行三個任務隊列中的一項。到這里,我們不盡要問這些socket handlerdelay task是哪里來的?

  DelayQueue類:譯為"延遲隊列",它是一個隊列,每一項代表了一個要調度的任務(在它的fToken變量中保存)。同時保存了這個任務離執行時間點的剩余時間。可以預見,它就是在TaskScheduler中用於管理調度任務的東西。注意:此隊列中的任務只被執行一次!執行完后這一項即被無情拋棄!

  HandlerSet類:Handler集合。Handler是什么呢?它是一種專門用於執行socket操作的任務(函數),HandlerSet被TaskScheduler用來管理所有的socket任務(增刪改查)。所以TaskScheduler中現在已調度兩種任務了:socket任務(handlerSet)和延遲任務(DelayQueue).其實TaskScheduler還調度第三種任務:Event,介個后面再說。

  HandlerDescriptor類:socket handlet描述。

  socket handler加入執行隊列后會一直存在,而delay task在執行完一次后會立即棄掉。

  socket handler保存在隊列BasicTaskScheduler0::HandlerSet* fHandlers中,通過調用envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this)這樣的方式加入到socket handler隊列中:

void BasicTaskScheduler::turnOnBackgroundReadHandling(int socketNum,
                BackgroundHandlerProc* handlerProc,
                void* clientData) 
{
    if (socketNum < 0) return;
    
    FD_SET((unsigned)socketNum, &fReadSet);
    
    fReadHandlers->assignHandler(socketNum, handlerProc, clientData);

    if (socketNum+1 > fMaxNumSockets) {
        fMaxNumSockets = socketNum+1;
    }
}

  socket handler添加時為什么需要那些參數呢?socketNum是需要的,因為要select socket(socketNum即是socket()返回的那個socket對象)。conditionSet也是需要的,它用於表明socket在select時查看哪種裝態,是可讀?可寫?還是出錯?proc和clientData這兩個參數就不必說了(真有不明白的嗎?)。再看BackgroundHandlerProc的參數,socketNum不必解釋,mask是什么呢?它正是對應着conditionSet,但它表明的是select之后的結果,比如一個socket可能需要檢查其讀/寫狀態,而當前只能讀,不能寫,那么mask中就只有表明讀的位被設置。

void HandlerSet::assignHandler(int socketNum,
        TaskScheduler::BackgroundHandlerProc* handlerProc,
        void* clientData) 
{
    // First, see if there's already a handler for this socket:
    HandlerDescriptor* handler;
    HandlerIterator iter(*this);
    
    while ((handler = iter.next()) != NULL) {
        if (handler->socketNum == socketNum) break;
    }
    
    if (handler == NULL) { // No existing handler, so create a new descr:
        handler = new HandlerDescriptor(fHandlers.fNextHandler);
        handler->socketNum = socketNum;
    }

    handler->handlerProc = handlerProc;
    handler->clientData = clientData;
}

  delay task保存在隊列BasicTaskScheduler0::DelayQueue fDelayQueue中,通過調用envir().taskScheduler().scheduleDelayedTask(0,  (TaskFunc*)FramedSource::afterGetting, this)這樣的方式加入到delay task隊列中,需要傳入task延遲等待的微秒(百萬分之一秒)數(第一個參數):

TaskToken BasicTaskScheduler0::scheduleDelayedTask(int64_t microseconds,TaskFunc* proc, void* clientData) 
{
  if (microseconds < 0)
    microseconds = 0;
  
//DelayInterval 是表示時間差的結構   DelayInterval timeToDelay((long) (microseconds / 1000000),(long) (microseconds % 1000000));   //創建delayQueue中的一項   AlarmHandler* alarmHandler = new AlarmHandler(proc, clientData,timeToDelay);   //加入DelayQueue   fDelayQueue.addEntry(alarmHandler);
  
//返回delay task的唯一標志   return (void*) (alarmHandler->token()); }

  delay task的執行都在函數fDelayQueue.handleAlarm()中,handleAlarm()在類DelayQueue中實現。看一下handleAlarm():

void DelayQueue::handleAlarm() 
{
  //如果第一個任務的執行時間未到,則同步一下(重新計算各任務的等待時間)。
  if (head()->fDeltaTimeRemaining != DELAY_ZERO)
    synchronize();
  
//如果第一個任務的執行時間到了,則執行第一個,並把它從隊列中刪掉。   if (head()->fDeltaTimeRemaining == DELAY_ZERO) {     // This event is due to be handled:     DelayQueueEntry* toRemove = head();     removeEntry(toRemove); // do this first, in case handler accesses queue     //執行任務,執行完后會把這一項銷毀。     toRemove->handleTimeout();   } }

  可能感覺奇怪,其它的任務隊列都是先搜索第一個應該執行的項,然后再執行,這里干脆,直接執行第一個完事。那就說明第一個就是最應該執行的一個吧?也就是等待時間最短的一個吧?那么應該在添加任務時,將新任務跟據其等待時間插入到適當的位置而不是追加到尾巴上吧?猜得對不對還得看fDelayQueue.addEntry(alarmHandler)這個函數是怎么執行的。

void DelayQueue::addEntry(DelayQueueEntry* newEntry) 
{
  //重新計算各項的等待時間
  synchronize();

  //取得第一項
  DelayQueueEntry* cur = head();
  //從頭至尾循環中將新項與各項的等待時間進行比較
  while (newEntry->fDeltaTimeRemaining >= cur->fDeltaTimeRemaining) {
    //如果新項等待時間長於當前項的等待時間,則減掉當前項的等待時間。
    //也就是后面的等待時幾只是與前面項等待時間的差,這樣省掉了記錄插入時的時間的變量。
    newEntry->fDeltaTimeRemaining -= cur->fDeltaTimeRemaining;
    //下一項
    cur = cur->fNext;
  }

  //循環完畢,cur就是找到的應插它前面的項,那就插它前面吧
  cur->fDeltaTimeRemaining -= newEntry->fDeltaTimeRemaining;

  // Add "newEntry" to the queue, just before "cur":
  newEntry->fNext = cur;
  newEntry->fPrev = cur->fPrev;
  cur->fPrev = newEntry->fPrev->fNext = newEntry;
}

groupsock:

  groupsock庫中包括了GroupEId、Groupsock、GroupsockHelper、NetAddress、NetInterface等類。

  這個是放在單獨的庫Groupsock中。它封裝了socket操作,支持udp多播放支持和一對多單播的功能,tcp socket創建。Groupsock類有兩個構造函數:

  一個是ASM(即任意源組播模型):

// Constructor for a source-independent multicast group
Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
                            Port port, u_int8_t ttl)
                            :OutputSocket(env, port),//創建udp socket
                            deleteIfNoMembers(False), isSlave(False),
                             fIncomingGroupEId(groupAddr, port.num(), ttl), 
                             fDests(NULL), fTTL(ttl) 
{
    addDestination(groupAddr, port);//記錄組播地址

    if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {//如果非組播地址,則不會進入里面,組播地址,則加入組播
        if (DebugLevel >= 1) {
            env << *this << ": failed to join group: " << env.getResultMsg() << "\n";
        }
    }

    // Make sure we can get our source address:
    if (ourSourceAddressForMulticast(env) == 0) {//獲取本機的地址
        if (DebugLevel >= 0) { // this is a fatal error
            env << "Unable to determine our source address: " << env.getResultMsg() << "\n";
        }
    }

    if (DebugLevel >= 2) env << *this << ": created\n";
}

  另一個是SSM(指定信源組播模型):

// Constructor for a source-specific multicast group
Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
                            struct in_addr const& sourceFilterAddr,Port port)
                            : OutputSocket(env, port),//創建udp socket
                            deleteIfNoMembers(False), isSlave(False),
                            fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
                            fDests(NULL), fTTL(255) 
{
    addDestination(groupAddr, port);//記錄組播地址

    // First try a SSM join.  If that fails, try a regular join:
    if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr, sourceFilterAddr.s_addr)) {//如果非組播地址,則退出
        if (DebugLevel >= 3) {
            env << *this << ": SSM join failed: " << env.getResultMsg();
            env << " - trying regular join instead\n";
        }
        
        if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {//如果是組播地址則加入
            if (DebugLevel >= 1) {
                env << *this << ": failed to join group: " << env.getResultMsg() << "\n";
            }
        }
    }

    if (DebugLevel >= 2) env << *this << ": created\n";
}

  根據上面兩個構造函數,我們發現只是加入組播,真正的創建組播在哪里呢?就是在構造函數的初始化列表中的OutputSocket,這個類最終調用了Socket的setupDatagramSocket函數:

int setupDatagramSocket(UsageEnvironment& env, Port port,
#ifdef IP_MULTICAST_LOOP
            Boolean setLoopback
#else
            Boolean
#endif
    ) 
{
    if (!initializeWinsockIfNecessary()) {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    int newSocket = socket(AF_INET, SOCK_DGRAM, 0); //創建udp socket
    if (newSocket < 0) {
        socketErr(env, "unable to create datagram socket: ");
        return newSocket;
    }

    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//設置socket 可重用
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

#if defined(__WIN32__) || defined(_WIN32)
    // Windoze doesn't properly handle SO_REUSEPORT or IP_MULTICAST_LOOP
#else
#ifdef SO_REUSEPORT
    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//端口復用
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif

#ifdef IP_MULTICAST_LOOP
    const u_int8_t loop = (u_int8_t)setLoopback;
    if (setsockopt(newSocket, IPPROTO_IP, IP_MULTICAST_LOOP, (const char*)&loop, sizeof loop) < 0) {//數據送到本地回環接口
        socketErr(env, "setsockopt(IP_MULTICAST_LOOP) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif
#endif

    // Note: Windoze requires binding, even if the port number is 0
    netAddressBits addr = INADDR_ANY;
#if defined(__WIN32__) || defined(_WIN32)
#else
    if (port.num() != 0 || ReceivingInterfaceAddr != INADDR_ANY) {
#endif
        if (port.num() == 0) addr = ReceivingInterfaceAddr;
        
        MAKE_SOCKADDR_IN(name, addr, port.num());
        
        if (bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0) {//綁定socket
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error (port number: %d): ",
            ntohs(port.num()));
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
#if defined(__WIN32__) || defined(_WIN32)
#else
    }
#endif

    // Set the sending interface for multicasts, if it's not the default:
    if (SendingInterfaceAddr != INADDR_ANY) {
        struct in_addr addr;
        addr.s_addr = SendingInterfaceAddr;

        if (setsockopt(newSocket, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&addr, sizeof addr) < 0) {//設置組播的默認網絡接口
            socketErr(env, "error setting outgoing multicast interface: ");
            closeSocket(newSocket);
            return -1;
        }
    }

    return newSocket;
}

  而GroupsockHelper.cpp不但定義了setupDatagramSocket函數,還定義了udp Socket的讀(readSocket)寫(writeSocket)函數。

     上面講的udp的組播和單播,下面分析一下tcp的單播。

  首先是tcp socket創建:

int setupStreamSocket(UsageEnvironment& env,
                      Port port, Boolean makeNonBlocking) 
{
    if (!initializeWinsockIfNecessary()) {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    int newSocket = socket(AF_INET, SOCK_STREAM, 0);//創建tcp socket
    if (newSocket < 0) {
        socketErr(env, "unable to create stream socket: ");
        return newSocket;
    }

    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof reuseFlag) < 0) { //設置socket 復用
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

    // SO_REUSEPORT doesn't really make sense for TCP sockets, so we
    // normally don't set them.  However, if you really want to do this
    // #define REUSE_FOR_TCP
#ifdef REUSE_FOR_TCP
#if defined(__WIN32__) || defined(_WIN32)
    // Windoze doesn't properly handle SO_REUSEPORT
#else
#ifdef SO_REUSEPORT
    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//設置端口復用
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif
#endif
#endif

    // Note: Windoze requires binding, even if the port number is 0
#if defined(__WIN32__) || defined(_WIN32)
#else
    if (port.num() != 0 || ReceivingInterfaceAddr != INADDR_ANY) {
#endif
        MAKE_SOCKADDR_IN(name, ReceivingInterfaceAddr, port.num());
        if (bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0) {//綁定socket
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error (port number: %d): ", ntohs(port.num()));
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
#if defined(__WIN32__) || defined(_WIN32)
#else
    }
#endif

    if (makeNonBlocking) {
        if (!makeSocketNonBlocking(newSocket)) {
            socketErr(env, "failed to make non-blocking: ");
            closeSocket(newSocket);
            return -1;
        }
    }

    return newSocket;
}

  那程序是怎么判斷用組播還是單播,是tcp還是udp呢?

  在RTSP的setupMediaSubsession()函數中的“SETUP”會向服務端確定是否支持單播或者組播,當收到服務器的應該后,進行下面的處理:

       ... ...   

    if (streamUsingTCP) {//如果客服端將streamUsingTcp設置為1,則表明rtp和rtcp進行tcp傳輸;如果為0,則是udp傳輸。tcp傳輸只能是單播,udp傳輸則可能是單播也可能是組播,服務端的設置,如果服務端支持組播,那只能是組播,反之則是單播。
            // Tell the subsession to receive RTP (and send/receive RTCP)
            // over the RTSP stream:
            if (subsession.rtpSource() != NULL)//rtp連接
                subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId);//rtpSource()函數獲取的fRTPSource在initiate函數根據sdp描述的編碼類型創建的
            if (subsession.rtcpInstance() != NULL)//rtcp連接
                subsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);//rtcpInstance()函數獲取的fRTCPInstance在initiate函數中創建的
        } else {//udp傳輸
            // Normal case.
            // Set the RTP and RTCP sockets' destination address and port
            // from the information in the SETUP response: 
            subsession.setDestinations(fServerAddress); //它管理着一個本地socket和多個目的地址,因為是UDP,所以只需知道對方地址和端口即可發送數據。所以我們從服務器獲取了新的rtp和rtcp端口加入到目標地址中。
        }
... ...

  setStreamSocket函數最終調用的是RTPInterface::setStreamSocket,將rtp和rtcp的socket保存到streams->fStreamSocketNum(注意:tcp單播:服務端的socket是綁定的,每次客服端請求便會創建一個clientsocket, 而客服端rtsp rtp rtcp協議用就是用的這個socket;udp單播,live555沒有采用connect的方式,直接setDestinations函數加入到fDests鏈表中,發送時候的目標地址在此鏈表中),當rtp和rtcp發送數據的時候調用的RTPInterface::sendPacket函數將會用到:

void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) 
{
    // Normal case: Send as a UDP packet:
    fGS->output(envir(), fGS->ttl(), packet, packetSize);//udp傳輸

    // Also, send over each of our TCP sockets:
    for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {//tcp傳輸
        sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId);
    }
}

  setDestinations()函數定義在MediaSubsession.cpp中:

void MediaSubsession::setDestinations(unsigned defaultDestAddress) 
{
    // Get the destination address from the connection endpoint name
    // (This will be 0 if it's not known, in which case we use the default)
    unsigned destAddress = connectionEndpointAddress();
    if (destAddress == 0) destAddress = defaultDestAddress;
    struct in_addr destAddr; destAddr.s_addr = destAddress;

    // The destination TTL remains unchanged:
    int destTTL = ~0; // means: don't change

    if (fRTPSocket != NULL) {
        Port destPort(serverPortNum);//這里serverPortNum是在服務端應答中獲取rtp的端口號
        fRTPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//這里的fRTPSocket是在前面的initiate函數中創建的
    }
    if (fRTCPSocket != NULL && !isSSM()) {
        // Note: For SSM sessions, the dest address for RTCP was already set.
        Port destPort(serverPortNum+1);//這個是rtcp的端口號
        fRTCPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//這里的fRTCPSocket在前面的initiate函數創建的
    }
}
changeDestinationParameters()定義在Groupsock.cpp中:
//改變目的地址的參數
//newDestAddr是新的目的地址
//newDestPort是新的目的端口
//newDestTTL是新的TTL
void Groupsock::changeDestinationParameters(
        struct in_addr const& newDestAddr,
        Port newDestPort,
        int newDestTTL)
{
    if (fDests == NULL)
        return;

    //獲取第一個目的地址(此處不是很明白:fDest是一個單向鏈表,每次添加一個目的地址,
    //都會把它插入到最前目,難道這個函數僅改變最后一個添加的目的地址?)
    struct in_addr destAddr = fDests->fGroupEId.groupAddress();
    if (newDestAddr.s_addr != 0) {
        if (newDestAddr.s_addr != destAddr.s_addr
                && IsMulticastAddress(newDestAddr.s_addr))
        {
            //如果目的地址是一個多播地址,則離開老的多播組,加入新的多播組。
            socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
            socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
        }
        destAddr.s_addr = newDestAddr.s_addr;
    }

    portNumBits destPortNum = fDests->fGroupEId.portNum();
    if (newDestPort.num() != 0) {
        if (newDestPort.num() != destPortNum &&
                IsMulticastAddress(destAddr.s_addr))
        {
            //如果端口也不一樣,則先更改本身socket的端口
            //(其實是關掉原先的socket的,再以新端口打開一個socket)。
            changePort(newDestPort);
            //然后把新的socket加入到新的多播組。
            // And rejoin the multicast group:
            socketJoinGroup(env(), socketNum(), destAddr.s_addr);
        }
        destPortNum = newDestPort.num();
        fDests->fPort = newDestPort;
    }

    u_int8_t destTTL = ttl();
    if (newDestTTL != ~0)
        destTTL = (u_int8_t) newDestTTL;

    //目標地址的所有信息都在fGroupEId中,所以改變成員fGroupEId。
    fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL);
    
    //(看起來這個函數好像只用於改變多播時的地址參數,
    //以上分析是否合理,肯請高人指點)
}

liveMedia:

  是很重要的一個庫,其不僅包含了實現RTSP Server的類,還包含了針對不同流媒體類型(如TS流、PS流等)編碼的類。在該庫中,基類是Medium,層次關系非常清晰。在該庫中,有幾個很重要的類,如RTSPServer、ServerMediaSession、RTPSink、RTPInterface、FramedSource等。  

  從上面這個主要的類結構可以看出,liveMedia庫中的基類為Medium,其下又有幾個非常重要的部分,一個是×××Subsession,除Medium父類外,所有的×××Subsession類都繼承於ServerMediaSubsession;一個是×××Source,MediaSource的frameSource下主要包含FramedFileSource、RTPSource、FramedFilter等幾個主要的部分;一個是MediaSink,以繼承於RTPSink的類居多。

  此外,還包含了用於處理packet的BufferedPacketFactory和BufferedPacket及其相關子類,用於處理流分析的StreamParser及其子類。

公用類:

  RTPInterface類:發送rtp rtcp數據包;startNetworkReading函數注冊tcp讀取rtp rtcp數據的回調函數,注冊udp讀取rtp rtcp數據的回調函數。

  RTCPInstance類:rtcp協議的實現,創建RTPInterface類實例fRTCPInterface。

服務端實現需要以下幾個基類:  

  RTSPServer:創建rtsp分tcp socket,注冊任務incomingConnectionHandler,在此任務函數里,accept接收客服端連接,創建RTSPClientSession類;

  RTSPClientSession類:注冊任務incomingRequestHandler,在此任務函數里,readSocket讀取客服端發送的信息,並且解析出cmdName,收到"OPTIONS"命令則通過handleCmd_OPTIONS函數應答,收到"DESCRIBE"則通過handleCmd_DESCRIBE函數應答,收到 "SETUP"命令則通過handleCmd_SETUP函數應答,收到 "TEARDOWN"、 "PLAY"、"PAUSE"、"GET_PARAMETER"通過handleCmd_withinSession函數應答,如果都不是則通過handleCmd_notSupported函數應答。  

    ServerMediaSubsession類:多媒體流可能包含幾個子流,每個流都帶有fTrackId,比如視頻流track1,音頻流track2

  ServerMediaSession類:一個多媒體流增加addSubsession子流,產生generateSDPDescription SDP描述。

      FramedSource類:提供虛函數doGetNextFrame函數去獲取信息,此函數具體實現在派生類中,比如我的是在OpenFileSource類實現。

  RTPSink類:服務端數據是從 FramedSource流到 RTPSink,並且創建RTPInterface類實例fRTPInterface。

單播:

  StreamState類:startPlaying函數開始播放,endPlaying函數結束播放,pause函數暫停播放。

  Destinations類:udp連接,保存isTCP為false,目標地址,rtp的端口,rtcp的端口;tcp連接,保存isTcp為TRUE,tcp socket值,rtp rtcp通道ID。

  OnDemandServerMediaSubsession類:單播的時候,通過getStreamParameters函數創建udp socket,創建FramedSource, 創建RTPSink,創建StreamState,創建Destinations,當收到“PLAY”的命令時調用startStream開始播放,當收到“PAUSE”時調用pauseStream函數暫停播放,當收到“TERADOWN”時停止播放。 

組播:

  PassiveServerMediaSubsession類:getStreamParameters函數改變rtp和rtcp的目標地址。

客服端實現需要以下幾個基類:

  RTPSource:客戶端數據則是從 RTPSource 流到 XXXFileSink, 並且創建RTPInterface類實例fRTPInterface。

  MediaSubsession類:initiate函數創建rtp rtcp socket,並且根據服務端傳來的編碼類型fCodecName來創建相應的RTPSource,創建RTCPInstance;解析SDP的各種屬性;setDestinations函數設置服務端地址。

  RTSPClient: 創建tcp socket,連接rtsp服務端,sendOptionsCmd函數發送“OPTIONS”命令; describeURL函數則是發送“DESCRIBE”命令;announceSDPDescription函數則是發送“ANNOUNCE”命令;setupMediaSubsession函數則是發送“SETUP”命令,並且設置socket的目標地址;playMediaSession函數則是發送“PLAY”命令;pauseMediaSession函數則是發送“PAUSE”命令;等等。主要是RTSP的通信處理。

  基本上,整個liveMedia庫的主要類結構就是這樣。

  在http://www.live555.com上的相關文檔中提到穿透防火牆的問題,方法是開啟一個HTTP的tunnel,然后我們可以在liveMedia庫中找到一個RTSPOverHTTPServer的類,該類解決了這樣的問題。

mediaServer:

   Live555MediaServer提供了main函數,DynamicRTSPServer繼承了RTSPServer並重寫了虛函數lookupServerMediaSession。用不到。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM