licode學習之erizo篇--Pipeline_service


Pipeline是媒體處理的核心流程邏輯。

Pipeline里面定義了兩個主要的概念:Service和Handler。

Service負責處理那些不僅要看當前數據包,還要分析之前的數據包的那些業務,比如丟包重傳;Handler處理當前的數據包的情形,比如生成填充字節。

在Pipeline里面,Handler和Service是配合起來一起工作的,他們通過一套框架將之關聯起來。

先看看使用上:

void MediaStream::initializePipeline() {
  handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
  pipeline_->addService(shared_from_this());
  pipeline_->addService(handler_manager_);
  pipeline_->addService(rtcp_processor_);
  pipeline_->addService(stats_);
  pipeline_->addService(quality_manager_);
  pipeline_->addService(packet_buffer_);

  pipeline_->addFront(std::make_shared<PacketReader>(this));

  pipeline_->addFront(std::make_shared<RtcpProcessorHandler>());
  pipeline_->addFront(std::make_shared<FecReceiverHandler>());
  pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>());
  pipeline_->addFront(std::make_shared<QualityFilterHandler>());
  pipeline_->addFront(std::make_shared<IncomingStatsHandler>());
  pipeline_->addFront(std::make_shared<RtpTrackMuteHandler>());
  pipeline_->addFront(std::make_shared<RtpSlideShowHandler>());
  pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>());
  pipeline_->addFront(std::make_shared<PliPacerHandler>());
  pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>());
  pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>());
  pipeline_->addFront(std::make_shared<RtcpFeedbackGenerationHandler>());
  pipeline_->addFront(std::make_shared<RtpRetransmissionHandler>());
  pipeline_->addFront(std::make_shared<SRPacketHandler>());
  pipeline_->addFront(std::make_shared<SenderBandwidthEstimationHandler>());
  pipeline_->addFront(std::make_shared<LayerDetectorHandler>());
  pipeline_->addFront(std::make_shared<OutgoingStatsHandler>());
  pipeline_->addFront(std::make_shared<PacketCodecParser>());

  pipeline_->addFront(std::make_shared<PacketWriter>(this));
  pipeline_->finalize();
  pipeline_initialized_ = true;
}

 

在初始化時,pipeline調用了addService和addFront接口,將Service和Handler添加到pipeline中去。在初始化里面,我們可以看到其支持了哪些處理。

在實際使用中,接收到的數據,調用pipeline的read接口,就完成了解析為裸數據的事兒;調用write接口,就完成了fec等處理數據的事兒。

pipeline的數據,read的源需要是srtp解密后的數據,處理后為rtp裸數據;write的源為rtp裸數據,處理后的數據經過srtp加密輸出到網絡。(網絡使用的是DtlsTransport接口對接的)

這些功能先不去管它,這里先弄清楚他們的架構和工作方式。

閱讀這塊兒的代碼真是不容易,使用了很多模板類,為了方便理解,菜鳥哥根據代碼,把所有的模板類替換為了實際的基類,來進行理解。

先看看pipeline的Service部分的繼承體系以及數據結構:

 

再結合PipelineBase的addService實現,看一下Service是干啥用的

template <class S>
void PipelineBase::addService(std::shared_ptr<S> service) {
  typedef typename ServiceContextType<S>::type Context;
  service_ctxs_.push_back(std::make_shared<Context>(shared_from_this(), std::move(service)));
}
template <class Service>
struct ServiceContextType {
  typedef ServiceContextImpl<Service> type;
};

addService其實就是傳遞一個Service的子類對象,這個子類對象是用來給Context的構造函數傳遞參數的;Context就是ServiceContextImpl,也就是說addService里面的參數,就是為了創建一個ServiceContextImpl對象,這個對象創建出來以后,被存儲在pipelinebase的service_ctxs_成員中。在addService接口中,還將pipeline自身,作為參數,傳遞給了ServiceContextImpl。通過代碼看看這些參數怎么用

  explicit ServiceContextImpl(
      std::weak_ptr<PipelineBase> pipeline,
      std::weak_ptr<S> service) {
    this->impl_ = this;
    this->initialize(pipeline, std::move(service));
  }

  void initialize(
      std::weak_ptr<PipelineBase> pipeline,
      std::weak_ptr<S> service) {
    pipeline_weak_ = pipeline;
    pipeline_raw_ = pipeline.lock().get();
    service_ = std::move(service);
  }

    std::weak_ptr<S> getService() {
       return service_;
    }

 
        

ServiceContextImpl在構造時存儲了PipelineBase和Service,這樣外面再使用時,可以通過getService來獲取到Service的實例。

這個獲取操作很重要,看一下pipeline的notifyUpdate方法,看實際的處理handler(RtcpProcessorHandler)

void RtcpProcessorHandler::notifyUpdate() {
  auto pipeline = getContext()->getPipelineShared();
  if (pipeline && !stream_) {
    stream_ = pipeline->getService<MediaStream>().get();
    processor_ = pipeline->getService<RtcpProcessor>();
    stats_ = pipeline->getService<Stats>();
  }
}

這個地方好神奇,通過調用getService方法,模板傳遞不同的類型,則能夠獲取到不同的對象實例。看一下getService方法

template <class S>
std::shared_ptr<S> PipelineBase::getService() {
  auto ctx = getServiceContext<S>();
  return ctx ? ctx->getService().lock() : std::shared_ptr<S>();
}

template <class S>
typename ServiceContextType<S>::type* PipelineBase::getServiceContext() {
  for (auto pipeline_service_ctx : service_ctxs_) {
    auto ctx = dynamic_cast<typename ServiceContextType<S>::type*>(pipeline_service_ctx.get());
    if (ctx) {
      return ctx;
    }
  }
  return nullptr;
}

在getServiceContext方法里面,遍歷了pipeline的service_ctxs_,並對每一個ctx進行dynamic_cast轉換,能夠成功,就返回,不能成功就繼續。這個地方真是靈活使用,奇思妙想。

到這里,就形成了一個共享的方式,所有的handler,都可以獲得到所有的service的子類實例,在實現過程中就極大的提升了靈活性,每個service獨立做自己的事兒,並且由handler直接進行數據驅動,簡直太符合這個媒體處理的需要了。

 

總結:Service的核心意義是共享,即每個handler都可以通過類型來獲取到所有的Service子類實例,進行使用,而不必要為每個Handler定義不同的接口來傳遞Service對象。Service也為了多個Handler公用數據而提供服務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM