licode 學習總結


參考:

licode編譯以及源碼分析:https://www.cnblogs.com/limedia/category/1350712.html

Licode—基於webrtc的SFU/MCU實現:https://www.jianshu.com/p/dcc5ba06b49f

Licode架構分析:https://blog.csdn.net/haitian403/article/details/89136984#6__42

licode 的singlepc 模式是怎么回事?:https://blog.csdn.net/ddr77/article/details/90065302

1.系統組成

 

 

licode-server系統組成

 

     抽取的代碼沒有webrtc自帶的音視頻處理框架(voice engine/video engine/media engine),這是由於SFC只在RTP層進行轉發並不解碼,只需要check一下媒體流屬性(如I幀頭)即可。

    抽取代碼的亮點在於將webrtc強大的抵抗網絡時延和丟包的網絡適應性算法和協議都提煉出來了,可以供廣大研究視頻傳輸的網絡適應方向的開發者們單獨學習、實驗並快速集成到自己非webrtc-based的產品中來。

    libav是從ffmpeg中分離出來的開發者發布的開源工程,代碼量更小、第三方依賴庫更少、編譯簡單。目前FFmpeg的使用更廣泛。

    libnice基於ICE協議實現的網絡層庫,Licode使用libnice庫來實現端到端的ICE連接和數據流發送接收,以及candidates(候選地址)和SDP(媒體描述文件)的相互交換。(但是配置文件中默認使用nicer,此配置文件僅是example的配置)

    libsrtp庫主要是是用來加密rtp/rtcp的。

2.licode抽取webrtc組件

 

 3.C++ Source

 

 4.WebrtcConnection

WebrtcConnection是erizo進行Webrtc交互的基礎類。

交互之主動發起流程:

licode-webrtcconnection主動發起交互

被動呼叫:

licode-webrtcconnection被動呼叫發起交互

std::string connection_id_; //唯一的ID
  bool audio_enabled_; //如果主動發起請求,被createOffer函數賦值,否則被processRemote賦值,表示是否有音頻交互
  bool video_enabled_; //表示是否有視頻交互
  bool trickle_enabled_; //啟用ice的trickle模式
  bool slide_show_mode_; //沒有用,應該是留作擴展或者是以前版本的殘留
  bool sending_;//發送數據開關
  int bundle_;//ice 使用,是否使用合並端口收發數據(音頻和視頻)
  WebRtcConnectionEventListener* conn_event_listener_; //webrtc連接事件監聽
  IceConfig ice_config_;//ice配置
  std::vector<RtpMap> rtp_mappings_; //支持的RtpMap,生成sdp使用
  RtpExtensionProcessor extension_processor_; //支持的extension_processor,生成sdp使用
  boost::condition_variable cond_;  

  std::shared_ptr<Transport> video_transport_, audio_transport_; //視頻數據鏈路,音頻數據鏈路

  std::shared_ptr<Stats> stats_; //輸出狀態
  WebRTCEvent global_state_; //webrtc事件狀態枚舉

  boost::mutex update_state_mutex_; //
  boost::mutex event_listener_mutex_;

  std::shared_ptr<Worker> worker_; 
  std::shared_ptr<IOWorker> io_worker_;
  std::vector<std::shared_ptr<MediaStream>> media_streams_; //這個connection使用的流
  std::shared_ptr<SdpInfo> remote_sdp_; //對端的sdp
  std::shared_ptr<SdpInfo> local_sdp_; //本地的sdp
  bool audio_muted_; //擴展,在源文件中未看到使用,僅初始化
  bool video_muted_; //擴展,在源文件中未看到使用,僅初始化
  bool first_remote_sdp_processed_; //是否第一次處理sdp標識

  std::unique_ptr<BandwidthDistributionAlgorithm> distributor_; //remb碼率估計處理
/**
 * WebRTC Events
 */
enum WebRTCEvent {
  CONN_INITIAL = 101, CONN_STARTED = 102, CONN_GATHERED = 103, CONN_READY = 104, CONN_FINISHED = 105,
  CONN_CANDIDATE = 201, CONN_SDP = 202, CONN_SDP_PROCESSED = 203,
  CONN_FAILED = 500
};

class WebRtcConnectionEventListener {
 public:
    virtual ~WebRtcConnectionEventListener() {
    }
    virtual void notifyEvent(WebRTCEvent newEvent, const std::string& message) = 0;
};

從成員可以看出,webrtcconnection,主要控制的有鏈路transport,交互local_sdp remote_sdp, ice控制,事件監聽回調,數據流media_streams。主要依托於webrtc的標准交互:offer,candidate,answer來進行。整體流程需要根據ice的相關事件進行驅動,實際上是程序控制與ice狀態的雙重驅動控制。

 

 5.Worker

 erizo使用Worker來管理Task,每個Task是一個函數片段,其執行完全由Worker來接管。

 

class Worker : public std::enable_shared_from_this<Worker> {
 public:
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
  typedef std::function<void()> Task; //返回值為空的函數為Task
  typedef std::function<bool()> ScheduledTask;  //返回值為bool的函數為scheduletask

  explicit Worker(std::weak_ptr<Scheduler> scheduler,
                  std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());
  ~Worker();

  virtual void task(Task f); //設置運行Task

  virtual void start(); //開啟線程
  virtual void start(std::shared_ptr<std::promise<void>> start_promise);  //同步方式開啟線程,即確定線程啟動后,調用者才會返回
  virtual void close(); //停止線程

  virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta); //定時器,可以取消的定時器
  virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id); //取消定時器

  virtual void scheduleEvery(ScheduledTask f, duration period);  //循環定時器,f返回false時停止執行

 private:
  void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
  std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);

 protected:
  int next_scheduled_ = 0;

 private:
  std::weak_ptr<Scheduler> scheduler_;
  std::shared_ptr<Clock> clock_;
  boost::asio::io_service service_;
  asio_worker service_worker_;
  boost::thread_group group_;
  std::atomic<bool> closed_;
};

在構造函數中,使用boost::asio::io_service,構建了基本的線程架構。

Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
    : scheduler_{scheduler}, //構造定時器變量
      clock_{the_clock}, //構造自己的時鍾變量
      service_{}, //構造io_service對象
      service_worker_{new asio_worker::element_type(service_)}, //為io_service注入service_worker,避免直接退出
      closed_{false} { //線程控制變量,為true時,退出
}

Worker提供了兩個start函數,無參的直接創建一個promise,調用有參數的,並且並未使用get_future.wait進行流程控制。這里就可以理解為:無參數start,不關心線程是否創建成功,如果在線程沒有創建成功時,調用了task函數,則可能出現異常錯誤。有參數的start為開發控制線程存在,優化處理流程提供了可能。

void Worker::start() {
  auto promise = std::make_shared<std::promise<void>>();
  start(promise);
}

void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
  auto this_ptr = shared_from_this();
  auto worker = [this_ptr, start_promise] { //創建一個代理worker,准備好執行過程
    start_promise->set_value(); //通知promise,線程已經就緒
    if (!this_ptr->closed_) { //如果不是close狀態
      return this_ptr->service_.run(); //調用io service的run函數,開啟線程過程
    }
    return size_t(0);
  };
  group_.add_thread(new boost::thread(worker)); //實際創建線程,並將之添加到group里面
}
void Worker::close() {
  closed_ = true;
  service_worker_.reset();
  group_.join_all();
  service_.stop();
}

在close函數中,將變量設為true,並調用各種析構。從start和close的控制可以看到,Worker的start和close只能成功調用一次,如果close以后,再start,線程就會直接退出了。這應該也是一個小弊端。

task調用io service的dispatch,直接執行任務。也就是說task實際上就是一個基礎的處理,讓任務進行執行。

void Worker::task(Task f) {
  service_.dispatch(f);//函數在當前線程立即執行,且同步(post在另外線程執行,異步)
}

Worker也設計了定時執行.在scheduleFromNow里面,調用了scheduler的scheduleFromNow方法,在scheduler里面,放入任務隊列,進入定時線程,到達時間后,執行Worker的task方法,投遞一個Task,進而激活Worker,運行Task內容,完成定時執行操作。

std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) {
  auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
  auto id = std::make_shared<ScheduledTaskReference>();
  if (auto scheduler = scheduler_.lock()) {
    scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,給scheduler的scheduleFromNow做參數傳遞
      this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,生成的task功能是如果id->isCancelled為true,直接返回,否則執行f,並將這個task傳遞給自己的任務投遞方法
        {
          if (id->isCancelled()) {
            return;
          }
        }
        f();
      }));
    }), delta_ms);
  }
  return id;
}

循環定時器,使用遞歸調用,來實現循環定時器,其停止依托於ScheduledTask的返回值為false,停止循環。

void Worker::scheduleEvery(ScheduledTask f, duration period) {
  scheduleEvery(f, period, period);
}

void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay) {
  time_point start = clock_->now();
  std::shared_ptr<Clock> clock = clock_;

  scheduleFromNow(safeTask([start, period, next_delay, f, clock](std::shared_ptr<Worker> this_ptr) {
    if (f()) {
      duration clock_skew = clock->now() - start - next_delay;
      duration delay = period - clock_skew;
      this_ptr->scheduleEvery(f, period, delay); //循環遞歸調用
    }
  }), std::max(next_delay, duration{0}));
}

Worker提供了基本的線程管理,提供了Task執行機制以及定時器控制機制,但是沒有提供資源重復使用的機制,即多次調用close,start的機制

 6.IOWorker

 erizo使用IOWorker進行ICE,DTLS的狀態交互處理。

namespace erizo {

class IOWorker : public std::enable_shared_from_this<IOWorker> {
 public:
  typedef std::function<void()> Task;
  IOWorker();
  virtual ~IOWorker();

  virtual void start();
  virtual void start(std::shared_ptr<std::promise<void>> start_promise);
  virtual void close();

  virtual void task(Task f);

 private:
  std::atomic<bool> started_;
  std::atomic<bool> closed_;
  std::unique_ptr<std::thread> thread_;
  std::vector<Task> tasks_;
  mutable std::mutex task_mutex_;
};
}  // namespace erizo

接口定義與Worker基本沒有區別,但是內部使用了atomic變量,而沒有使用boost的io service,說明線程的執行是自己控制.

void IOWorker::start() {
  auto promise = std::make_shared<std::promise<void>>();
  start(promise);
}

void IOWorker::start(std::shared_ptr<std::promise<void>> start_promise) {
  if (started_.exchange(true)) {
    return;
  }

  thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] {
    start_promise->set_value();
    while (!closed_) {
      int events;
      struct timeval towait = {0, 100000};
      struct timeval tv;
      int r = NR_async_event_wait2(&events, &towait);
      if (r == R_EOD) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
      gettimeofday(&tv, 0);
      NR_async_timer_update_time(&tv);
      std::vector<Task> tasks;
      {
        std::unique_lock<std::mutex> lock(task_mutex_);
        tasks.swap(tasks_);
      }
      for (Task &task : tasks) {
        task();
      }
    }
  }));
}

void IOWorker::task(Task f) {
  std::unique_lock<std::mutex> lock(task_mutex_);
  tasks_.push_back(f);
}

在start里面做了重入檢測判斷,如果重入,直接返回。

在線程函數體內部,進行了定時處理,即sleep一段時間,執行所有的task。

在task函數中,將Task放入vector中,從總體實現上,與worker有很大的區別,但是從使用角度,基本是無差別的。

在IOWorker里面,使用效率可能不如Worker的效率高,而且人為的將任務集中執行,可能造成瞬時cpu過高。

7.Transport

erizo的transport部分負責網絡鏈路處理,其包含ice處理,數據包packet處理傳遞。

transport存在,主要是為Dtls-srtp數據處理提供封裝,其關聯着ice與外部接口webrtcconnection。其關系圖如下:

 

 erizo提供了兩套ICE的方案,分別使用不同的ice庫,可以再iceconfig參數里進行設置。

erizo的DtlsTransport,提供了dtls交互,srtp加密和解密。

erizo的鏈路數據接收模型,是定義一個listener,下一級繼承listener,同級聚合listener,實現異步數據的逐級傳遞。

在 DtlsTransport類的構造函數中根據配置決定初始化哪個庫:

    if (iceConfig_.use_nicer) {
      ice_ = NicerConnection::create(io_worker_, iceConfig_);
    } else {
      ice_.reset(LibNiceConnection::create(iceConfig_));
    }

DtlsTransport會將上層來的數據傳給ICE,負責進行加解密,listener會被設置成dtls實例

void DtlsTransport::start() {
  ice_->setIceListener(shared_from_this());
  ice_->copyLogContextFrom(*this);
  ELOG_DEBUG("%s message: starting ice", toLog());
  ice_->start();
}
void DtlsTransport::onDtlsPacket(DtlsSocketContext *ctx, const unsigned char* data, unsigned int len) {
  bool is_rtcp = ctx == dtlsRtcp.get();
  int component_id = is_rtcp ? 2 : 1;

  packetPtr packet = std::make_shared<DataPacket>(component_id, data, len);

  if (is_rtcp) {
    writeDtlsPacket(dtlsRtcp.get(), packet);
  } else {
    writeDtlsPacket(dtlsRtp.get(), packet);
  }

  ELOG_DEBUG("%s message: Sending DTLS message, transportName: %s, componentId: %d",
             toLog(), transport_name.c_str(), packet->comp);
}

void DtlsTransport::writeDtlsPacket(DtlsSocketContext *ctx, packetPtr packet) {
  char data[1500];
  unsigned int len = packet->length;
  memcpy(data, packet->data, len);
  writeOnIce(packet->comp, data, len);
}
//Transport.h
 void writeOnIce(int comp, void* buf, int len) {
    if (!running_) {
      return;
    }
    ice_->sendData(comp, buf, len);
 }
 
        
void DtlsTransport::onIceData(packetPtr packet) {
if (!running_) {
return;
}

int len = packet->length;

char *data = packet->data;
unsigned int component_id = packet->comp; 
int length = len;
SrtpChannel
*srtp = srtp_.get();
//判斷是否是dtls包,
rtp, dtls, stun, unknown
if (DtlsTransport::isDtlsPacket(data, len)) {
    ELOG_DEBUG("%s message: Received DTLS message, transportName: %s, componentId: %u",
               toLog(), transport_name.c_str(), component_id);
    if (component_id == 1) {
      std::lock_guard<std::mutex> guard(dtls_mutex);
      dtlsRtp->read(reinterpret_cast<unsigned char*>(data), len);
    } else {
      std::lock_guard<std::mutex> guard(dtls_mutex);
      dtlsRtcp->read(reinterpret_cast<unsigned char*>(data), len);
    }
    return;
  } else if (this->getTransportState() == TRANSPORT_READY) {
    std::shared_ptr<DataPacket> unprotect_packet = std::make_shared<DataPacket>(component_id,
      data, len, VIDEO_PACKET, packet->received_time_ms);

    if (dtlsRtcp != NULL && component_id == 2) {
      srtp = srtcp_.get();
    }
//如果不是dtls包會進行解密
if (srtp != NULL) { RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(unprotect_packet->data); if (chead->isRtcp()) { if (srtp->unprotectRtcp(unprotect_packet->data, &unprotect_packet->length) < 0) { return; } } else { if (srtp->unprotectRtp(unprotect_packet->data, &unprotect_packet->length) < 0) { return; } } } else { return; } if (length <= 0) { return; } if (auto listener = getTransportListener().lock()) { listener->onTransportData(unprotect_packet, this); } } }

解析出來的數據會傳給listener,而listener是webrtcconnection的實例,通過pipe調用read傳入media_stream中,UDP---ICE---DTLS---SRTP---RTP

void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Transport *transport) {
  if (getCurrentState() != CONN_READY) {
    return;
  }
  if (transport->mediaType == AUDIO_TYPE) {
    packet->type = AUDIO_PACKET;
  } else if (transport->mediaType == VIDEO_TYPE) {
    packet->type = VIDEO_PACKET;
  }
  asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {
    if (!connection->pipeline_initialized_) {
      ELOG_DEBUG("%s message: Pipeline not initialized yet.", connection->toLog());
      return;
    }

    if (connection->pipeline_) {
      connection->pipeline_->read(std::move(packet));
    }
  });
}

void WebRtcConnection::read(std::shared_ptr<DataPacket> packet) {
  Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
  if (transport == nullptr) {
    return;
  }
  char* buf = packet->data;
  RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
  if (chead->isRtcp()) {
    onRtcpFromTransport(packet, transport);
    return;
  } else {
    RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
    uint32_t ssrc = head->getSSRC();
    forEachMediaStream([packet, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream) {
      if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) {
        media_stream->onTransportData(packet, transport);
      }
    });
  }
}

 8.MediaStream

 MediaStream是erizo進行流數據處理的核心模塊。當網絡數據,經過DtlsTransport進行srtp解密后,得到的rtp裸數據與rtcp裸數據,都要進入MediaStream進行處理;需要發送給對方的rtp數據與rtcp裸數據也要經過MediaStream處理后,才會給DtlsTransport進行加密並發送。

 
/**
 * A MediaStream. This class represents a Media Stream that can be established with other peers via a SDP negotiation
 */
class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
                        public FeedbackSource, public LogContext, public HandlerManagerListener,
                        public std::enable_shared_from_this<MediaStream>, public Service {

整個繼承體系里面,涉及處理的基類有:MediaSink,MediaSource,FeedbackSink,FeedbackSource, HandlerManagerListener,Service

MediaStream同時承載着收數據處理,和發送數據處理兩部分內容。其中和丟包重傳等結合起來,就變為:接收rtp數據,發送rtcp重傳信息;發送rtp數據,接收rtcp重傳信息。

MediaStream還繼承了HandlerManagerListener,Service這兩部分是媒體處理的核心模塊

在接口上面,需要對外提供發送和接收到的裸數據回調。

這里面erizo的實現,是將MediaSink與MediaSource糾纏到一起的。

MediaSink:負責發送數據(write to client)

FeedbackSink:負責發送數據(write to client)

MediaSource:負責read出來rtp數據 (read from client)

FeedbackSource:負責read出來數據(read from client)

MediaStream繼承MediaSink和FeedbackSink,所以直接調用MediaStream對象的deliverVideoData,deliverAudioData,deliverFeedback即可直接向對端發送數據。

要想接收對方的數據,需要MediaSource,FeedbackSource進行數據回調:

/**
 * A MediaSource is any class that produces audio or video data.
 */
class MediaSource: public virtual Monitor {
 protected:
    // SSRCs coming from the source
    uint32_t audio_source_ssrc_;
    std::vector<uint32_t> video_source_ssrc_list_;
    MediaSink* video_sink_;
    MediaSink* audio_sink_;
    MediaSink* event_sink_;
    // can it accept feedback
    FeedbackSink* source_fb_sink_;

 public:
    void setAudioSink(MediaSink* audio_sink) {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        this->audio_sink_ = audio_sink;
    }
    void setVideoSink(MediaSink* video_sink) {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        this->video_sink_ = video_sink;
    }
    void setEventSink(MediaSink* event_sink) {
      boost::mutex::scoped_lock lock(monitor_mutex_);
      this->event_sink_ = event_sink;
    }

    FeedbackSink* getFeedbackSink() {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        return source_fb_sink_;
    }
    virtual int sendPLI() = 0;
    uint32_t getVideoSourceSSRC() {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        if (video_source_ssrc_list_.empty()) {
          return 0;
        }
        return video_source_ssrc_list_[0];
    }
    void setVideoSourceSSRC(uint32_t ssrc) {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        if (video_source_ssrc_list_.empty()) {
          video_source_ssrc_list_.push_back(ssrc);
          return;
        }
        video_source_ssrc_list_[0] = ssrc;
    }
    std::vector<uint32_t> getVideoSourceSSRCList() {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        return video_source_ssrc_list_;  //  return by copy to avoid concurrent access
    }
    void setVideoSourceSSRCList(const std::vector<uint32_t>& new_ssrc_list) {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        video_source_ssrc_list_ = new_ssrc_list;
    }
    uint32_t getAudioSourceSSRC() {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        return audio_source_ssrc_;
    }
    void setAudioSourceSSRC(uint32_t ssrc) {
        boost::mutex::scoped_lock lock(monitor_mutex_);
        audio_source_ssrc_ = ssrc;
    }

    bool isVideoSourceSSRC(uint32_t ssrc) {
      auto found_ssrc = std::find_if(video_source_ssrc_list_.begin(), video_source_ssrc_list_.end(),
          [ssrc](uint32_t known_ssrc) {
          return known_ssrc == ssrc;
          });
      return (found_ssrc != video_source_ssrc_list_.end());
    }

    bool isAudioSourceSSRC(uint32_t ssrc) {
      return audio_source_ssrc_ == ssrc;
    }

    MediaSource() : audio_source_ssrc_{0}, video_source_ssrc_list_{std::vector<uint32_t>(1, 0)},
      video_sink_{nullptr}, audio_sink_{nullptr}, event_sink_{nullptr}, source_fb_sink_{nullptr} {}
    virtual ~MediaSource() {}

    virtual boost::future<void> close() = 0;
};

MediaSource定義了4個MediaSink對象,分別對應video,audio,event,feedback。

MediaStream繼承了MediaSource同樣的4個set接口,讓調用者可以設置MediaSource的這4個對象。當發生讀取事件時,MediaStream會調用這4個設置的MediaSink對象的相應方法,來向外傳遞數據。FeedbackSource也是同樣的道理。

void MediaStream::read(std::shared_ptr<DataPacket> packet) {
  char* buf = packet->data;
  int len = packet->length;
  // PROCESS RTCP
  RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
  RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
  uint32_t recvSSRC = 0;
  if (!chead->isRtcp()) {
    recvSSRC = head->getSSRC();
  } else if (chead->packettype == RTCP_Sender_PT || chead->packettype == RTCP_SDES_PT) {  // Sender Report
    recvSSRC = chead->getSSRC();
  }
  // DELIVER FEEDBACK (RR, FEEDBACK PACKETS)
  if (chead->isFeedback()) {
    if (fb_sink_ != nullptr && should_send_feedback_) {
      fb_sink_->deliverFeedback(std::move(packet));
    }
  } else {
    // RTP or RTCP Sender Report
    if (bundle_) {
      // Check incoming SSRC
      // Deliver data
      if (isVideoSourceSSRC(recvSSRC) && video_sink_) {
        parseIncomingPayloadType(buf, len, VIDEO_PACKET);
        parseIncomingExtensionId(buf, len, VIDEO_PACKET);
        video_sink_->deliverVideoData(std::move(packet));
      } else if (isAudioSourceSSRC(recvSSRC) && audio_sink_) {
        parseIncomingPayloadType(buf, len, AUDIO_PACKET);
        parseIncomingExtensionId(buf, len, AUDIO_PACKET);
        audio_sink_->deliverAudioData(std::move(packet));
      } else {
        ELOG_DEBUG("%s read video unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
                    toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
      }
    } else {
      if (packet->type == AUDIO_PACKET && audio_sink_) {
        parseIncomingPayloadType(buf, len, AUDIO_PACKET);
        parseIncomingExtensionId(buf, len, AUDIO_PACKET);
        // Firefox does not send SSRC in SDP
        if (getAudioSourceSSRC() == 0) {
          ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recvSSRC);
          this->setAudioSourceSSRC(recvSSRC);
        }
        audio_sink_->deliverAudioData(std::move(packet));
      } else if (packet->type == VIDEO_PACKET && video_sink_) {
        parseIncomingPayloadType(buf, len, VIDEO_PACKET);
        parseIncomingExtensionId(buf, len, VIDEO_PACKET);
        // Firefox does not send SSRC in SDP
        if (getVideoSourceSSRC() == 0) {
          ELOG_DEBUG("%s discoveredVideoSourceSSRC:%u", toLog(), recvSSRC);
          this->setVideoSourceSSRC(recvSSRC);
        }
        // change ssrc for RTP packets, don't touch here if RTCP
        video_sink_->deliverVideoData(std::move(packet));
      }
    }  // if not bundle
  }  // if not Feedback
}

這里,MediaStream的讀寫就清楚了,如果我們需要使用MediaStream,則需要做:

1、定義一個MediaSink的子類,將之設置給MediaStream,用於接收MediaStream的數據

2、直接調用MediaStream的deliver方法,讓其向外發送數據。

9.WebRTC中的SDP

 SDP描述由許多文本行組成,文本行的格式為<類型>=<值>,<類型>是一個字母,<值>是結構化的文本串,其格式依<類型>而定。

 

SDP的 文本信息包括:

  • 會話名稱和意圖
  • 會話持續時間
  • 構成會話的媒體
  • 有關接收媒體的信息

會話名稱和意圖描述

v = (協議版本)
o = (所有者/創建者和會話標識符)
s = (會話名稱)
i = * (會話信息)
u = * (URI 描述)
e = * (Email 地址)
p = * (電話號碼)
c = * (連接信息 ― 如果包含在所有媒體中,則不需要該字段)
b = * (帶寬信息)

時間描述

t = (會話活動時間)
r = * (0或多次重復次數)

媒體描述

m = (媒體名稱和傳輸地址)
i = * (媒體標題)
c = * (連接信息 — 如果包含在會話層則該字段可選)
b = * (帶寬信息)
k = * (加密密鑰)
a = * (0 個或多個會話屬性行)

v=0
//sdp版本號,一直為0,rfc4566規定
o=- 7017624586836067756 2 IN IP4 127.0.0.1
// o=<username> <sess-id> <sess-version> <nettype> <addrtype> <unicast-address>
//username如何沒有使用-代替,7017624586836067756是整個會話的編號,2代表會話版本,如果在會話
//過程中有改變編碼之類的操作,重新生成sdp時,sess-id不變,sess-version加1
s=-
//會話名,沒有的話使用-代替
t=0 0
//兩個值分別是會話的起始時間和結束時間,這里都是0代表沒有限制
a=group:BUNDLE audio video data //需要共用一個傳輸通道傳輸的媒體,如果沒有這一行,音視頻,數據就會分別單獨用一個udp端口來發送
a=msid-semantic: WMS h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C
//WMS是WebRTC Media Stream簡稱,這一行定義了本客戶端支持同時傳輸多個流,一個流可以包括多個track,
//一般定義了這個,后面a=ssrc這一行就會有msid,mslabel等屬性
m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 126
//m=audio說明本會話包含音頻,9代表音頻使用端口9來傳輸,但是在webrtc中一現在一般不使用,如果設置為0,代表不
//傳輸音頻,UDP/TLS/RTP/SAVPF是表示用戶來傳輸音頻支持的協議,udp,tls,rtp代表使用udp來傳輸rtp包,並使用tls加密
//SAVPF代表使用srtcp的反饋機制來控制通信過程,后台111 103 104 9 0 8 106 105 13 126表示本會話音頻支持的編碼,后台幾行會有詳細補充說明
c=IN IP4 0.0.0.0
//這一行表示你要用來接收或者發送音頻使用的IP地址,webrtc使用ice傳輸,不使用這個地址
a=rtcp:9 IN IP4 0.0.0.0
//用來傳輸rtcp地地址和端口,webrtc中不使用
a=ice-ufrag:khLS
a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ
//以上兩行是ice協商過程中的安全驗證信息
a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17
//以上這行是dtls協商過程中需要的認證信息
a=setup:actpass
//以上這行代表本客戶端在dtls協商過程中,可以做客戶端也可以做服務端,參考rfc4145 rfc4572
a=mid:audio
//在前面BUNDLE這一行中用到的媒體標識
a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
//上一行指出我要在rtp頭部中加入音量信息,參考 rfc6464
a=sendrecv
//上一行指出我是雙向通信,另外幾種類型是recvonly,sendonly,inactive
a=rtcp-mux
//上一行指出rtp,rtcp包使用同一個端口來傳輸
//下面幾行都是對m=audio這一行的媒體編碼補充說明,指出了編碼采用的編號,采樣率,聲道等
a=rtpmap:111 opus/48000/2
a=rtcp-fb:111 transport-cc
//以上這行說明opus編碼支持使用rtcp來控制擁塞,參考https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
a=fmtp:111 minptime=10;useinbandfec=1
//對opus編碼可選的補充說明,minptime代表最小打包時長是10ms,useinbandfec=1代表使用opus編碼內置fec特性
a=rtpmap:103 ISAC/16000
a=rtpmap:104 ISAC/32000
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=rtpmap:106 CN/32000
a=rtpmap:105 CN/16000
a=rtpmap:13 CN/8000
a=rtpmap:126 telephone-event/8000
a=ssrc:18509423 cname:sTjtznXLCNH7nbRw
//cname用來標識一個數據源,ssrc當發生沖突時可能會發生變化,但是cname不會發生變化,也會出現在rtcp包中SDEC中,
//用於音視頻同步
a=ssrc:18509423 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C 15598a91-caf9-4fff-a28f-3082310b2b7a
//以上這一行定義了ssrc和WebRTC中的MediaStream,AudioTrack之間的關系,msid后面第一個屬性是stream-d,第二個是track-id
a=ssrc:18509423 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C
a=ssrc:18509423 label:15598a91-caf9-4fff-a28f-3082310b2b7a
m=video 9 UDP/TLS/RTP/SAVPF 100 101 107 116 117 96 97 99 98
//參考上面m=audio,含義類似
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:khLS
a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ
a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17
a=setup:actpass
a=mid:video
a=extmap:2 urn:ietf:params:rtp-hdrext:toffset
a=extmap:3 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
a=extmap:4 urn:3gpp:video-orientation
a=extmap:5 http://www.ietf.org/id/draft-hol ... de-cc-extensions-01
a=extmap:6 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay
a=sendrecv
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:100 VP8/90000
a=rtcp-fb:100 ccm fir
//ccm是codec control using RTCP feedback message簡稱,意思是支持使用rtcp反饋機制來實現編碼控制,fir是Full Intra Request
//簡稱,意思是接收方通知發送方發送幅完全幀過來
a=rtcp-fb:100 nack
//支持丟包重傳,參考rfc4585
a=rtcp-fb:100 nack pli
//支持關鍵幀丟包重傳,參考rfc4585
a=rtcp-fb:100 goog-remb
//支持使用rtcp包來控制發送方的碼流
a=rtcp-fb:100 transport-cc
//參考上面opus
a=rtpmap:101 VP9/90000
a=rtcp-fb:101 ccm fir
a=rtcp-fb:101 nack
a=rtcp-fb:101 nack pli
a=rtcp-fb:101 goog-remb
a=rtcp-fb:101 transport-cc
a=rtpmap:107 H264/90000
a=rtcp-fb:107 ccm fir
a=rtcp-fb:107 nack
a=rtcp-fb:107 nack pli
a=rtcp-fb:107 goog-remb
a=rtcp-fb:107 transport-cc
a=fmtp:107 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f
//h264編碼可選的附加說明
a=rtpmap:116 red/90000
//fec冗余編碼,一般如果sdp中有這一行的話,rtp頭部負載類型就是116,否則就是各編碼原生負責類型
a=rtpmap:117 ulpfec/90000
//支持ULP FEC,參考rfc5109
a=rtpmap:96 rtx/90000
a=fmtp:96 apt=100
//以上兩行是VP8編碼的重傳包rtp類型
a=rtpmap:97 rtx/90000
a=fmtp:97 apt=101
a=rtpmap:99 rtx/90000
a=fmtp:99 apt=107
a=rtpmap:98 rtx/90000
a=fmtp:98 apt=116
a=ssrc-group:FID 3463951252 1461041037
//在webrtc中,重傳包和正常包ssrc是不同的,上一行中前一個是正常rtp包的ssrc,后一個是重傳包的ssrc
a=ssrc:3463951252 cname:sTjtznXLCNH7nbRw
a=ssrc:3463951252 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C ead4b4e9-b650-4ed5-86f8-6f5f5806346d
a=ssrc:3463951252 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C
a=ssrc:3463951252 label:ead4b4e9-b650-4ed5-86f8-6f5f5806346d
a=ssrc:1461041037 cname:sTjtznXLCNH7nbRw
a=ssrc:1461041037 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C ead4b4e9-b650-4ed5-86f8-6f5f5806346d
a=ssrc:1461041037 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C
a=ssrc:1461041037 label:ead4b4e9-b650-4ed5-86f8-6f5f5806346d
m=application 9 DTLS/SCTP 5000
c=IN IP4 0.0.0.0
a=ice-ufrag:khLS
a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ
a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17
a=setup:actpass
a=mid:data
a=sctpmap:5000 webrtc-datachannel 1024

 

 10.Pipeline_service

Pipeline是媒體處理的核心流程邏輯。

Pipeline里面定義了兩個主要的概念:Service和Handler。

Service負責處理那些不僅要看當前數據包,還要分析之前的數據包的那些業務,比如丟包重傳;Handler處理當前的數據包的情形,比如生成填充字節。

在Pipeline里面,Handler和Service是配合起來一起工作的,他們通過一套框架將之關聯起來。

void MediaStream::initializePipeline() {
  if (pipeline_initialized_) {
    return;
  }
  handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
  pipeline_->addService(shared_from_this());
  pipeline_->addService(handler_manager_);
  pipeline_->addService(rtcp_processor_);
  pipeline_->addService(stats_);
  pipeline_->addService(quality_manager_);
  pipeline_->addService(packet_buffer_);

  pipeline_->addFront(std::make_shared<PacketReader>(this));

  pipeline_->addFront(std::make_shared<RtcpProcessorHandler>());
  pipeline_->addFront(std::make_shared<FecReceiverHandler>());
  pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>());
  pipeline_->addFront(std::make_shared<QualityFilterHandler>());
  pipeline_->addFront(std::make_shared<IncomingStatsHandler>());
  pipeline_->addFront(std::make_shared<FakeKeyframeGeneratorHandler>());
  pipeline_->addFront(std::make_shared<RtpTrackMuteHandler>());
  pipeline_->addFront(std::make_shared<RtpSlideShowHandler>());
  pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>());
  pipeline_->addFront(std::make_shared<PeriodicPliHandler>());
  pipeline_->addFront(std::make_shared<PliPriorityHandler>());
  pipeline_->addFront(std::make_shared<PliPacerHandler>());
  pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>());
  pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>());
  pipeline_->addFront(std::make_shared<RtcpFeedbackGenerationHandler>());
  pipeline_->addFront(std::make_shared<RtpRetransmissionHandler>());
  pipeline_->addFront(std::make_shared<SRPacketHandler>());
  pipeline_->addFront(std::make_shared<LayerDetectorHandler>());
  pipeline_->addFront(std::make_shared<OutgoingStatsHandler>());
  pipeline_->addFront(std::make_shared<PacketCodecParser>());

  pipeline_->addFront(std::make_shared<PacketWriter>(this));
  pipeline_->finalize();

  if (connection_) {
    quality_manager_->setConnectionQualityLevel(connection_->getConnectionQualityLevel());
  }
  pipeline_initialized_ = true;
}

在初始化時,pipeline調用了addService和addFront接口,將Service和Handler添加到pipeline中去。在初始化里面,我們可以看到其支持了哪些處理。

在實際使用中,接收到的數據,調用pipeline的read接口,就完成了解析為裸數據的事兒;調用write接口,就完成了fec等處理數據的事兒。

pipeline的數據,read的源需要是srtp解密后的數據,處理后為rtp裸數據;write的源為rtp裸數據,處理后的數據經過srtp加密輸出到網絡。(網絡使用的是DtlsTransport接口對接的)

pipeline的Service部分的繼承體系以及數據結構:

 

 結合PipelineBase的addService實現:

template <class S>
void PipelineBase::addService(std::shared_ptr<S> service) {
  typedef typename ServiceContextType<S>::type Context;
  service_ctxs_.push_back(std::make_shared<Context>(shared_from_this(), std::move(service)));
}

template <class Service>
struct ServiceContextType {
  typedef ServiceContextImpl<Service> type;
};

addService其實就是傳遞一個Service的子類對象,這個子類對象是用來給Context的構造函數傳遞參數的;Context就是ServiceContextImpl,也就是說addService里面的參數,就是為了創建一個ServiceContextImpl對象,這個對象創建出來以后,被存儲在pipelinebase的service_ctxs_成員中。在addService接口中,還將pipeline自身,作為參數,傳遞給了ServiceContextImpl。

 

  explicit ServiceContextImpl(
      std::weak_ptr<PipelineBase> pipeline,
      std::weak_ptr<S> service) {
    this->impl_ = this;
    this->initialize(pipeline, std::move(service));
  }

  void initialize(
      std::weak_ptr<PipelineBase> pipeline,
      std::weak_ptr<S> service) {
    pipeline_weak_ = pipeline;
    pipeline_raw_ = pipeline.lock().get();
    service_ = std::move(service);
  }

ServiceContextImpl在構造時存儲了PipelineBase和Service,這樣外面再使用時,可以通過getService來獲取到Service的實例。

pipeline的notifyUpdate方法,看實際的處理handler(RtcpProcessorHandler)

void RtcpProcessorHandler::notifyUpdate() {
  auto pipeline = getContext()->getPipelineShared();
  if (pipeline && !stream_) {
    stream_ = pipeline->getService<MediaStream>().get();
    processor_ = pipeline->getService<RtcpProcessor>();
    stats_ = pipeline->getService<Stats>();
  }
}

通過調用getService方法,模板傳遞不同的類型,則能夠獲取到不同的對象實例.

template <class S>
typename ServiceContextType<S>::type* PipelineBase::getServiceContext() {
  for (auto pipeline_service_ctx : service_ctxs_) {
    auto ctx = dynamic_cast<typename ServiceContextType<S>::type*>(pipeline_service_ctx.get());
    if (ctx) {
      return ctx;
    }
  }
  return nullptr;
}

template <class S>
std::shared_ptr<S> PipelineBase::getService() {
  auto ctx = getServiceContext<S>();
  return ctx ? ctx->getService().lock() : std::shared_ptr<S>();
}

在getServiceContext方法里面,遍歷了pipeline的service_ctxs_,並對每一個ctx進行dynamic_cast轉換,能夠成功,就返回,不能成功就繼續。形成了一個共享的方式,所有的handler,都可以獲得到所有的service的子類實例,在實現過程中就極大的提升了靈活性,每個service獨立做自己的事兒,並且由handler直接進行數據驅動。

