licode學習之erizo篇--Pipeline_handle


erizo的pipeline的handle,是媒體數據處理的基本操作,handle分為3類:IN,OUT,BOTH

IN:數據進入handle,handle需要read數據並傳遞給下一級

OUT:數據進入handle,handle需要write數據並傳遞給下一級

BOTH:可以同時進行read和write

在宏觀語義上,IN的目標是輸出rtp裸數據;OUT的目標是封裝rtp裸數據

pipeline的handle的繼承體系如下:

handle有三種:InboundHandler、Handler、OutboundHandler;他們分別對應一個Context

InboundHandler==>InboundContextImpl

Handler==>ContextImpl

OutboundHandler==>OutboundContextImpl

他們的工作方式差不多,就挑一個InboundHandler來進行說明。pipeline有addFront方法,該方法是注冊Handler的,先看看這個方法的相關實現:

template <class H>
PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
  typedef typename ContextType<H>::type Context;
  return addHelper(
      std::make_shared<Context>(shared_from_this(), std::move(handler)),
      true);
}
template <class Context>
PipelineBase& PipelineBase::addHelper(
    std::shared_ptr<Context>&& ctx,  // NOLINT
    bool front) {
  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
  if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_IN) {
    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
  }
  if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_OUT) {
    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
  }
  return *this;
}

addFront的功能,就是創建一個Context的對象,並將之傳遞給addHelper為參數。Context的構造還被傳遞了Handler自己的指針,讓Context能夠知道handler

addHelper將Context存儲起來,並且判斷類型,分別放到inCtxs_和outCtxs_里面,待用

Context的實際類型是啥呢?

template <class Handler>
struct ContextType {
  typedef typename std::conditional<
    Handler::dir == HandlerDir_BOTH,
    ContextImpl<Handler>,
    typename std::conditional<
      Handler::dir == HandlerDir_IN,
      InboundContextImpl<Handler>,
      OutboundContextImpl<Handler>
    >::type>::type
  type;
};

Context的實際類型,要依托於Handler::dir成員的值,這個值是一個常量,是每個Handler都有的。

HandlerDir_IN:Context為InboundContextImpl

HandlerDir_BOTH:Context為ContextImpl

HandlerDir_其他:Context為OutboundContextImpl

所以,pipeline的addFront方法,實際上是創建HandlerContext的實例,並且將之存儲的過程。

 

pipeline的數據鏈路建立,是怎么樣的呢

void Pipeline::finalize() {
  front_ = nullptr;
  if (!inCtxs_.empty()) {
    front_ = dynamic_cast<InboundLink*>(inCtxs_.front());
    for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
      inCtxs_[i]->setNextIn(inCtxs_[i+1]);
    }
    inCtxs_.back()->setNextIn(nullptr);
  }

  back_ = nullptr;
  if (!outCtxs_.empty()) {
    back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());
    for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
      outCtxs_[i]->setNextOut(outCtxs_[i-1]);
    }
    outCtxs_.front()->setNextOut(nullptr);
  }

  if (!front_) {
    // detail::logWarningIfNotUnit<R>(
    //     "No inbound handler in Pipeline, inbound operations will throw "
    //     "std::invalid_argument");
  }
  if (!back_) {
    // detail::logWarningIfNotUnit<W>(
    //     "No outbound handler in Pipeline, outbound operations will throw "
    //     "std::invalid_argument");
  }

  for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  for (auto it = service_ctxs_.rbegin(); it != service_ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  notifyUpdate();
}

pipeline里面的finalize方法為In和Out的Context設置任務鏈,並且設置頭結點(front_,back_)之后,每個HandlerContext就知道自己的下一級任務是什么了

還需要了解一下觸發方式:

MediaStream的OnTransportData獲得packet數據

void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) {
  if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) {
    return;
  }

  std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);

  if (transport->mediaType == AUDIO_TYPE) {
    packet->type = AUDIO_PACKET;
  } else if (transport->mediaType == VIDEO_TYPE) {
    packet->type = VIDEO_PACKET;
  }
  auto stream_ptr = shared_from_this();

  worker_->task([stream_ptr, packet]{
    if (!stream_ptr->pipeline_initialized_) {
      ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog());
      return;
    }

    char* buf = packet->data;
    RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
    RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
    if (!chead->isRtcp()) {
      uint32_t recvSSRC = head->getSSRC();
      if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
        packet->type = VIDEO_PACKET;
      } else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
        packet->type = AUDIO_PACKET;
      }
    }

    if (stream_ptr->pipeline_) {
      stream_ptr->pipeline_->read(std::move(packet));
    }
  });
}

在里面調用了pipeline的read方法,還需要知道read里面做什么了

void Pipeline::read(std::shared_ptr<DataPacket> packet) {
  if (!front_) {
    return;
  }
  front_->read(std::move(packet));
}

調用了front的read,front是一個HandlerContext對象,並且是處理鏈的頭結點

  void read(std::shared_ptr<DataPacket> packet) override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->read(this, std::move(packet));
  }

在HandlerContext里面調用了handler_的read方法,並且把自己作為參數也同時傳遞給了handler。

之后找一個真正的handler對象,看一下它的read實現:

void LayerDetectorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
  RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
  if (!chead->isRtcp() && enabled_ && packet->type == VIDEO_PACKET) {
    if (packet->codec == "VP8") {
      parseLayerInfoFromVP8(packet);
    } else if (packet->codec == "VP9") {
      parseLayerInfoFromVP9(packet);
    } else if (packet->codec == "H264") {
      parseLayerInfoFromH264(packet);
    }
  }
  ctx->fireRead(std::move(packet));
}

在handler的read里面,再次調用了ctx的fireRead方法,並且把packet進行傳遞

  void fireRead(std::shared_ptr<DataPacket> packet) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->read(std::move(packet));
    }
  }

在fireRead中,調用了nextIn_的read方法,nextIn_是下一個HandlerContext。

這樣就形成了一個任務鏈。

ContextRead-->HanderRead-->Context fireRead-->Context next read-->ContextRead.........

形成一個read的任務鏈。

write的基本原理也是相似的。

 

總結:erizo的pipeline的handler是負責實際數據處理的,通過處理鏈路,將之串聯起來


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM