After more than two months of hard work, live555 is finally running multi-threaded and has passed stable stress testing


live555 has been evolving for well over a decade, and one has to admire the author's persistent dedication and the open-source ecosystem abroad. live555 is the entry-level choice for most people working in video surveillance, and it is widely used, especially on embedded devices and Linux systems, mainly because of its compatibility and stability.

However, throughout live555's many years of iteration, developers have repeatedly asked the author, Ross, for multi-threading and IPv6 support, and he has never attempted either, probably because they would significantly disrupt live555's architecture. To stay safe, he chose small changes, stability, and gradual iteration. The result is stable performance, but the number of streams it can serve is limited, and the inability to run multi-threaded has always been a hurdle. The few blog posts on live555 multi-threading that can be found online are largely the same: create independent UsageEnvironment and TaskScheduler instances and let separate threads divide the work among themselves. That is my approach as well: create several worker threads, create a UsageEnvironment and a TaskScheduler inside each one, and then let each run its own event loop, as sketched below.
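A minimal sketch of that per-thread pattern, using the stock live555 API and pthreads (the bookkeeping around the workers is simplified for illustration):

#include "BasicUsageEnvironment.hh"
#include <pthread.h>

// One worker's live555 state; nothing here is shared with any other thread.
struct Worker {
  TaskScheduler* scheduler;
  UsageEnvironment* env;
  char volatile stopFlag;   // watch variable that makes doEventLoop() return
  pthread_t tid;
};

// Thread body: build this thread's own scheduler and environment, then loop.
static void* workerProc(void* arg) {
  Worker* w = (Worker*)arg;
  w->scheduler = BasicTaskScheduler::createNew();
  w->env = BasicUsageEnvironment::createNew(*w->scheduler);
  w->stopFlag = 0;
  w->env->taskScheduler().doEventLoop(&w->stopFlag); // runs until stopFlag != 0
  return NULL;
}

// Launching the workers (error handling omitted):
//   for (int i = 0; i < kNumWorkers; ++i)
//     pthread_create(&workers[i].tid, NULL, workerProc, &workers[i]);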

Today's post is just a starting point: it outlines the main ideas, and follow-up posts will try to pull together these ideas and the development process more completely.

Goal

Modify live555 so that it is multi-threaded: each channel gets its own worker thread, and that worker thread handles the channel independently.

Main modification points

Supporting multi-threading mainly involves modifying the following classes:

  • GenericMediaServer
  • RTSPServer

GenericMediaServer.cpp
In the GenericMediaServer constructor, create the worker threads; their count is the maximum number of channels the server supports.

GenericMediaServer
::GenericMediaServer(UsageEnvironment& env, int ourSocketV4, int ourSocketV6, Port ourPort,
             unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket4(ourSocketV4), fServerSocket6(ourSocketV6), 
    fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
  ignoreSigPipeOnSocket(fServerSocket4); // so that clients on the same host that are killed don't also kill us
  ignoreSigPipeOnSocket(fServerSocket6); // so that clients on the same host that are killed don't also kill us


#ifdef LIVE_MULTI_THREAD_ENABLE
  InitMutex(&mutexClientConnection);

  // Create one scheduler/environment pair and one worker thread per channel:
  memset(&multiThreadCore, 0x00, sizeof(MultiThread_CORE_T));
  multiThreadCore.threadNum = MAX_DEFAULT_MULTI_THREAD_NUM;
  multiThreadCore.threadTask = new LIVE_THREAD_TASK_T[multiThreadCore.threadNum];
  memset(&multiThreadCore.threadTask[0], 0x00, sizeof(LIVE_THREAD_TASK_T) * multiThreadCore.threadNum);
  for (int i = 0; i < multiThreadCore.threadNum; i++) {
      char szName[36] = {0};
      sprintf(szName, "worker thread %d", i);
      multiThreadCore.threadTask[i].id = i;
      multiThreadCore.threadTask[i].extPtr = this;
      multiThreadCore.threadTask[i].pSubScheduler = BasicTaskScheduler::createNew();
      multiThreadCore.threadTask[i].pSubEnv = BasicUsageEnvironment::createNew(*multiThreadCore.threadTask[i].pSubScheduler, i+1, szName);

      CreateOSThread(&multiThreadCore.threadTask[i].osThread, __OSThread_Proc, (void*)&multiThreadCore.threadTask[i]);
  }
#endif

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket4, incomingConnectionHandler4, this);
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket6, incomingConnectionHandler6, this);
} 
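The CreateOSThread() wrapper and the thread entry point __OSThread_Proc are part of the author's own glue code and are not shown here. Presumably the entry point only has to run the event loop of the environment created for its slot; a minimal sketch, assuming LIVE_THREAD_TASK_T also carries a hypothetical stopFlag member used as the watch variable:

// Sketch only: the real __OSThread_Proc is not shown in this post.
static void* __OSThread_Proc(void* pParam) {
  LIVE_THREAD_TASK_T* pTask = (LIVE_THREAD_TASK_T*)pParam;
  // Service this channel's sockets, triggered events and delayed tasks until
  // the server sets the (hypothetical) stopFlag at shutdown:
  pTask->pSubEnv->taskScheduler().doEventLoop(&pTask->stopFlag);
  return NULL;
}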

Accepting client connections

Client connections are accepted exactly as in the original flow.

Dispatching client requests

After a DESCRIBE command arrives from a client, find an idle channel in the channel list, associate the client with that channel, remove the socket from the main thread, and let the worker thread take over all further operations on that socket.
If a later client requests a channel that already exists, the main thread dispatches the request directly to that channel's worker thread.

Note: the main thread's work ends here; do not call lookupServerMediaSession from the main thread.

In the worker thread, as soon as it has taken over the client's socket, it calls lookupServerMediaSession. Inside that function, the URL suffix is passed back to the upper-layer application via a callback. The application checks whether the channel exists: if not, it returns failure; if it does, it pulls the stream from the front-end device, fills in the media information, and returns success. The library then creates the corresponding media session and replies to the client, and the rest of the RTSP exchange proceeds as usual.

Note: when creating the media session, the worker thread's UsageEnvironment must be passed in; do not use the main thread's envir().
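To make that note concrete, here is a minimal sketch of how the upper layer's callback might build the session against the worker's environment; the H.264 file subsession and the "ch0" names are placeholders, not the author's actual code:

#include "liveMedia.hh"

// Minimal sketch, assuming the callback is handed the worker thread's
// UsageEnvironment; "ch0" and "ch0.264" are placeholder names.
static ServerMediaSession* createChannelSession(UsageEnvironment& workerEnv) {
  // Use the worker's environment here.  Passing the server's main envir()
  // instead would tie all of this session's tasks to the main thread's scheduler.
  ServerMediaSession* sms = ServerMediaSession::createNew(
      workerEnv, "ch0", "ch0", "channel served by a worker thread");
  sms->addSubsession(
      H264VideoFileServerMediaSubsession::createNew(workerEnv, "ch0.264", False));
  return sms;
}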

int RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(UsageEnvironment *pEnv, char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr, LIVE_THREAD_TASK_T **pThreadTask) 
{
  int handleCmdRet = 0;

  ServerMediaSession* session = NULL;
  char* sdpDescription = NULL;
  char* rtspURL = NULL;
  do {
    char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];
        // enough space for urlPreSuffix/urlSuffix'\0'
    urlTotalSuffix[0] = '\0';
    if (urlPreSuffix[0] != '\0') {
      strcat(urlTotalSuffix, urlPreSuffix);
      strcat(urlTotalSuffix, "/");
    }
    strcat(urlTotalSuffix, urlSuffix);

    if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;

    // We should really check that the request contains an "Accept:" #####
    // for "application/sdp", because that's what we're sending back #####

    _TRACE(TRACE_LOG_DEBUG, "handleCmd_DESCRIBE socket[%d]\n", this->fOurSocket);

    #ifdef LIVE_MULTI_THREAD_ENABLE

    // If we are still in the main thread, run the channel-allocation flow:
    if (pEnv->GetEnvirId() == 1000)
    {
        fOurServer.LockClientConnection();  //Lock

        UsageEnvironment  *pChEnv = fOurServer.GetEnvBySuffix(urlSuffix, this, pThreadTask);
        if (NULL == pChEnv)
        {
            fOurServer.UnlockClientConnection();        //Unlock

            handleCmdRet = -1;

            this->assignSink = False;
            this->pEnv = NULL;
            handleCmd_notFound();
            break;
        }
        else
        {
            _TRACE(TRACE_LOG_DEBUG, "Associating socket[%d] with [%s]\n", this->fOurSocket, pChEnv->GetEnvirName());

            // Move the socket from the main thread to the worker thread:
            UsageEnvironment *pMainEnv = &envir();
            envir().taskScheduler().disableBackgroundHandling(fOurSocket);

            fOurServer.UnlockClientConnection();        //Unlock

            return 1000;        // tell the caller that the worker thread now owns this socket
        }

        break;
    }

    #endif


    // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix":

    // lookupServerMediaSession is executed in the worker thread:
    session = fOurServer.lookupServerMediaSession(pEnv, 1, this, urlTotalSuffix);
    if (session == NULL) {

        _TRACE(TRACE_LOG_DEBUG, "socket[%d] in [%s]: source not ready [%s]\n", this->fOurSocket, pEnv->GetEnvirName(), urlTotalSuffix);

        this->assignSink = False;
        this->pEnv = NULL;

        handleCmdRet = -1;

        handleCmd_notFound();

        break;
    }

    session->incrementReferenceCount();

    // Then, assemble a SDP description for this session:
    sdpDescription = session->generateSDPDescription(fOurIPVer);
    if (sdpDescription == NULL) {
      // This usually means that a file name that was specified for a
      // "ServerMediaSubsession" does not exist.
      setRTSPResponse("404 File Not Found, Or In Incorrect Format");

      break;
    }

    unsigned sdpDescriptionSize = strlen(sdpDescription);

    // Also, generate our RTSP URL, for the "Content-Base:" header
    // (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).
    rtspURL = fOurRTSPServer.rtspURL(session, fOurIPVer, fClientInputSocket);

    snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
         "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
         "%s"
         "Content-Base: %s/\r\n"
         "Content-Type: application/sdp\r\n"
         "Content-Length: %d\r\n\r\n"
         "%s",
         fCurrentCSeq,
         dateHeader(),
         rtspURL,
         sdpDescriptionSize,
         sdpDescription);
  } while (0);

  if (session != NULL) {
    // Decrement its reference count, now that we're done using it:
    session->decrementReferenceCount();
    if (session->referenceCount() == 0 && session->deleteWhenUnreferenced()) {
      fOurServer.removeServerMediaSession(pEnv, session, True);
    }

    session->SetStreamStatus(1);        // Set the flag so later clients requesting this channel get a fast response
  }

  delete[] sdpDescription;
  delete[] rtspURL;

  return handleCmdRet;
}
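The excerpt above only disables background handling on the main thread's scheduler and returns 1000, passing the chosen worker slot back through pThreadTask; the caller's side of the hand-off is not shown. Below is a hedged sketch of one way to finish it with TaskScheduler::triggerEvent(), which live555 documents as safe to call from another thread. The Handoff struct, takeoverHandler(), and the trigger-id bookkeeping are illustrative names, not the author's code:

#include "UsageEnvironment.hh"

// Illustrative hand-off record; in the real code this information already
// lives in LIVE_THREAD_TASK_T and RTSPClientConnection:
struct Handoff {
  UsageEnvironment* workerEnv;                        // the worker's pSubEnv
  int clientSocket;                                   // the connection's socket
  TaskScheduler::BackgroundHandlerProc* readHandler;  // e.g. the connection's request handler
  void* handlerData;                                  // the RTSPClientConnection*
};

// Runs inside the worker's event loop once the trigger fires:
static void takeoverHandler(void* clientData) {
  Handoff* h = (Handoff*)clientData;
  // Re-arm background reads for the client socket, this time on the worker's
  // scheduler, so every subsequent RTSP command is processed by that worker:
  h->workerEnv->taskScheduler().setBackgroundHandling(
      h->clientSocket, SOCKET_READABLE | SOCKET_EXCEPTION,
      h->readHandler, h->handlerData);
  delete h;
}

// In the main thread, after handleCmd_DESCRIBE() returned 1000 (the trigger id
// would have been created once per worker with createEventTrigger(takeoverHandler)):
//   workerEnv->taskScheduler().triggerEvent(takeoverTriggerId, handoff);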

After more than two months, the multi-threading work is finally done. I am writing it up here; discussion is welcome.

live555 technical exchange

Email: 289042893@qq.com

live555 QQ discussion group: 475947825