Service的核心意義是共享,即每個handler都可以通過類型來獲取到所有的Service子類實例,進行使用,而不必要為每個Handler定義不同的接口來傳遞Service對象。Service也為了多個Handler公用數據而提供服務。

11.Pipeline_handle

erizo的pipeline的handle,是媒體數據處理的基本操作,handle分為3類:IN,OUT,BOTH

IN:數據進入handle,handle需要read數據並傳遞給下一級

OUT:數據進入handle,handle需要write數據並傳遞給下一級

BOTH:可以同時進行read和write

在宏觀語義上,IN的目標是輸出rtp裸數據;OUT的目標是封裝rtp裸數據

pipeline的handle的繼承體系如下:

 

 

handle有三種:InboundHandler、Handler、OutboundHandler,他們的工作方式都差不多;他們分別對應一個Context:

InboundHandler==>InboundContextImpl

Handler==>ContextImpl

OutboundHandler==>OutboundContextImpl

pipeline有addFront方法,該方法是注冊Handler的,先看看這個方法的相關實現:

template <class H>
PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
  typedef typename ContextType<H>::type Context;
  return addHelper(
      std::make_shared<Context>(shared_from_this(), std::move(handler)),
      true);
}

template <class H>
PipelineBase& PipelineBase::addFront(H&& handler) {
  return addFront(std::make_shared<H>(std::forward<H>(handler)));
}

