Pipeline是媒體處理的核心流程邏輯。
Pipeline里面定義了兩個主要的概念:Service和Handler。
Service負責處理那些不僅要看當前數據包,還要分析之前的數據包的那些業務,比如丟包重傳;Handler處理當前的數據包的情形,比如生成填充字節。
在Pipeline里面,Handler和Service是配合起來一起工作的,他們通過一套框架將之關聯起來。
先看看使用上:
void MediaStream::initializePipeline() { handler_manager_ = std::make_shared<HandlerManager>(shared_from_this()); pipeline_->addService(shared_from_this()); pipeline_->addService(handler_manager_); pipeline_->addService(rtcp_processor_); pipeline_->addService(stats_); pipeline_->addService(quality_manager_); pipeline_->addService(packet_buffer_); pipeline_->addFront(std::make_shared<PacketReader>(this)); pipeline_->addFront(std::make_shared<RtcpProcessorHandler>()); pipeline_->addFront(std::make_shared<FecReceiverHandler>()); pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>()); pipeline_->addFront(std::make_shared<QualityFilterHandler>()); pipeline_->addFront(std::make_shared<IncomingStatsHandler>()); pipeline_->addFront(std::make_shared<RtpTrackMuteHandler>()); pipeline_->addFront(std::make_shared<RtpSlideShowHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>()); pipeline_->addFront(std::make_shared<PliPacerHandler>()); pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>()); pipeline_->addFront(std::make_shared<RtcpFeedbackGenerationHandler>()); pipeline_->addFront(std::make_shared<RtpRetransmissionHandler>()); pipeline_->addFront(std::make_shared<SRPacketHandler>()); pipeline_->addFront(std::make_shared<SenderBandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<LayerDetectorHandler>()); pipeline_->addFront(std::make_shared<OutgoingStatsHandler>()); pipeline_->addFront(std::make_shared<PacketCodecParser>()); pipeline_->addFront(std::make_shared<PacketWriter>(this)); pipeline_->finalize(); pipeline_initialized_ = true; }
在初始化時,pipeline調用了addService和addFront接口,將Service和Handler添加到pipeline中去。在初始化里面,我們可以看到其支持了哪些處理。
在實際使用中,接收到的數據,調用pipeline的read接口,就完成了解析為裸數據的事兒;調用write接口,就完成了fec等處理數據的事兒。
pipeline的數據,read的源需要是srtp解密后的數據,處理后為rtp裸數據;write的源為rtp裸數據,處理后的數據經過srtp加密輸出到網絡。(網絡使用的是DtlsTransport接口對接的)
這些功能先不去管它,這里先弄清楚他們的架構和工作方式。
閱讀這塊兒的代碼真是不容易,使用了很多模板類,為了方便理解,菜鳥哥根據代碼,把所有的模板類替換為了實際的基類,來進行理解。
先看看pipeline的Service部分的繼承體系以及數據結構:
再結合PipelineBase的addService實現,看一下Service是干啥用的
template <class S> void PipelineBase::addService(std::shared_ptr<S> service) { typedef typename ServiceContextType<S>::type Context; service_ctxs_.push_back(std::make_shared<Context>(shared_from_this(), std::move(service))); }
template <class Service> struct ServiceContextType { typedef ServiceContextImpl<Service> type; };
addService其實就是傳遞一個Service的子類對象,這個子類對象是用來給Context的構造函數傳遞參數的;Context就是ServiceContextImpl,也就是說addService里面的參數,就是為了創建一個ServiceContextImpl對象,這個對象創建出來以后,被存儲在pipelinebase的service_ctxs_成員中。在addService接口中,還將pipeline自身,作為參數,傳遞給了ServiceContextImpl。通過代碼看看這些參數怎么用
explicit ServiceContextImpl( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { this->impl_ = this; this->initialize(pipeline, std::move(service)); } void initialize( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { pipeline_weak_ = pipeline; pipeline_raw_ = pipeline.lock().get(); service_ = std::move(service); }
std::weak_ptr<S> getService() {
return service_;
}
ServiceContextImpl在構造時存儲了PipelineBase和Service,這樣外面再使用時,可以通過getService來獲取到Service的實例。
這個獲取操作很重要,看一下pipeline的notifyUpdate方法,看實際的處理handler(RtcpProcessorHandler)
void RtcpProcessorHandler::notifyUpdate() { auto pipeline = getContext()->getPipelineShared(); if (pipeline && !stream_) { stream_ = pipeline->getService<MediaStream>().get(); processor_ = pipeline->getService<RtcpProcessor>(); stats_ = pipeline->getService<Stats>(); } }
這個地方好神奇,通過調用getService方法,模板傳遞不同的類型,則能夠獲取到不同的對象實例。看一下getService方法
template <class S> std::shared_ptr<S> PipelineBase::getService() { auto ctx = getServiceContext<S>(); return ctx ? ctx->getService().lock() : std::shared_ptr<S>(); } template <class S> typename ServiceContextType<S>::type* PipelineBase::getServiceContext() { for (auto pipeline_service_ctx : service_ctxs_) { auto ctx = dynamic_cast<typename ServiceContextType<S>::type*>(pipeline_service_ctx.get()); if (ctx) { return ctx; } } return nullptr; }
在getServiceContext方法里面,遍歷了pipeline的service_ctxs_,並對每一個ctx進行dynamic_cast轉換,能夠成功,就返回,不能成功就繼續。這個地方真是靈活使用,奇思妙想。
到這里,就形成了一個共享的方式,所有的handler,都可以獲得到所有的service的子類實例,在實現過程中就極大的提升了靈活性,每個service獨立做自己的事兒,並且由handler直接進行數據驅動,簡直太符合這個媒體處理的需要了。
總結:Service的核心意義是共享,即每個handler都可以通過類型來獲取到所有的Service子類實例,進行使用,而不必要為每個Handler定義不同的接口來傳遞Service對象。Service也為了多個Handler公用數據而提供服務。