template <class H>
PipelineBase& PipelineBase::addFront(H* handler) {
  return addFront(std::shared_ptr<H>(handler, [](H*){}));  // NOLINT
}

template <class Context>
PipelineBase& PipelineBase::addHelper(
    std::shared_ptr<Context>&& ctx,  // NOLINT
    bool front) {
  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
  }
  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
  }
  return *this;
}

在licode中輸入傳參都是shared_ptr,addFront的功能,就是創建一個Context的對象,並將之傳遞給addHelper為參數。Context的構造還被傳遞了Handler自己的指針,讓Context能夠知道handler。

addHelper將Context存儲起來,並且判斷類型,分別放到inCtxs_和outCtxs_里面,待用

template <class Handler>
struct ContextType {
  typedef typename std::conditional<
    Handler::dir == HandlerDir_BOTH,
    ContextImpl<Handler>,
    typename std::conditional<
      Handler::dir == HandlerDir_IN,
      InboundContextImpl<Handler>,
      OutboundContextImpl<Handler>
    >::type>::type
  type;
};

Context的實際類型,要依托於Handler::dir成員的值,這個值是一個常量,是每個Handler都有的。

HandlerDir_IN:Context為InboundContextImpl

HandlerDir_BOTH:Context為ContextImpl

HandlerDir_其他:Context為OutboundContextImpl

所以,pipeline的addFront方法,實際上是創建HandlerContext的實例,並且將之存儲的過程。

pipeline的數據鏈路建立:

void Pipeline::finalize() {
  front_ = nullptr;
  if (!inCtxs_.empty()) {
    front_ = dynamic_cast<InboundLink*>(inCtxs_.front());
    for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
      inCtxs_[i]->setNextIn(inCtxs_[i+1]);
    }
    inCtxs_.back()->setNextIn(nullptr);
  }

  back_ = nullptr;
  if (!outCtxs_.empty()) {
    back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());
    for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
      outCtxs_[i]->setNextOut(outCtxs_[i-1]);
    }
    outCtxs_.front()->setNextOut(nullptr);
  }

  if (!front_) {
    // detail::logWarningIfNotUnit<R>(
    //     "No inbound handler in Pipeline, inbound operations will throw "
    //     "std::invalid_argument");
  }
  if (!back_) {
    // detail::logWarningIfNotUnit<W>(
    //     "No outbound handler in Pipeline, outbound operations will throw "
    //     "std::invalid_argument");
  }

  for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  for (auto it = service_ctxs_.rbegin(); it != service_ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  notifyUpdate();
}

pipeline里面的finalize方法為In和Out的Context設置任務鏈,並且設置頭結點(front_,back_)之后,每個HandlerContext就知道自己的下一級任務是什么了。

MediaStream的OnTransportData獲得packet數據:

void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) {
  if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) {
    return;
  }

  std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);

  if (transport->mediaType == AUDIO_TYPE) {
    packet->type = AUDIO_PACKET;
  } else if (transport->mediaType == VIDEO_TYPE) {
    packet->type = VIDEO_PACKET;
  }
  auto stream_ptr = shared_from_this();

  worker_->task([stream_ptr, packet]{
    if (!stream_ptr->pipeline_initialized_) {
      ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog());
      return;
    }

    char* buf = packet->data;
    RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
    RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
    if (!chead->isRtcp()) {
      uint32_t recvSSRC = head->getSSRC();
      if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
        packet->type = VIDEO_PACKET;
      } else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
        packet->type = AUDIO_PACKET;
      }
    }

    if (stream_ptr->pipeline_) {
      stream_ptr->pipeline_->read(std::move(packet));
    }
  });
}

在里面調用了pipeline的read方法:

//調用了front的read,front是一個HandlerContext對象,並且是處理鏈的頭結點
void Pipeline::read(std::shared_ptr<DataPacket> packet) {
  if (!front_) {
    return;
  }
  front_->read(std::move(packet));//InboundLink 
}
//在HandlerContext里面調用了handler_的read方法,並且把自己作為參數也同時傳遞給了handler。
  // InboundLink overrides
void read(std::shared_ptr<DataPacket> packet) override {
  auto guard = this->pipelineWeak_.lock();
  this->handler_->read(this, std::move(packet));
}
//在handler的read里面,再次調用了ctx的fireRead方法,並且把packet進行傳遞    
void LayerDetectorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
  RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
  if (!chead->isRtcp() && enabled_ && packet->type == VIDEO_PACKET) {
    if (packet->codec == "VP8") {
      parseLayerInfoFromVP8(packet);
    } else if (packet->codec == "VP9") {
      parseLayerInfoFromVP9(packet);
    } else if (packet->codec == "H264") {
      parseLayerInfoFromH264(packet);
    }
  }
  ctx->fireRead(std::move(packet));
}

在fireRead中,調用了nextIn_的read方法,nextIn_是下一個HandlerContext。

這樣就形成了一個任務鏈。

ContextRead-->HanderRead-->Context fireRead-->Context next read-->ContextRead.........

形成一個read的任務鏈。write的基本原理也是相似的。

erizo的pipeline的handler是負責實際數據處理的,通過處理鏈路,將之串聯起來

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM