1. Introduction
This article introduces the basic video send pipeline in WebRTC: how video goes from encoding, to RTP packetization, to paced sending, and finally out through ICE.
WebRTC version: M91
2. Main Text
2.1 High-Level Overview
This section introduces the overall class hierarchy involved in the media send design. Structurally it looks like this:
- PeerConnection: represents one end of a peer connection; it holds an array of Transceivers.
- Transceiver: under Unified Plan one Track corresponds to one mid, and one mid to one Transceiver. Each Transceiver holds one RtpSender and one RtpReceiver that send and receive the media of that mid. This belongs to Unified Plan and differs from Plan B, where one mid may carry several Tracks.
- RtpSender: merely an intermediate class; beneath it sit MediaChannel and DataChannel, representing the media send/receive channels.
- MediaChannel: the media channel. It manages arrays of SendStream and ReceiveStream; since under Unified Plan one Track corresponds to one primary ssrc and one primary ssrc to one stream, the arrays presumably remain for Plan B compatibility. It also holds the various transports, such as SrtpTransport, which DTLS-encrypts the data, and IceTransport, which actually puts data on the network.
- SendStream: the send stream; each primary ssrc has its own SendStream. It manages the encoder (StreamEncoder), which encodes the media data, and the RtpVideoSender, which RTP-packetizes and sends the encoded data.
- RtpVideoSender: the RTP-level sender for video frames; it mainly parses the metadata of encoded frames and forwards them to RTPSenderVideo.
- RTPSenderVideo: wraps the encoded payload into RTP packets and hands them to PacedSender for sending.
- PacedSender: the pacing sender, which sends RTP packets out at a controlled rate.
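The containment described above can be condensed into a minimal sketch (hypothetical skeleton types for illustration only; the real WebRTC classes are far richer, the names below merely mirror the list):

#include <vector>

struct PacedSender {};                                   // paces RTP packets out at a set rate
struct RTPSenderVideo { PacedSender* pacer; };           // RTP-packetizes encoded frames
struct RtpVideoSender { RTPSenderVideo sender_video; };  // parses frame info, routes frames
struct StreamEncoder {};                                 // encodes raw frames
struct SendStream {                                      // one per primary ssrc
  StreamEncoder encoder;
  RtpVideoSender rtp_video_sender;
};
struct MediaChannel {                                    // owns streams plus the transport chain
  std::vector<SendStream> send_streams;
};
struct RtpSender { MediaChannel* media_channel; };
struct Transceiver { RtpSender sender; };                // one per mid under Unified Plan
struct PeerConnection { std::vector<Transceiver> transceivers; };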
2.2 Analysis of the Video Send Process
The actual encoding and sending process of video is shown in the figure above:
- The media source delivers raw frames to the StreamEncoder for encoding.
- The StreamEncoder hands each encoded frame to the RtpVideoSender, which parses frame information and updates encode/send bitrate control.
- The RtpVideoSender forwards the encoded frame to RTPSenderVideo for RTP packetization.
- RTPSenderVideo posts the finished RTP packets into PacedSender's queue, to be sent at the configured rate.
- PacedSender pushes the packets down to the MediaChannel, which passes them along the transport chain: SrtpTransport, RtpTransport, DtlsTransport, IceTransport, and finally out to the network.
2.2.1 StreamEncoder Encodes the Frame
This section covers the part marked in the figure below. class VideoSendStreamImpl holds two important objects: video_stream_encoder_ and rtp_video_sender_
class VideoSendStreamImpl {
  ...
  VideoStreamEncoderInterface* const video_stream_encoder_;
  RtpVideoSenderInterface* const rtp_video_sender_;
};
video_stream_encoder_ is the class responsible for video encoding: it pulls video frames from the media source and encodes them. The class is class VideoStreamEncoder:
// VideoStreamEncoder represent a video encoder that accepts raw video frames as
// input and produces an encoded bit stream.
// Usage:
// Instantiate.
// Call SetSink.
// Call SetSource.
// Call ConfigureEncoder with the codec settings.
// Call Stop() when done.
class VideoStreamEncoder : public VideoStreamEncoderInterface,
private EncodedImageCallback,
public VideoSourceRestrictionsListener {
public:
// TODO(bugs.webrtc.org/12000): Reporting of VideoBitrateAllocation is being
// deprecated. Instead VideoLayersAllocation should be reported.
enum class BitrateAllocationCallbackType {
kVideoBitrateAllocation,
kVideoBitrateAllocationWhenScreenSharing,
kVideoLayersAllocation
};
VideoStreamEncoder(Clock* clock,
uint32_t number_of_cores,
VideoStreamEncoderObserver* encoder_stats_observer,
const VideoStreamEncoderSettings& settings,
std::unique_ptr<OveruseFrameDetector> overuse_detector,
TaskQueueFactory* task_queue_factory,
BitrateAllocationCallbackType allocation_cb_type);
~VideoStreamEncoder() override;
void AddAdaptationResource(rtc::scoped_refptr<Resource> resource) override;
std::vector<rtc::scoped_refptr<Resource>> GetAdaptationResources() override;
void SetSource(rtc::VideoSourceInterface<VideoFrame>* source,
const DegradationPreference& degradation_preference) override;
void SetSink(EncoderSink* sink, bool rotation_applied) override;
// TODO(perkj): Can we remove VideoCodec.startBitrate ?
void SetStartBitrate(int start_bitrate_bps) override;
void SetFecControllerOverride(
FecControllerOverride* fec_controller_override) override;
void ConfigureEncoder(VideoEncoderConfig config,
size_t max_data_payload_length) override;
// Permanently stop encoding. After this method has returned, it is
// guaranteed that no encoded frames will be delivered to the sink.
void Stop() override;
void SendKeyFrame() override;
void OnLossNotification(
const VideoEncoder::LossNotification& loss_notification) override;
void OnBitrateUpdated(DataRate target_bitrate,
DataRate stable_target_bitrate,
DataRate target_headroom,
uint8_t fraction_lost,
int64_t round_trip_time_ms,
double cwnd_reduce_ratio) override;
DataRate UpdateTargetBitrate(DataRate target_bitrate,
double cwnd_reduce_ratio);
protected:
.....
};
The class is long, but the comment at its head already states quite clearly how it is meant to work.
2.2.1.1 How VideoStreamEncoder Works
Instantiate the object
-> Call SetSink(): install the external sink that receives encoded output
-> Call SetSource(): install the media source
-> Call ConfigureEncoder(): configure the encoder
-> Call Stop(): call Stop() when done
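Expressed as code, that lifecycle reads roughly as below (a sketch assuming already-constructed encoder, sink, source, and config objects; see the real interface above for exact argument lists):

// Sketch of the VideoStreamEncoder lifecycle from its class comment.
// 'encoder', 'sink', 'source', 'config' and 'max_data_payload_length'
// are assumed to exist; error handling is omitted.
encoder->SetSink(sink, /*rotation_applied=*/false);
encoder->SetSource(source, webrtc::DegradationPreference::MAINTAIN_FRAMERATE);
encoder->ConfigureEncoder(std::move(config), max_data_payload_length);
// ... frames now flow: source -> OnFrame() -> encoder_->Encode() -> sink->OnEncodedImage() ...
encoder->Stop();  // after Stop() returns, no more frames reach the sink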
2.2.1.2 SetSink() and EncoderSink
WebRTC provides the EncoderSink encode-callback interface for other classes to inherit and implement; it includes the interface we care about most, OnEncodedImage(), used to obtain encoded frames:
class RTC_EXPORT EncodedImageCallback {
public:
virtual ~EncodedImageCallback() {}
// Callback function which is called when an image has been encoded.
virtual Result OnEncodedImage(
const EncodedImage& encoded_image,
const CodecSpecificInfo* codec_specific_info) = 0;
virtual void OnDroppedFrame(DropReason reason) {}
};
class EncoderSink : public EncodedImageCallback {
public:
virtual void OnEncoderConfigurationChanged(
std::vector<VideoStream> streams,
bool is_svc,
VideoEncoderConfig::ContentType content_type,
int min_transmit_bitrate_bps) = 0;
virtual void OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) = 0;
virtual void OnVideoLayersAllocationUpdated(
VideoLayersAllocation allocation) = 0;
};
What VideoStreamEncoder::SetSink() installs is class VideoSendStreamImpl, i.e. the holder of the current object. You can see that it inherits VideoStreamEncoderInterface::EncoderSink and overrides the relevant methods. Of these, OnEncodedImage() is the one we particularly care about: the encoder invokes this callback once it has finished encoding a frame.
class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver,
public VideoStreamEncoderInterface::EncoderSink {
...
void OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) override;
void OnVideoLayersAllocationUpdated(
VideoLayersAllocation allocation) override;
// Implements EncodedImageCallback. The implementation routes encoded frames
// to the |payload_router_| and |config.pre_encode_callback| if set.
// Called on an arbitrary encoder callback thread.
EncodedImageCallback::Result OnEncodedImage(
const EncodedImage& encoded_image,
const CodecSpecificInfo* codec_specific_info) override;
// Implements EncodedImageCallback.
void OnDroppedFrame(EncodedImageCallback::DropReason reason) override;
};
2.2.1.3 SetSource()
WebRTC manages media with the source/sink pattern. A source represents a media source; it is what we add when calling addTrack() after creating the PeerConnection. A sink is a media subscription slot, implemented by whatever class wants to receive the media. Through the source's implementation of class VideoSourceInterface, sinks can be added to and removed from the subscription list:
template <typename VideoFrameT>
class VideoSourceInterface {
public:
virtual ~VideoSourceInterface() = default;
virtual void AddOrUpdateSink(VideoSinkInterface<VideoFrameT>* sink,
const VideoSinkWants& wants) = 0;
// RemoveSink must guarantee that at the time the method returns,
// there is no current and no future calls to VideoSinkInterface::OnFrame.
virtual void RemoveSink(VideoSinkInterface<VideoFrameT>* sink) = 0;
};
Here VideoStreamEncoder directly inherits VideoStreamEncoderInterface, and thereby indirectly inherits the sink interface:
class VideoStreamEncoderInterface : public rtc::VideoSinkInterface<VideoFrame> {
....
};
In class VideoStreamEncoder you can see the corresponding overrides:
class VideoStreamEncoder {
  ...
  void OnFrame(const VideoFrame& video_frame) override;
  void OnDiscardedFrame() override;
};
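As a quick illustration of the source/sink pattern, any class implementing VideoSinkInterface can subscribe to a source. Below is a minimal hypothetical sink (not WebRTC code; header paths assumed to be the usual api/video/ locations) that just counts the frames it receives:

#include "api/video/video_frame.h"
#include "api/video/video_sink_interface.h"

// Hypothetical sink: counts frames delivered by a source.
class FrameCounter : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
 public:
  void OnFrame(const webrtc::VideoFrame& frame) override { ++frame_count_; }
  void OnDiscardedFrame() override { ++discarded_count_; }
  int frame_count() const { return frame_count_; }

 private:
  int frame_count_ = 0;
  int discarded_count_ = 0;
};

// Usage, given an rtc::VideoSourceInterface<webrtc::VideoFrame>* source:
//   FrameCounter counter;
//   source->AddOrUpdateSink(&counter, rtc::VideoSinkWants());
//   ... frames arrive via counter.OnFrame() ...
//   source->RemoveSink(&counter);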
VideoStreamEncoder::SetSource() takes a small detour in its implementation, delegating SetSource() to a proxy object, VideoSourceSinkController video_source_sink_controller_:
void VideoStreamEncoder::SetSource(
rtc::VideoSourceInterface<VideoFrame>* source,
const DegradationPreference& degradation_preference) {
RTC_DCHECK_RUN_ON(main_queue_);
video_source_sink_controller_.SetSource(source);
input_state_provider_.OnHasInputChanged(source);
// This may trigger reconfiguring the QualityScaler on the encoder queue.
encoder_queue_.PostTask([this, degradation_preference] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
degradation_preference_manager_->SetDegradationPreference(
degradation_preference);
stream_resource_manager_.SetDegradationPreferences(degradation_preference);
if (encoder_) {
stream_resource_manager_.ConfigureQualityScaler(
encoder_->GetEncoderInfo());
}
});
}
Inside the proxy you can see the sink being subscribed to the source; sink_ is the VideoStreamEncoder itself:
void VideoSourceSinkController::SetSource(
rtc::VideoSourceInterface<VideoFrame>* source) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
rtc::VideoSourceInterface<VideoFrame>* old_source = source_;
source_ = source;
if (old_source != source && old_source)
old_source->RemoveSink(sink_);
if (!source)
return;
  // Subscribe the sink to the source
source->AddOrUpdateSink(sink_, CurrentSettingsToSinkWants());
}
2.2.1.4 Encoding a Frame
When the source produces media it calls VideoStreamEncoder::OnFrame():
void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {
RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_);
VideoFrame incoming_frame = video_frame;
// Local time in webrtc time base.
Timestamp now = clock_->CurrentTime();
// In some cases, e.g., when the frame from decoder is fed to encoder,
// the timestamp may be set to the future. As the encoding pipeline assumes
// capture time to be less than present time, we should reset the capture
// timestamps here. Otherwise there may be issues with RTP send stream.
if (incoming_frame.timestamp_us() > now.us())
incoming_frame.set_timestamp_us(now.us());
// Capture time may come from clock with an offset and drift from clock_.
int64_t capture_ntp_time_ms;
if (video_frame.ntp_time_ms() > 0) {
capture_ntp_time_ms = video_frame.ntp_time_ms();
} else if (video_frame.render_time_ms() != 0) {
capture_ntp_time_ms = video_frame.render_time_ms() + delta_ntp_internal_ms_;
} else {
capture_ntp_time_ms = now.ms() + delta_ntp_internal_ms_;
}
incoming_frame.set_ntp_time_ms(capture_ntp_time_ms);
// Convert NTP time, in ms, to RTP timestamp.
const int kMsToRtpTimestamp = 90;
incoming_frame.set_timestamp(
kMsToRtpTimestamp * static_cast<uint32_t>(incoming_frame.ntp_time_ms()));
if (incoming_frame.ntp_time_ms() <= last_captured_timestamp_) {
// We don't allow the same capture time for two frames, drop this one.
RTC_LOG(LS_WARNING) << "Same/old NTP timestamp ("
<< incoming_frame.ntp_time_ms()
<< " <= " << last_captured_timestamp_
<< ") for incoming frame. Dropping.";
encoder_queue_.PostTask([this, incoming_frame]() {
RTC_DCHECK_RUN_ON(&encoder_queue_);
accumulated_update_rect_.Union(incoming_frame.update_rect());
accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
});
return;
}
bool log_stats = false;
if (now.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) {
last_frame_log_ms_ = now.ms();
log_stats = true;
}
last_captured_timestamp_ = incoming_frame.ntp_time_ms();
int64_t post_time_us = clock_->CurrentTime().us();
++posted_frames_waiting_for_encode_;
encoder_queue_.PostTask(
[this, incoming_frame, post_time_us, log_stats]() {
RTC_DCHECK_RUN_ON(&encoder_queue_);
encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(),
incoming_frame.height());
++captured_frame_count_;
const int posted_frames_waiting_for_encode =
posted_frames_waiting_for_encode_.fetch_sub(1);
RTC_DCHECK_GT(posted_frames_waiting_for_encode, 0);
CheckForAnimatedContent(incoming_frame, post_time_us);
bool cwnd_frame_drop =
cwnd_frame_drop_interval_ &&
(cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0);
if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {
MaybeEncodeVideoFrame(incoming_frame, post_time_us);
} else {
if (cwnd_frame_drop) {
// Frame drop by congestion window pusback. Do not encode this
// frame.
++dropped_frame_cwnd_pushback_count_;
encoder_stats_observer_->OnFrameDropped(
VideoStreamEncoderObserver::DropReason::kCongestionWindow);
} else {
// There is a newer frame in flight. Do not encode this frame.
RTC_LOG(LS_VERBOSE)
<< "Incoming frame dropped due to that the encoder is blocked.";
++dropped_frame_encoder_block_count_;
encoder_stats_observer_->OnFrameDropped(
VideoStreamEncoderObserver::DropReason::kEncoderQueue);
}
accumulated_update_rect_.Union(incoming_frame.update_rect());
accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
}
if (log_stats) {
RTC_LOG(LS_INFO) << "Number of frames: captured "
<< captured_frame_count_
<< ", dropped (due to congestion window pushback) "
<< dropped_frame_cwnd_pushback_count_
<< ", dropped (due to encoder blocked) "
<< dropped_frame_encoder_block_count_
<< ", interval_ms " << kFrameLogIntervalMs;
captured_frame_count_ = 0;
dropped_frame_cwnd_pushback_count_ = 0;
dropped_frame_encoder_block_count_ = 0;
}
});
}
OnFrame() mainly:
- Records the frame's NTP time and converts it to the RTP timestamp (the video RTP clock runs at 90 kHz, hence kMsToRtpTimestamp = 90: an NTP time of 1000 ms maps to RTP timestamp 90000)
- Posts a task to the encoder task queue that calls MaybeEncodeVideoFrame() to process the frame
void VideoStreamEncoder::MaybeEncodeVideoFrame(const VideoFrame& video_frame,
int64_t time_when_posted_us) {
RTC_DCHECK_RUN_ON(&encoder_queue_);
input_state_provider_.OnFrameSizeObserved(video_frame.size());
if (!last_frame_info_ || video_frame.width() != last_frame_info_->width ||
video_frame.height() != last_frame_info_->height ||
video_frame.is_texture() != last_frame_info_->is_texture) {
pending_encoder_reconfiguration_ = true;
last_frame_info_ = VideoFrameInfo(video_frame.width(), video_frame.height(),
video_frame.is_texture());
RTC_LOG(LS_INFO) << "Video frame parameters changed: dimensions="
<< last_frame_info_->width << "x"
<< last_frame_info_->height
<< ", texture=" << last_frame_info_->is_texture << ".";
// Force full frame update, since resolution has changed.
accumulated_update_rect_ =
VideoFrame::UpdateRect{0, 0, video_frame.width(), video_frame.height()};
}
// We have to create then encoder before the frame drop logic,
// because the latter depends on encoder_->GetScalingSettings.
// According to the testcase
// InitialFrameDropOffWhenEncoderDisabledScaling, the return value
// from GetScalingSettings should enable or disable the frame drop.
// Update input frame rate before we start using it. If we update it after
// any potential frame drop we are going to artificially increase frame sizes.
// Poll the rate before updating, otherwise we risk the rate being estimated
// a little too high at the start of the call when then window is small.
uint32_t framerate_fps = GetInputFramerateFps();
input_framerate_.Update(1u, clock_->TimeInMilliseconds());
int64_t now_ms = clock_->TimeInMilliseconds();
  // Reconfigure the encoder if a reconfiguration is pending
if (pending_encoder_reconfiguration_) {
ReconfigureEncoder();
last_parameters_update_ms_.emplace(now_ms);
} else if (!last_parameters_update_ms_ ||
now_ms - *last_parameters_update_ms_ >=
kParameterUpdateIntervalMs) {
if (last_encoder_rate_settings_) {
// Clone rate settings before update, so that SetEncoderRates() will
// actually detect the change between the input and
// |last_encoder_rate_setings_|, triggering the call to SetRate() on the
// encoder.
EncoderRateSettings new_rate_settings = *last_encoder_rate_settings_;
new_rate_settings.rate_control.framerate_fps =
static_cast<double>(framerate_fps);
SetEncoderRates(UpdateBitrateAllocation(new_rate_settings));
}
last_parameters_update_ms_.emplace(now_ms);
}
// Because pending frame will be dropped in any case, we need to
// remember its updated region.
if (pending_frame_) {
encoder_stats_observer_->OnFrameDropped(
VideoStreamEncoderObserver::DropReason::kEncoderQueue);
accumulated_update_rect_.Union(pending_frame_->update_rect());
accumulated_update_rect_is_valid_ &= pending_frame_->has_update_rect();
}
if (DropDueToSize(video_frame.size())) {
RTC_LOG(LS_INFO) << "Dropping frame. Too large for target bitrate.";
stream_resource_manager_.OnFrameDroppedDueToSize();
// Storing references to a native buffer risks blocking frame capture.
if (video_frame.video_frame_buffer()->type() !=
VideoFrameBuffer::Type::kNative) {
pending_frame_ = video_frame;
pending_frame_post_time_us_ = time_when_posted_us;
} else {
// Ensure that any previously stored frame is dropped.
pending_frame_.reset();
accumulated_update_rect_.Union(video_frame.update_rect());
accumulated_update_rect_is_valid_ &= video_frame.has_update_rect();
}
return;
}
stream_resource_manager_.OnMaybeEncodeFrame();
if (EncoderPaused()) {
// Storing references to a native buffer risks blocking frame capture.
if (video_frame.video_frame_buffer()->type() !=
VideoFrameBuffer::Type::kNative) {
if (pending_frame_)
TraceFrameDropStart();
pending_frame_ = video_frame;
pending_frame_post_time_us_ = time_when_posted_us;
} else {
// Ensure that any previously stored frame is dropped.
pending_frame_.reset();
TraceFrameDropStart();
accumulated_update_rect_.Union(video_frame.update_rect());
accumulated_update_rect_is_valid_ &= video_frame.has_update_rect();
}
return;
}
pending_frame_.reset();
frame_dropper_.Leak(framerate_fps);
// Frame dropping is enabled iff frame dropping is not force-disabled, and
// rate controller is not trusted.
const bool frame_dropping_enabled =
!force_disable_frame_dropper_ &&
!encoder_info_.has_trusted_rate_controller;
frame_dropper_.Enable(frame_dropping_enabled);
if (frame_dropping_enabled && frame_dropper_.DropFrame()) {
RTC_LOG(LS_VERBOSE)
<< "Drop Frame: "
"target bitrate "
<< (last_encoder_rate_settings_
? last_encoder_rate_settings_->encoder_target.bps()
: 0)
<< ", input frame rate " << framerate_fps;
OnDroppedFrame(
EncodedImageCallback::DropReason::kDroppedByMediaOptimizations);
accumulated_update_rect_.Union(video_frame.update_rect());
accumulated_update_rect_is_valid_ &= video_frame.has_update_rect();
return;
}
EncodeVideoFrame(video_frame, time_when_posted_us);
}
MaybeEncodeVideoFrame() mainly:
- Checks whether ConfigureEncoder() has been called; if so, reconfigures the encoder before encoding
- If frame dropping is enabled, checks the frame size and frame rate to decide whether to drop this frame; if it is not dropped, calls EncodeVideoFrame() to encode it
void VideoStreamEncoder::EncodeVideoFrame(const VideoFrame& video_frame,
int64_t time_when_posted_us) {
RTC_DCHECK_RUN_ON(&encoder_queue_);
// If the encoder fail we can't continue to encode frames. When this happens
// the WebrtcVideoSender is notified and the whole VideoSendStream is
// recreated.
if (encoder_failed_)
return;
TraceFrameDropEnd();
// Encoder metadata needs to be updated before encode complete callback.
VideoEncoder::EncoderInfo info = encoder_->GetEncoderInfo();
if (info.implementation_name != encoder_info_.implementation_name) {
encoder_stats_observer_->OnEncoderImplementationChanged(
info.implementation_name);
if (bitrate_adjuster_) {
// Encoder implementation changed, reset overshoot detector states.
bitrate_adjuster_->Reset();
}
}
  // Check whether the encoder info changed and notify the upper layer
if (encoder_info_ != info) {
OnEncoderSettingsChanged();
stream_resource_manager_.ConfigureEncodeUsageResource();
RTC_LOG(LS_INFO) << "Encoder settings changed from "
<< encoder_info_.ToString() << " to " << info.ToString();
}
  // Re-seed the bitrate adjuster from the changed fps allocation
if (bitrate_adjuster_) {
for (size_t si = 0; si < kMaxSpatialLayers; ++si) {
if (info.fps_allocation[si] != encoder_info_.fps_allocation[si]) {
bitrate_adjuster_->OnEncoderInfo(info);
break;
}
}
}
encoder_info_ = info;
last_encode_info_ms_ = clock_->TimeInMilliseconds();
  // Convert the raw frame to I420 if necessary
VideoFrame out_frame(video_frame);
if (out_frame.video_frame_buffer()->type() ==
VideoFrameBuffer::Type::kNative &&
!info.supports_native_handle) {
// This module only supports software encoding.
rtc::scoped_refptr<VideoFrameBuffer> buffer =
out_frame.video_frame_buffer()->GetMappedFrameBuffer(
info.preferred_pixel_formats);
bool buffer_was_converted = false;
if (!buffer) {
      // The preferred formats above failed; force I420 conversion
buffer = out_frame.video_frame_buffer()->ToI420();
// TODO(https://crbug.com/webrtc/12021): Once GetI420 is pure virtual,
// this just true as an I420 buffer would return from
// GetMappedFrameBuffer.
buffer_was_converted =
(out_frame.video_frame_buffer()->GetI420() == nullptr);
}
if (!buffer) {
RTC_LOG(LS_ERROR) << "Frame conversion failed, dropping frame.";
return;
}
VideoFrame::UpdateRect update_rect = out_frame.update_rect();
if (!update_rect.IsEmpty() &&
out_frame.video_frame_buffer()->GetI420() == nullptr) {
// UpdatedRect is reset to full update if it's not empty, and buffer was
// converted, therefore we can't guarantee that pixels outside of
// UpdateRect didn't change comparing to the previous frame.
update_rect =
VideoFrame::UpdateRect{0, 0, out_frame.width(), out_frame.height()};
}
out_frame.set_video_frame_buffer(buffer);
out_frame.set_update_rect(update_rect);
}
  // Crop the frame to the preset crop_width_ and crop_height_
// Crop frame if needed.
if ((crop_width_ > 0 || crop_height_ > 0) &&
out_frame.video_frame_buffer()->type() !=
VideoFrameBuffer::Type::kNative) {
// If the frame can't be converted to I420, drop it.
int cropped_width = video_frame.width() - crop_width_;
int cropped_height = video_frame.height() - crop_height_;
rtc::scoped_refptr<VideoFrameBuffer> cropped_buffer;
// TODO(ilnik): Remove scaling if cropping is too big, as it should never
// happen after SinkWants signaled correctly from ReconfigureEncoder.
VideoFrame::UpdateRect update_rect = video_frame.update_rect();
if (crop_width_ < 4 && crop_height_ < 4) {
cropped_buffer = video_frame.video_frame_buffer()->CropAndScale(
crop_width_ / 2, crop_height_ / 2, cropped_width, cropped_height,
cropped_width, cropped_height);
update_rect.offset_x -= crop_width_ / 2;
update_rect.offset_y -= crop_height_ / 2;
update_rect.Intersect(
VideoFrame::UpdateRect{0, 0, cropped_width, cropped_height});
} else {
cropped_buffer = video_frame.video_frame_buffer()->Scale(cropped_width,
cropped_height);
if (!update_rect.IsEmpty()) {
// Since we can't reason about pixels after scaling, we invalidate whole
// picture, if anything changed.
update_rect =
VideoFrame::UpdateRect{0, 0, cropped_width, cropped_height};
}
}
if (!cropped_buffer) {
RTC_LOG(LS_ERROR) << "Cropping and scaling frame failed, dropping frame.";
return;
}
out_frame.set_video_frame_buffer(cropped_buffer);
out_frame.set_update_rect(update_rect);
out_frame.set_ntp_time_ms(video_frame.ntp_time_ms());
// Since accumulated_update_rect_ is constructed before cropping,
// we can't trust it. If any changes were pending, we invalidate whole
// frame here.
if (!accumulated_update_rect_.IsEmpty()) {
accumulated_update_rect_ =
VideoFrame::UpdateRect{0, 0, out_frame.width(), out_frame.height()};
accumulated_update_rect_is_valid_ = false;
}
}
if (!accumulated_update_rect_is_valid_) {
out_frame.clear_update_rect();
} else if (!accumulated_update_rect_.IsEmpty() &&
out_frame.has_update_rect()) {
accumulated_update_rect_.Union(out_frame.update_rect());
accumulated_update_rect_.Intersect(
VideoFrame::UpdateRect{0, 0, out_frame.width(), out_frame.height()});
out_frame.set_update_rect(accumulated_update_rect_);
accumulated_update_rect_.MakeEmptyUpdate();
}
accumulated_update_rect_is_valid_ = true;
TRACE_EVENT_ASYNC_STEP0("webrtc", "Video", video_frame.render_time_ms(),
"Encode");
stream_resource_manager_.OnEncodeStarted(out_frame, time_when_posted_us);
RTC_DCHECK_LE(send_codec_.width, out_frame.width());
RTC_DCHECK_LE(send_codec_.height, out_frame.height());
// Native frames should be scaled by the client.
// For internal encoders we scale everything in one place here.
RTC_DCHECK((out_frame.video_frame_buffer()->type() ==
VideoFrameBuffer::Type::kNative) ||
(send_codec_.width == out_frame.width() &&
send_codec_.height == out_frame.height()));
TRACE_EVENT1("webrtc", "VCMGenericEncoder::Encode", "timestamp",
out_frame.timestamp());
frame_encode_metadata_writer_.OnEncodeStarted(out_frame);
  // Hand the frame to the encoder
const int32_t encode_status = encoder_->Encode(out_frame, &next_frame_types_);
was_encode_called_since_last_initialization_ = true;
if (encode_status < 0) {
if (encode_status == WEBRTC_VIDEO_CODEC_ENCODER_FAILURE) {
RTC_LOG(LS_ERROR) << "Encoder failed, failing encoder format: "
<< encoder_config_.video_format.ToString();
if (settings_.encoder_switch_request_callback) {
if (encoder_selector_) {
if (auto encoder = encoder_selector_->OnEncoderBroken()) {
QueueRequestEncoderSwitch(*encoder);
}
} else {
encoder_failed_ = true;
main_queue_->PostTask(ToQueuedTask(task_safety_, [this]() {
RTC_DCHECK_RUN_ON(main_queue_);
settings_.encoder_switch_request_callback->RequestEncoderFallback();
}));
}
} else {
RTC_LOG(LS_ERROR)
<< "Encoder failed but no encoder fallback callback is registered";
}
} else {
RTC_LOG(LS_ERROR) << "Failed to encode frame. Error code: "
<< encode_status;
}
return;
}
for (auto& it : next_frame_types_) {
it = VideoFrameType::kVideoFrameDelta;
}
}
EncodeVideoFrame() mainly:
- Checks whether the encoder info changed and notifies the observer
- Re-seeds the bitrate adjuster based on the changed frame rate
- Converts the raw frame to I420 if necessary
- Crops the frame to the preset crop_width_ and crop_height_
- Hands the frame to the encoder
For reasons of space we will not dive into the encoding itself here, and skip straight to frame output.
2.2.1.5 Frame Output
Because ReconfigureEncoder() registers the current object with the encoder:
encoder_->RegisterEncodeCompleteCallback(this);
the callback VideoStreamEncoder::OnEncodedImage() runs once a frame has been encoded:
EncodedImageCallback::Result VideoStreamEncoder::OnEncodedImage(
const EncodedImage& encoded_image,
const CodecSpecificInfo* codec_specific_info) {
TRACE_EVENT_INSTANT1("webrtc", "VCMEncodedFrameCallback::Encoded",
"timestamp", encoded_image.Timestamp());
  // Parse the image info: experiment id, simulcast index
const size_t spatial_idx = encoded_image.SpatialIndex().value_or(0);
EncodedImage image_copy(encoded_image);
frame_encode_metadata_writer_.FillTimingInfo(spatial_idx, &image_copy);
frame_encode_metadata_writer_.UpdateBitstream(codec_specific_info,
&image_copy);
// Piggyback ALR experiment group id and simulcast id into the content type.
const uint8_t experiment_id =
experiment_groups_[videocontenttypehelpers::IsScreenshare(
image_copy.content_type_)];
// TODO(ilnik): This will force content type extension to be present even
// for realtime video. At the expense of miniscule overhead we will get
// sliced receive statistics.
RTC_CHECK(videocontenttypehelpers::SetExperimentId(&image_copy.content_type_,
experiment_id));
// We count simulcast streams from 1 on the wire. That's why we set simulcast
// id in content type to +1 of that is actual simulcast index. This is because
// value 0 on the wire is reserved for 'no simulcast stream specified'.
RTC_CHECK(videocontenttypehelpers::SetSimulcastId(
&image_copy.content_type_, static_cast<uint8_t>(spatial_idx + 1)));
  // VP9 uses an internal qp scaler, so some metrics must be reported to it explicitly
// Currently internal quality scaler is used for VP9 instead of webrtc qp
// scaler (in no-svc case or if only a single spatial layer is encoded).
// It has to be explicitly detected and reported to adaptation metrics.
// Post a task because |send_codec_| requires |encoder_queue_| lock.
unsigned int image_width = image_copy._encodedWidth;
unsigned int image_height = image_copy._encodedHeight;
VideoCodecType codec = codec_specific_info
? codec_specific_info->codecType
: VideoCodecType::kVideoCodecGeneric;
encoder_queue_.PostTask([this, codec, image_width, image_height] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
if (codec == VideoCodecType::kVideoCodecVP9 &&
send_codec_.VP9()->automaticResizeOn) {
unsigned int expected_width = send_codec_.width;
unsigned int expected_height = send_codec_.height;
int num_active_layers = 0;
for (int i = 0; i < send_codec_.VP9()->numberOfSpatialLayers; ++i) {
if (send_codec_.spatialLayers[i].active) {
++num_active_layers;
expected_width = send_codec_.spatialLayers[i].width;
expected_height = send_codec_.spatialLayers[i].height;
}
}
RTC_DCHECK_LE(num_active_layers, 1)
<< "VP9 quality scaling is enabled for "
"SVC with several active layers.";
      // Report whether the encoder's internal downscale has taken effect
encoder_stats_observer_->OnEncoderInternalScalerUpdate(
image_width < expected_width || image_height < expected_height);
}
});
// Encoded is called on whatever thread the real encoder implementation run
// on. In the case of hardware encoders, there might be several encoders
// running in parallel on different threads.
encoder_stats_observer_->OnSendEncodedImage(image_copy, codec_specific_info);
  // Bug: the simulcast id occupies image.SpatialIndex(), so for encoders that
  // really provide spatial layers the spatial layer info cannot be retrieved.
// The simulcast id is signaled in the SpatialIndex. This makes it impossible
// to do simulcast for codecs that actually support spatial layers since we
// can't distinguish between an actual spatial layer and a simulcast stream.
// TODO(bugs.webrtc.org/10520): Signal the simulcast id explicitly.
int simulcast_id = 0;
if (codec_specific_info &&
(codec_specific_info->codecType == kVideoCodecVP8 ||
codec_specific_info->codecType == kVideoCodecH264 ||
codec_specific_info->codecType == kVideoCodecGeneric)) {
simulcast_id = encoded_image.SpatialIndex().value_or(0);
}
  // Hand the frame to VideoSendStreamImpl
EncodedImageCallback::Result result =
sink_->OnEncodedImage(image_copy, codec_specific_info);
// We are only interested in propagating the meta-data about the image, not
// encoded data itself, to the post encode function. Since we cannot be sure
// the pointer will still be valid when run on the task queue, set it to null.
DataSize frame_size = DataSize::Bytes(image_copy.size());
image_copy.ClearEncodedData();
int temporal_index = 0;
if (codec_specific_info) {
if (codec_specific_info->codecType == kVideoCodecVP9) {
temporal_index = codec_specific_info->codecSpecific.VP9.temporal_idx;
} else if (codec_specific_info->codecType == kVideoCodecVP8) {
temporal_index = codec_specific_info->codecSpecific.VP8.temporalIdx;
}
}
if (temporal_index == kNoTemporalIdx) {
temporal_index = 0;
}
  // Use this frame to update the bitrate adjuster, source adaptation, etc.
RunPostEncode(image_copy, clock_->CurrentTime().us(), temporal_index,
frame_size);
if (result.error == Result::OK) {
// In case of an internal encoder running on a separate thread, the
// decision to drop a frame might be a frame late and signaled via
// atomic flag. This is because we can't easily wait for the worker thread
// without risking deadlocks, eg during shutdown when the worker thread
// might be waiting for the internal encoder threads to stop.
if (pending_frame_drops_.load() > 0) {
int pending_drops = pending_frame_drops_.fetch_sub(1);
RTC_DCHECK_GT(pending_drops, 0);
result.drop_next_frame = true;
}
}
return result;
}
VideoStreamEncoder::OnEncodedImage() mainly:
- Parses the image info, e.g. experiment id and simulcast index
- For VP9's internal qp scaler, explicitly reports whether the encoder's internal downscale has taken effect
- Hands the frame to the sink (that is, VideoSendStreamImpl)
- Calls RunPostEncode to update the bitrate adjuster, source adaptation, etc. with this frame
It also points out a bug: because the simulcast id occupies image.SpatialIndex(), encoders that really support spatial layers cannot convey their spatial layer information.
2.2.2 VideoSendStreamImpl Handles the Frame
As described in 2.2.1.5, once the encoder produces a frame it hands it to VideoSendStreamImpl, which runs some bitrate allocation checks before forwarding the frame onward. The part covered in this section is shown in the figure below:
2.2.2.1 Receiving and Forwarding Frames
The encoder's frame-output callback ends up in VideoSendStreamImpl::OnEncodedImage():
EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage(
const EncodedImage& encoded_image,
const CodecSpecificInfo* codec_specific_info) {
// Encoded is called on whatever thread the real encoder implementation run
// on. In the case of hardware encoders, there might be several encoders
// running in parallel on different threads.
// Indicate that there still is activity going on.
activity_ = true;
// enable padding
auto enable_padding_task = [this]() {
if (disable_padding_) {
RTC_DCHECK_RUN_ON(worker_queue_);
disable_padding_ = false;
// To ensure that padding bitrate is propagated to the bitrate allocator.
SignalEncoderActive();
}
};
if (!worker_queue_->IsCurrent()) {
worker_queue_->PostTask(enable_padding_task);
} else {
enable_padding_task();
}
  // Forward the image to RtpVideoSender
EncodedImageCallback::Result result(EncodedImageCallback::Result::OK);
result =
rtp_video_sender_->OnEncodedImage(encoded_image, codec_specific_info);
// Check if there's a throttled VideoBitrateAllocation that we should try
// sending.
rtc::WeakPtr<VideoSendStreamImpl> send_stream = weak_ptr_;
auto update_task = [send_stream]() {
if (send_stream) {
RTC_DCHECK_RUN_ON(send_stream->worker_queue_);
auto& context = send_stream->video_bitrate_allocation_context_;
if (context && context->throttled_allocation) {
        // Notify the relevant observers of the allocation change
send_stream->OnBitrateAllocationUpdated(*context->throttled_allocation);
}
}
};
if (!worker_queue_->IsCurrent()) {
worker_queue_->PostTask(update_task);
} else {
update_task();
}
return result;
}
VideoSendStreamImpl::OnEncodedImage() mainly:
- Re-enables padding, notifying the bitrate allocator so padding is taken into account when allocating bitrate
- Forwards the encoded frame and its metadata to RtpVideoSender
- Checks whether the bitrate allocation has changed and notifies the layer below
On the third point, the function that handles allocation updates, OnBitrateAllocationUpdated(), has a few details worth noting:
void VideoSendStreamImpl::OnBitrateAllocationUpdated(
const VideoBitrateAllocation& allocation) {
if (!worker_queue_->IsCurrent()) {
auto ptr = weak_ptr_;
worker_queue_->PostTask([=] {
if (!ptr.get())
return;
ptr->OnBitrateAllocationUpdated(allocation);
});
return;
}
RTC_DCHECK_RUN_ON(worker_queue_);
int64_t now_ms = clock_->TimeInMilliseconds();
if (encoder_target_rate_bps_ != 0) {
if (video_bitrate_allocation_context_) {
// If new allocation is within kMaxVbaSizeDifferencePercent larger than
// the previously sent allocation and the same streams are still enabled,
// it is considered "similar". We do not want send similar allocations
// more once per kMaxVbaThrottleTimeMs.
      // If the new allocation lies within [previous, previous + kMaxVbaSizeDifferencePercent),
      // it is treated as similar and not re-sent within one kMaxVbaThrottleTimeMs.
const VideoBitrateAllocation& last =
video_bitrate_allocation_context_->last_sent_allocation;
const bool is_similar =
allocation.get_sum_bps() >= last.get_sum_bps() &&
allocation.get_sum_bps() <
(last.get_sum_bps() * (100 + kMaxVbaSizeDifferencePercent)) /
100 &&
SameStreamsEnabled(allocation, last);
if (is_similar &&
(now_ms - video_bitrate_allocation_context_->last_send_time_ms) <
kMaxVbaThrottleTimeMs) {
// This allocation is too similar, cache it and return.
video_bitrate_allocation_context_->throttled_allocation = allocation;
return;
}
} else {
video_bitrate_allocation_context_.emplace();
}
video_bitrate_allocation_context_->last_sent_allocation = allocation;
video_bitrate_allocation_context_->throttled_allocation.reset();
video_bitrate_allocation_context_->last_send_time_ms = now_ms;
// Send bitrate allocation metadata only if encoder is not paused.
    // Notify the observers below
rtp_video_sender_->OnBitrateAllocationUpdated(allocation);
}
}
struct VbaSendContext {
  // The allocation sent last time
VideoBitrateAllocation last_sent_allocation;
  // Allocation held back to throttle over-frequent updates
absl::optional<VideoBitrateAllocation> throttled_allocation;
  // Time of the last send
int64_t last_send_time_ms;
};
The video_bitrate_allocation_context_ seen above is of type struct VbaSendContext, a context object that caches changes to the bitrate allocation. The layers below register for allocation changes, but a tiny change each time does not warrant waking the downstream observers, so throttled_allocation was introduced: when the allocation has not changed much within a certain window, the change is stashed in throttled_allocation, and after a while that value is fed back into this function to decide whether the observers below should now be notified. For example, if the last sent allocation summed to 1000 kbps, a new allocation within kMaxVbaSizeDifferencePercent above that, arriving less than kMaxVbaThrottleTimeMs after the last send, is merely cached.
To spell it out further: VideoSendStreamImpl::OnBitrateAllocationUpdated() is called from two places. One is when the encoder's bitrate is being set, notifying VideoSendStreamImpl; the other is the frame-receiving function OnEncodedImage() seen here, which merely re-feeds the cached old value (throttled_allocation) into the function.
2.2.3 RtpVideoSender Handles the Frame
The function through which RtpVideoSender receives the frame is RtpVideoSender::OnEncodedImage():
EncodedImageCallback::Result RtpVideoSender::OnEncodedImage(
const EncodedImage& encoded_image,
const CodecSpecificInfo* codec_specific_info) {
  // Within the allocated network capacity, fec_controller works out how much
  // goes to media encoding and how much to FEC and NACK.
  // Update fec_controller with the image's size and frame type.
fec_controller_->UpdateWithEncodedData(encoded_image.size(),
encoded_image._frameType);
MutexLock lock(&mutex_);
RTC_DCHECK(!rtp_streams_.empty());
if (!active_)
return Result(Result::ERROR_SEND_FAILED);
shared_frame_id_++;
size_t stream_index = 0;
if (codec_specific_info &&
(codec_specific_info->codecType == kVideoCodecVP8 ||
codec_specific_info->codecType == kVideoCodecH264 ||
codec_specific_info->codecType == kVideoCodecGeneric)) {
// Map spatial index to simulcast.
    // WebRTC uses spatial_index as the simulcast id here
stream_index = encoded_image.SpatialIndex().value_or(0);
}
RTC_DCHECK_LT(stream_index, rtp_streams_.size());
uint32_t rtp_timestamp =
encoded_image.Timestamp() +
rtp_streams_[stream_index].rtp_rtcp->StartTimestamp();
// RTCPSender has it's own copy of the timestamp offset, added in
// RTCPSender::BuildSR, hence we must not add the in the offset for this call.
// TODO(nisse): Delete RTCPSender:timestamp_offset_, and see if we can confine
// knowledge of the offset to a single place.
  // Check whether an RTCP report should be sent for this stream
if (!rtp_streams_[stream_index].rtp_rtcp->OnSendingRtpFrame(
encoded_image.Timestamp(), encoded_image.capture_time_ms_,
rtp_config_.payload_type,
encoded_image._frameType == VideoFrameType::kVideoFrameKey)) {
// The payload router could be active but this module isn't sending.
return Result(Result::ERROR_SEND_FAILED);
}
absl::optional<int64_t> expected_retransmission_time_ms;
if (encoded_image.RetransmissionAllowed()) {
expected_retransmission_time_ms =
rtp_streams_[stream_index].rtp_rtcp->ExpectedRetransmissionTimeMs();
}
  // Parse the frame dependency info carried by the encoded frame; it can be
  // used in the dependency descriptor RTP header extension later
if (IsFirstFrameOfACodedVideoSequence(encoded_image, codec_specific_info)) {
// If encoder adapter produce FrameDependencyStructure, pass it so that
// dependency descriptor rtp header extension can be used.
// If not supported, disable using dependency descriptor by passing nullptr.
rtp_streams_[stream_index].sender_video->SetVideoStructure(
(codec_specific_info && codec_specific_info->template_structure)
? &*codec_specific_info->template_structure
: nullptr);
}
  // Send the video frame
bool send_result = rtp_streams_[stream_index].sender_video->SendEncodedImage(
rtp_config_.payload_type, codec_type_, rtp_timestamp, encoded_image,
params_[stream_index].GetRtpVideoHeader(
encoded_image, codec_specific_info, shared_frame_id_),
expected_retransmission_time_ms);
if (frame_count_observer_) {
FrameCounts& counts = frame_counts_[stream_index];
if (encoded_image._frameType == VideoFrameType::kVideoFrameKey) {
++counts.key_frames;
} else if (encoded_image._frameType == VideoFrameType::kVideoFrameDelta) {
++counts.delta_frames;
} else {
RTC_DCHECK(encoded_image._frameType == VideoFrameType::kEmptyFrame);
}
frame_count_observer_->FrameCountUpdated(counts,
rtp_config_.ssrcs[stream_index]);
}
if (!send_result)
return Result(Result::ERROR_SEND_FAILED);
return Result(Result::OK, rtp_timestamp);
}
RtpVideoSender::OnEncodedImage() mainly:
- Updates fec_controller with the image's size and type so it can adjust the bitrate split between FEC/NACK packets and media
- Checks whether an RTCP report should be sent for this stream
- Parses the frame dependency info carried by the encoded frame, which can go into an RTP header extension later
- Builds the video_header and forwards the frame plus header to RTPSenderVideo
Note the member rtp_streams_: it is an array of RtpStreamSender inside RtpVideoSender, indexed by simulcast index so that each entry identifies one simulcast stream. RtpStreamSender has three members: rtp_rtcp (RTP/RTCP packetization, receiving, sending), sender_video (paced sending), and fec_generator (FEC):
struct RtpStreamSender {
RtpStreamSender(std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp,
std::unique_ptr<RTPSenderVideo> sender_video,
std::unique_ptr<VideoFecGenerator> fec_generator);
~RtpStreamSender();
RtpStreamSender(RtpStreamSender&&) = default;
RtpStreamSender& operator=(RtpStreamSender&&) = default;
// Note: Needs pointer stability.
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp;
std::unique_ptr<RTPSenderVideo> sender_video;
std::unique_ptr<VideoFecGenerator> fec_generator;
};
class RtpVideoSender{
...
const std::vector<webrtc_internal_rtp_video_sender::RtpStreamSender>
rtp_streams_;
};
2.2.4 RTPSenderVideo Wraps the Data into RTP
RTPSenderVideo mainly generates RTP packets according to the video_header and then forwards them to the pacer for sending; the corresponding part is framed in the figure below. When a video frame reaches RTPSenderVideo it goes through:
bool RTPSenderVideo::SendEncodedImage(
int payload_type,
absl::optional<VideoCodecType> codec_type,
uint32_t rtp_timestamp,
const EncodedImage& encoded_image,
RTPVideoHeader video_header,
absl::optional<int64_t> expected_retransmission_time_ms) {
  // If a frame transformer was injected, transform the frame; it will be sent
  // asynchronously once the transform completes.
if (frame_transformer_delegate_) {
// The frame will be sent async once transformed.
return frame_transformer_delegate_->TransformFrame(
payload_type, codec_type, rtp_timestamp, encoded_image, video_header,
expected_retransmission_time_ms);
}
  // Forward the image
return SendVideo(payload_type, codec_type, rtp_timestamp,
encoded_image.capture_time_ms_, encoded_image, video_header,
expected_retransmission_time_ms);
}
RTPSenderVideo::SendEncodedImage() mainly:
- Checks for a frame transformer; if one exists, the frame is transformed and then sent asynchronously
- Calls SendVideo() to start packetizing the image into RTP:
bool RTPSenderVideo::SendVideo(
int payload_type,
absl::optional<VideoCodecType> codec_type,
uint32_t rtp_timestamp,
int64_t capture_time_ms,
rtc::ArrayView<const uint8_t> payload,
RTPVideoHeader video_header,
absl::optional<int64_t> expected_retransmission_time_ms,
absl::optional<int64_t> estimated_capture_clock_offset_ms) {
#if RTC_TRACE_EVENTS_ENABLED
TRACE_EVENT_ASYNC_STEP1("webrtc", "Video", capture_time_ms, "Send", "type",
FrameTypeToString(video_header.frame_type));
#endif
RTC_CHECK_RUNS_SERIALIZED(&send_checker_);
if (video_header.frame_type == VideoFrameType::kEmptyFrame)
return true;
if (payload.empty())
return false;
int32_t retransmission_settings = retransmission_settings_;
if (codec_type == VideoCodecType::kVideoCodecH264) {
// Backward compatibility for older receivers without temporal layer logic.
retransmission_settings = kRetransmitBaseLayer | kRetransmitHigherLayers;
}
  // Update the playout delay (current_playout_delay_) from the video_header
MaybeUpdateCurrentPlayoutDelay(video_header);
if (video_header.frame_type == VideoFrameType::kVideoFrameKey) {
if (!IsNoopDelay(current_playout_delay_)) {
// Force playout delay on key-frames, if set.
playout_delay_pending_ = true;
}
if (allocation_) {
// Send the bitrate allocation on every key frame.
send_allocation_ = SendVideoLayersAllocation::kSendWithResolution;
}
}
  // Update the active_decode_targets bitmask in the RTP header; this is an AV1
  // concept, see: https://aomediacodec.github.io/av1-rtp-spec/#a4-active-decode-targets
  // Roughly: several reference-frame chains (decode targets) exist at once, and
  // when they change the remote side must be told to switch.
if (video_structure_ != nullptr && video_header.generic) {
active_decode_targets_tracker_.OnFrame(
video_structure_->decode_target_protected_by_chain,
video_header.generic->active_decode_targets,
video_header.frame_type == VideoFrameType::kVideoFrameKey,
video_header.generic->frame_id, video_header.generic->chain_diffs);
}
const uint8_t temporal_id = GetTemporalId(video_header);
// No FEC protection for upper temporal layers, if used.
  // Check whether FEC is used: it protects only temporal layer 0 or non-SVC frames
const bool use_fec = fec_type_.has_value() &&
(temporal_id == 0 || temporal_id == kNoTemporalIdx);
// Maximum size of packet including rtp headers.
// Extra space left in case packet will be resent using fec or rtx.
  // Compute the capacity left in the packet after subtracting FEC and RTX overhead
int packet_capacity = rtp_sender_->MaxRtpPacketSize() -
(use_fec ? FecPacketOverhead() : 0) -
(rtp_sender_->RtxStatus() ? kRtxHeaderSize : 0);
  // Build the packet; set payload_type and timestamp
std::unique_ptr<RtpPacketToSend> single_packet =
rtp_sender_->AllocatePacket();
RTC_DCHECK_LE(packet_capacity, single_packet->capacity());
single_packet->SetPayloadType(payload_type);
single_packet->SetTimestamp(rtp_timestamp);
single_packet->set_capture_time_ms(capture_time_ms);
const absl::optional<AbsoluteCaptureTime> absolute_capture_time =
absolute_capture_time_sender_.OnSendPacket(
AbsoluteCaptureTimeSender::GetSource(single_packet->Ssrc(),
single_packet->Csrcs()),
single_packet->Timestamp(), kVideoPayloadTypeFrequency,
Int64MsToUQ32x32(single_packet->capture_time_ms() + NtpOffsetMs()),
/*estimated_capture_clock_offset=*/
include_capture_clock_offset_ ? estimated_capture_clock_offset_ms
: absl::nullopt);
auto first_packet = std::make_unique<RtpPacketToSend>(*single_packet);
auto middle_packet = std::make_unique<RtpPacketToSend>(*single_packet);
auto last_packet = std::make_unique<RtpPacketToSend>(*single_packet);
// Simplest way to estimate how much extensions would occupy is to set them.
  // Add header extensions to the packets according to the video_header
AddRtpHeaderExtensions(video_header, absolute_capture_time,
/*first_packet=*/true, /*last_packet=*/true,
single_packet.get());
AddRtpHeaderExtensions(video_header, absolute_capture_time,
/*first_packet=*/true, /*last_packet=*/false,
first_packet.get());
AddRtpHeaderExtensions(video_header, absolute_capture_time,
/*first_packet=*/false, /*last_packet=*/false,
middle_packet.get());
AddRtpHeaderExtensions(video_header, absolute_capture_time,
/*first_packet=*/false, /*last_packet=*/true,
last_packet.get());
RTC_DCHECK_GT(packet_capacity, single_packet->headers_size());
RTC_DCHECK_GT(packet_capacity, first_packet->headers_size());
RTC_DCHECK_GT(packet_capacity, middle_packet->headers_size());
RTC_DCHECK_GT(packet_capacity, last_packet->headers_size());
RtpPacketizer::PayloadSizeLimits limits;
limits.max_payload_len = packet_capacity - middle_packet->headers_size();
RTC_DCHECK_GE(single_packet->headers_size(), middle_packet->headers_size());
limits.single_packet_reduction_len =
single_packet->headers_size() - middle_packet->headers_size();
RTC_DCHECK_GE(first_packet->headers_size(), middle_packet->headers_size());
limits.first_packet_reduction_len =
first_packet->headers_size() - middle_packet->headers_size();
RTC_DCHECK_GE(last_packet->headers_size(), middle_packet->headers_size());
limits.last_packet_reduction_len =
last_packet->headers_size() - middle_packet->headers_size();
bool has_generic_descriptor =
first_packet->HasExtension<RtpGenericFrameDescriptorExtension00>() ||
first_packet->HasExtension<RtpDependencyDescriptorExtension>();
// Minimization of the vp8 descriptor may erase temporal_id, so use
// |temporal_id| rather than reference |video_header| beyond this point.
if (has_generic_descriptor) {
MinimizeDescriptor(&video_header);
}
  // If the frame is encrypted, encrypt the payload and header
// TODO(benwright@webrtc.org) - Allocate enough to always encrypt inline.
rtc::Buffer encrypted_video_payload;
if (frame_encryptor_ != nullptr) {
if (!has_generic_descriptor) {
return false;
}
    // Get the maximum ciphertext size after frame encryption
const size_t max_ciphertext_size =
frame_encryptor_->GetMaxCiphertextByteSize(cricket::MEDIA_TYPE_VIDEO,
payload.size());
encrypted_video_payload.SetSize(max_ciphertext_size);
size_t bytes_written = 0;
// Enable header authentication if the field trial isn't disabled.
std::vector<uint8_t> additional_data;
if (generic_descriptor_auth_experiment_) {
additional_data = RtpDescriptorAuthentication(video_header);
}
if (frame_encryptor_->Encrypt(
cricket::MEDIA_TYPE_VIDEO, first_packet->Ssrc(), additional_data,
payload, encrypted_video_payload, &bytes_written) != 0) {
return false;
}
encrypted_video_payload.SetSize(bytes_written);
payload = encrypted_video_payload;
} else if (require_frame_encryption_) {
RTC_LOG(LS_WARNING)
<< "No FrameEncryptor is attached to this video sending stream but "
"one is required since require_frame_encryptor is set";
}
  std::unique_ptr<RtpPacketizer> packetizer =
RtpPacketizer::Create(codec_type, payload, limits, video_header);
// TODO(bugs.webrtc.org/10714): retransmission_settings_ should generally be
// replaced by expected_retransmission_time_ms.has_value(). For now, though,
// only VP8 with an injected frame buffer controller actually controls it.
  // RTX retransmission is decided per frame.
  // Check whether the upper layer set an expected retransmission time
  // (default: 125 ms), which determines whether this frame may be retransmitted.
const bool allow_retransmission =
expected_retransmission_time_ms.has_value()
? AllowRetransmission(temporal_id, retransmission_settings,
expected_retransmission_time_ms.value())
: false;
const size_t num_packets = packetizer->NumPackets();
if (num_packets == 0)
return false;
bool first_frame = first_frame_sent_();
std::vector<std::unique_ptr<RtpPacketToSend>> rtp_packets;
for (size_t i = 0; i < num_packets; ++i) {
std::unique_ptr<RtpPacketToSend> packet;
int expected_payload_capacity;
// Choose right packet template:
if (num_packets == 1) {
packet = std::move(single_packet);
expected_payload_capacity =
limits.max_payload_len - limits.single_packet_reduction_len;
} else if (i == 0) {
packet = std::move(first_packet);
expected_payload_capacity =
limits.max_payload_len - limits.first_packet_reduction_len;
} else if (i == num_packets - 1) {
packet = std::move(last_packet);
expected_payload_capacity =
limits.max_payload_len - limits.last_packet_reduction_len;
} else {
packet = std::make_unique<RtpPacketToSend>(*middle_packet);
expected_payload_capacity = limits.max_payload_len;
}
packet->set_first_packet_of_frame(i == 0);
if (!packetizer->NextPacket(packet.get()))
return false;
RTC_DCHECK_LE(packet->payload_size(), expected_payload_capacity);
    // Mark retransmission permission and key-frame status
packet->set_allow_retransmission(allow_retransmission);
packet->set_is_key_frame(video_header.frame_type ==
VideoFrameType::kVideoFrameKey);
// Put packetization finish timestamp into extension.
if (packet->HasExtension<VideoTimingExtension>()) {
packet->set_packetization_finish_time_ms(clock_->TimeInMilliseconds());
}
packet->set_fec_protect_packet(use_fec);
if (red_enabled()) {
      // If RED encapsulation is enabled, re-wrap the payload in RED and mark
      // the packet as a RED packet
// TODO(sprang): Consider packetizing directly into packets with the RED
// header already in place, to avoid this copy.
std::unique_ptr<RtpPacketToSend> red_packet(new RtpPacketToSend(*packet));
      BuildRedPayload(*packet, red_packet.get());  // wrap the media payload in RED
red_packet->SetPayloadType(*red_payload_type_);
red_packet->set_is_red(true);
// Append |red_packet| instead of |packet| to output.
red_packet->set_packet_type(RtpPacketMediaType::kVideo);
red_packet->set_allow_retransmission(packet->allow_retransmission());
rtp_packets.emplace_back(std::move(red_packet));
} else {
packet->set_packet_type(RtpPacketMediaType::kVideo);
rtp_packets.emplace_back(std::move(packet));
}
if (first_frame) {
if (i == 0) {
RTC_LOG(LS_INFO)
<< "Sent first RTP packet of the first video frame (pre-pacer)";
}
if (i == num_packets - 1) {
RTC_LOG(LS_INFO)
<< "Sent last RTP packet of the first video frame (pre-pacer)";
}
}
}
  // Assign sequence numbers
if (!rtp_sender_->AssignSequenceNumbersAndStoreLastPacketState(rtp_packets)) {
// Media not being sent.
return false;
}
  // Forward to the pacer
LogAndSendToNetwork(std::move(rtp_packets), payload.size());
// Update details about the last sent frame.
last_rotation_ = video_header.rotation;
if (video_header.color_space != last_color_space_) {
last_color_space_ = video_header.color_space;
transmit_color_space_next_frame_ = !IsBaseLayer(video_header);
} else {
transmit_color_space_next_frame_ =
transmit_color_space_next_frame_ ? !IsBaseLayer(video_header) : false;
}
  // Reset the delay flag: the delay is determined by the video_header, and parsing
  // earlier set playout_delay_pending_ to true; it is cleared here.
if (video_header.frame_type == VideoFrameType::kVideoFrameKey ||
PacketWillLikelyBeRequestedForRestransmitionIfLost(video_header)) {
// This frame will likely be delivered, no need to populate playout
// delay extensions until it changes again.
playout_delay_pending_ = false;
send_allocation_ = SendVideoLayersAllocation::kDontSend;
}
TRACE_EVENT_ASYNC_END1("webrtc", "Video", capture_time_ms, "timestamp",
rtp_timestamp);
return true;
}
SendVideo() is long (some 300 lines), but its main job is extracting information from the video_header to initialize the frame's RTP packets. The main steps:
- Calls MaybeUpdateCurrentPlayoutDelay() to parse the video_header, determine whether playout needs to be delayed, and adjust the playout delay
- Because the payload may exceed one RTP packet, it has to be split; two cases are handled: a single packet (single_packet) and a split frame (first_packet, middle_packet, last_packet)
- Uses AddRtpHeaderExtensions() to derive the RTP extensions from the video_header and set them on the packets
- If frame encryption is enabled, calls frame_encryptor_->Encrypt() to encrypt the frame and RTP extensions
- Uses the video_header, payload_type, etc. to create the RTP packetizer (RtpPacketizer), which fills the payload into the RTP packets
- Marks whether each packet gets FEC protection
- If RED encapsulation is enabled (see RFC 2198), uses BuildRedPayload() to wrap the payload in RED, i.e. prepend a RED header to the media payload, and then sets the packet's payload_type (see the sketch below)
- Assigns sequence numbers with AssignSequenceNumbersAndStoreLastPacketState()
- Forwards the RTP packets to the pacer through LogAndSendToNetwork()
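On the RED step: per RFC 2198, when the RED payload carries only a single (primary) block, the encapsulation is just one prepended octet, the F bit cleared plus the original payload type. A simplified sketch of the idea (not the actual BuildRedPayload(), which also copies the RTP header):

#include <cstdint>
#include <vector>

// RFC 2198 RED encapsulation for a single primary block: one header octet
// (F=0 | original payload type) followed by the original media payload.
std::vector<uint8_t> WrapInRed(uint8_t media_payload_type,
                               const std::vector<uint8_t>& media_payload) {
  std::vector<uint8_t> red_payload;
  red_payload.reserve(1 + media_payload.size());
  red_payload.push_back(media_payload_type & 0x7F);  // F bit = 0: last block
  red_payload.insert(red_payload.end(), media_payload.begin(),
                     media_payload.end());
  return red_payload;  // the packet's payload type is then set to the RED PT
}

The forwarding itself is shown below: LogAndSendToNetwork() tallies packetization overhead and calls RTPSender::EnqueuePackets(), which hands the packets to the pacer: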
void RTPSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
RTC_DCHECK(!packets.empty());
int64_t now_ms = clock_->TimeInMilliseconds();
for (auto& packet : packets) {
RTC_DCHECK(packet);
RTC_CHECK(packet->packet_type().has_value())
<< "Packet type must be set before sending.";
if (packet->capture_time_ms() <= 0) {
packet->set_capture_time_ms(now_ms);
}
}
  // Hand the packets to the pacer
paced_sender_->EnqueuePackets(std::move(packets));
}
void RTPSenderVideo::LogAndSendToNetwork(
std::vector<std::unique_ptr<RtpPacketToSend>> packets,
size_t unpacketized_payload_size) {
{
MutexLock lock(&stats_mutex_);
size_t packetized_payload_size = 0;
for (const auto& packet : packets) {
if (*packet->packet_type() == RtpPacketMediaType::kVideo) {
packetized_payload_size += packet->payload_size();
}
}
    // Track the bitrate overhead added by packetization
// AV1 and H264 packetizers may produce less packetized bytes than
// unpacketized.
if (packetized_payload_size >= unpacketized_payload_size) {
packetization_overhead_bitrate_.Update(
packetized_payload_size - unpacketized_payload_size,
clock_->TimeInMilliseconds());
}
}
  // Push the packets into the pacer's send queue
rtp_sender_->EnqueuePackets(std::move(packets));
}
The packets ultimately land in pacing_controller_'s send queue:
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
{
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacedSender::EnqueuePackets");
MutexLock lock(&mutex_);
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacedSender::EnqueuePackets::Loop", "sequence_number",
packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp());
RTC_DCHECK_GE(packet->capture_time_ms(), 0);
pacing_controller_.EnqueuePacket(std::move(packet));
}
}
  // Wake up the processing thread
MaybeWakupProcessThread();
}
2.2.5 PacedSender
2.2.5.1 The Module Wake-up Path
After the RTP packets are enqueued into pacing_controller_'s send queue, PacedSender::MaybeWakupProcessThread() is executed to wake the processing thread, which calls the registered Process() to work off the send queue:
void PacedSender::MaybeWakupProcessThread() {
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new time for when to call Process().
if (process_thread_ &&
process_mode_ == PacingController::ProcessMode::kDynamic) {
process_thread_->WakeUp(&module_proxy_);
}
}
The wake-up mechanism here involves WebRTC's ProcessThread, which offers the following interface:
class ProcessThread : public TaskQueueBase {
public:
~ProcessThread() override;
static std::unique_ptr<ProcessThread> Create(const char* thread_name);
// Starts the worker thread. Must be called from the construction thread.
virtual void Start() = 0;
// Stops the worker thread. Must be called from the construction thread.
virtual void Stop() = 0;
// Wakes the thread up to give a module a chance to do processing right
// away. This causes the worker thread to wake up and requery the specified
// module for when it should be called back. (Typically the module should
// return 0 from TimeUntilNextProcess on the worker thread at that point).
// Can be called on any thread.
virtual void WakeUp(Module* module) = 0;
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;
// Removes a previously registered module.
// Can be called from any thread.
virtual void DeRegisterModule(Module* module) = 0;
};
Calling ProcessThread::RegisterModule(module) adds a Module. The Module interface is shown below; once Process() is overridden, calling ProcessThread::WakeUp(module) wakes the thread, which then invokes Process():
class Module {
public:
virtual int64_t TimeUntilNextProcess() = 0;
// Process any pending tasks such as timeouts.
// Called on a worker thread.
virtual void Process() = 0;
// This method is called when the module is attached to a *running* process
// thread or detached from one. In the case of detaching, |process_thread|
// will be nullptr.
virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
protected:
virtual ~Module() {}
};
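Putting the two interfaces together, the contract looks roughly like this sketch (a hypothetical module, not WebRTC code; the 5 ms cadence is illustrative and header paths are assumed):

// Hypothetical Module showing the RegisterModule()/WakeUp() contract:
// the worker thread calls TimeUntilNextProcess() to schedule Process(),
// and WakeUp() makes it re-query the module right away.
class MyModule : public webrtc::Module {
 public:
  int64_t TimeUntilNextProcess() override { return 5; }  // call Process() in ~5 ms
  void Process() override { /* drain queues, send packets, ... */ }
};

// Usage:
//   auto thread = webrtc::ProcessThread::Create("my_thread");
//   MyModule module;
//   thread->RegisterModule(&module, RTC_FROM_HERE);
//   thread->Start();
//   ... from any thread: thread->WakeUp(&module);  // Process() runs soon after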
PacedSender takes a private-implementation approach: it inherits class Module and overrides its interface privately, which keeps the object's public surface clean, since this logic belongs to the class internals. Because of the private override it takes a detour, introducing a module_proxy_ and delegate to talk to the process thread — though it is debatable whether that detour is needed, as shown further below.
class PacedSender : public Module,
public RtpPacketPacer,
public RtpPacketSender {
...
private:
void ProcessThreadAttached(ProcessThread* process_thread) override;
void MaybeWakupProcessThread();
// Private implementation of Module to not expose those implementation details
// publicly and control when the class is registered/deregistered.
class ModuleProxy : public Module {
public:
explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {}
private:
int64_t TimeUntilNextProcess() override {
return delegate_->TimeUntilNextProcess();
}
void Process() override { return delegate_->Process(); }
void ProcessThreadAttached(ProcessThread* process_thread) override {
return delegate_->ProcessThreadAttached(process_thread);
}
PacedSender* const delegate_;
} module_proxy_{this};
};
In practice, though, a public virtual that is privately overridden can still be called publicly through base-class polymorphism, so the module_proxy_ detour seems unnecessary:
#include <iostream>

class c1 {
 public:
  virtual ~c1() = default;
  virtual void fn() = 0;
};

class c2 : public c1 {
 private:
  void fn() override {
    std::cout << "c2";
  }
};

int main() {
  c1* temp = new c2();
  temp->fn();  // prints "c2" even though fn() is private in c2
  delete temp;
  return 0;
}
The callback finally triggered after the wake-up is PacedSender::Process(), and processing moves on to pacing_controller_:
void PacedSender::Process() {
MutexLock lock(&mutex_);
pacing_controller_.ProcessPackets();
}
2.2.5.2 ProcessPackets()
ProcessPackets() pulls packets from the send queue and forwards them, with a good deal of added logic for bandwidth probing and send-rate control; the budget sketch below gives the intuition, followed by the function itself:
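The "budget" the function keeps referring to can be pictured as a token bucket: elapsed time adds send credit at the pacing rate, and each sent packet consumes credit. A simplified sketch of the idea (not WebRTC's IntervalBudget; the type and field names are made up):

#include <cstdint>

// Token-bucket picture of the pacing budget: time adds credit at the
// target rate, sending consumes it, and sending pauses while credit <= 0.
struct PacingBudget {
  int64_t bytes_available = 0;
  int64_t target_rate_bps = 0;

  void OnTimeElapsed(int64_t elapsed_ms) {
    bytes_available += target_rate_bps * elapsed_ms / 8000;  // bps -> bytes per ms
  }
  bool CanSend() const { return bytes_available > 0; }
  void OnPacketSent(int64_t bytes) { bytes_available -= bytes; }
};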
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
Timestamp target_send_time = now;
  // Update the processing time and the send budget.
  // mode_ comes in two flavors:
  // kPeriodic: tracks the rate with the IntervalBudget class and expects
  //   Process() to run at a fixed cadence (5 ms)
  // kDynamic: Process() runs at a variable cadence
if (mode_ == ProcessMode::kDynamic) {
    // Timing model: target_send_time lies in [now, now + early_execute_margin].
    // Get the target send time.
target_send_time = NextSendTime();
    // Get the maximum early-execution margin: probing may be processed up to
    // 1 ms early, normal frame sending may not.
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
if (target_send_time.IsMinusInfinity()) {
      // Target send time is minus infinity; treat it as now.
target_send_time = now;
} else if (now < target_send_time - early_execute_margin) {
      // We are far earlier than the target; just update the send budget.
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
return;
}
if (target_send_time < last_process_time_) {
      // target_send_time may end up earlier than last_process_time_ due to
      // misbehavior in NextSendTime() or wrap-around. The budget update then
      // needs special handling, otherwise the send budget would not grow with
      // the passage of time and packets would have no budget to be sent with;
      // hence the elapsed time is computed as last_process_time_ - target_send_time.
// After the last process call, at time X, the target send time
// shifted to be earlier than X. This should normally not happen
// but we want to make sure rounding errors or erratic behavior
// of NextSendTime() does not cause issue. In particular, if the
// buffer reduction of
// rate * (target_send_time - previous_process_time)
// in the main loop doesn't clean up the existing debt we may not
// be able to send again. We don't want to check this reordering
// there as it is the normal exit condtion when the buffer is
// exhausted and there are packets in the queue.
UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
target_send_time = last_process_time_;
}
}
Timestamp previous_process_time = last_process_time_;
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  // Check whether a keepalive packet should be sent
if (ShouldSendKeepalive(now)) {
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ == 0) {
last_send_time_ = now;
} else {
DataSize keepalive_data_sent = DataSize::Zero();
      // Generate a 1-byte keepalive packet
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
keepalive_data_sent +=
DataSize::Bytes(packet->payload_size() + packet->padding_size());
        // Send the keepalive packet
packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
        // If new FEC packets were generated, enqueue them for sending
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
}
OnPaddingSent(keepalive_data_sent);
}
}
if (paused_) {
return;
}
  // Compute a target rate from the send-queue size and use it to
  // update the budget
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateQueueTime(now);
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit - packet_queue_.AverageQueueTime());
          // Minimum target rate needed to drain the queue in time
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< target_rate.kbps();
}
}
}
if (mode_ == ProcessMode::kPeriodic) {
// In periodic processing mode, the IntevalBudget allows positive budget
// up to (process interval duration) * (target rate), so we only need to
// update it once before the packet sending loop.
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
} else {
media_rate_ = target_rate;
}
}
  // Fetch the probe rate from prober_
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
    // Get the probe info from the current probe cluster
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
if (first_packet_in_probe) {
      // First packet of a probe cluster: insert a tiny padding packet to give
      // the rate estimation a reliable initial window
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
        // Enqueue the 1-byte probe padding packet
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
first_packet_in_probe = false;
}
    // Update the send budget inside the loop
if (mode_ == ProcessMode::kDynamic &&
previous_process_time < target_send_time) {
// Reduce buffer levels with amount corresponding to time between last
// process and target send time for the next packet.
// If the process call is late, that may be the time between the optimal
// send times for two packets we should already have sent.
UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
previous_process_time = target_send_time;
}
// Fetch the next packet, so long as queue is not empty or budget is not
// exhausted.
    // Fetch the next packet to send from the queue, based on budget and send time
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
      // Compute how much padding can still be sent for probing, after
      // accounting for the media just sent
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
          // No padding packets were generated, quit send loop.
break;
}
for (auto& packet : padding_packets) {
        // Enqueue the padding packets
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
    // Send the packet
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
      // Enqueue newly generated FEC packets so they get sent too
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
if (mode_ == ProcessMode::kDynamic) {
      // Update target send time in case there are more packets that we are late
// in processing.
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsMinusInfinity()) {
target_send_time = now;
} else {
target_send_time = std::min(now, next_send_time);
}
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
      // Update the prober with the amount of data sent
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
}
To summarize, the main points:
- Send-rate control uses a budget: the budget grows as time elapses and shrinks as data is sent. Internally, when ProcessPackets() is driven at a fixed period (ProcessMode::kPeriodic) the budget is tracked by media_budget_; when it is driven on demand (ProcessMode::kDynamic) it is tracked by media_debt_ (a worked sketch of the debt accounting follows this list)
- On entry, UpdateBudgetWithElapsedTime() replenishes the send budget from the elapsed time:
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
  if (mode_ == ProcessMode::kPeriodic) {
    delta = std::min(kMaxProcessingInterval, delta);
    media_budget_.IncreaseBudget(delta.ms());
    padding_budget_.IncreaseBudget(delta.ms());
  } else {
    media_debt_ -= std::min(media_debt_, media_rate_ * delta);
    padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);
  }
}
- ShouldSendKeepalive() checks whether a keepalive packet is needed; if so, a 1-byte packet is constructed and sent. The criteria:
bool PacingController::ShouldSendKeepalive(Timestamp now) const {
  if (send_padding_if_silent_ || paused_ || Congested() ||
      packet_counter_ == 0) {
    // With no feedback coming back we stay in the congested state, so a
    // keepalive probe packet goes out every 500 ms.
    // We send a padding packet every 500 ms to ensure we won't get stuck in
    // congested state due to no feedback being received.
    TimeDelta elapsed_since_last_send = now - last_send_time_;
    if (elapsed_since_last_send >= kCongestedPacketInterval) {
      return true;
    }
  }
  return false;
}
- A target send rate (target_rate) is computed from the send-queue size and the time a packet is allowed to stay in the queue
- The current probe rate is fetched from the bandwidth prober prober_ (the prober, covered in a separate article, checks whether the send and receive rates of a packet cluster diverge, which indicates the link has hit capacity)
- The send queue is drained in a loop: packets go out via packet_sender_->SendPacket(), and the FEC packets generated for already-sent packets are fetched via packet_sender_->FetchFec() and sent as well
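As a concrete illustration of the kDynamic accounting, here is a standalone sketch with made-up numbers (not WebRTC code; the real controller schedules the next send at debt/rate rather than waiting for the debt to reach exactly zero): at a pacing rate of 1 Mbps, 10 ms of elapsed time pays off 1250 bytes of debt, so a 1200-byte packet's debt is fully drained.
#include <algorithm>
#include <cstdint>
#include <iostream>

// Standalone sketch of kDynamic-mode debt accounting; names are illustrative.
int64_t media_debt_bytes = 0;
const int64_t media_rate_bps = 1'000'000;  // 1 Mbps pacing rate

// Mirrors UpdateBudgetWithElapsedTime(): elapsed time pays off debt.
void OnElapsed(int64_t elapsed_ms) {
  int64_t paid = media_rate_bps / 8 * elapsed_ms / 1000;
  media_debt_bytes -= std::min(media_debt_bytes, paid);
}

// Mirrors the send path: sending a packet adds its size to the debt.
void OnSent(int64_t packet_bytes) { media_debt_bytes += packet_bytes; }

bool DebtDrained() { return media_debt_bytes == 0; }

int main() {
  OnSent(1200);                        // a 1200-byte packet was just sent
  std::cout << DebtDrained() << "\n";  // 0: in debt, the next packet must wait
  OnElapsed(10);                       // 10 ms at 1 Mbps pays off 1250 bytes
  std::cout << DebtDrained() << "\n";  // 1: debt drained, can send again
}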
2.2.5.3 PacketRouter forwards the packet
The RTP packet is forwarded on to PacketRouter::SendPacket():
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"), "PacketRouter::SendPacket",
"sequence_number", packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp());
MutexLock lock(&modules_mutex_);
// With the new pacer code path, transport sequence numbers are only set here,
// on the pacer thread. Therefore we don't need atomics/synchronization.
  // Set the transport sequence number
if (packet->HasExtension<TransportSequenceNumber>()) {
packet->SetExtension<TransportSequenceNumber>((++transport_seq_) & 0xFFFF);
}
  // Find the send module for this SSRC
uint32_t ssrc = packet->Ssrc();
auto kv = send_modules_map_.find(ssrc);
if (kv == send_modules_map_.end()) {
RTC_LOG(LS_WARNING)
<< "Failed to send packet, matching RTP module not found "
"or transport error. SSRC = "
<< packet->Ssrc() << ", sequence number " << packet->SequenceNumber();
return;
}
// rtp_module -> ModuleRtpRtcpImpl2
RtpRtcpInterface* rtp_module = kv->second;
if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
return;
}
if (rtp_module->SupportsRtxPayloadPadding()) {
// This is now the last module to send media, and has the desired
// properties needed for payload based padding. Cache it for later use.
last_send_module_ = rtp_module;
}
  // Fetch the FEC packets generated for this packet and store them in
  // pending_fec_packets_, where PacingController will pick them up to send
for (auto& packet : rtp_module->FetchFecPackets()) {
pending_fec_packets_.push_back(std::move(packet));
}
}
It does three main things:
- If the transport sequence number RTP extension is registered, fills in the sequence number (masked to 16 bits; see the wrap-around illustration after this list)
- Looks up the ModuleRtpRtcpImpl2 matching the packet's SSRC (with simulcast enabled, every simulcast stream has its own ModuleRtpRtcpImpl2) and calls ModuleRtpRtcpImpl2::TrySendPacket() to send the packet
- Stashes the FEC packets produced during sending in pending_fec_packets_, where the caller fetches them and decides whether to enqueue them for sending
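The internal counter transport_seq_ is wider than 16 bits, but the extension on the wire carries only 16, so the value wraps at 65535. A trivial standalone illustration of the same masking:
#include <cstdint>
#include <iostream>

int main() {
  int64_t transport_seq = 65534;  // internal counter, wider than 16 bits
  for (int i = 0; i < 3; ++i) {
    // Same masking as PacketRouter: only 16 bits go into the extension.
    uint16_t on_wire = (++transport_seq) & 0xFFFF;
    std::cout << on_wire << "\n";  // prints 65535, 0, 1
  }
}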
2.2.5.4 ModuleRtpRtcpImpl2
The packet next arrives at ModuleRtpRtcpImpl2::TrySendPacket(), which essentially delegates to rtp_sender_->packet_sender:
bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(rtp_sender_);
// TODO(sprang): Consider if we can remove this check.
if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
}
rtp_sender_->packet_sender.SendPacket(packet, pacing_info);
return true;
}
ModuleRtpRtcpImpl2 contains rtp_sender_, rtcp_sender_ and rtcp_receiver_, i.e. it aggregates RTP/RTCP sending and receiving in one place and offers helpers such as SendNack(). The rtp_sender_ used in TrySendPacket() is of type ModuleRtpRtcpImpl2::RtpSenderContext, which holds both the RTPSender packet_generator that built the RTP packets earlier and the RtpSenderEgress packet_sender invoked here to send them:
class ModuleRtpRtcpImpl2{
...
struct RtpSenderContext : public SequenceNumberAssigner {
explicit RtpSenderContext(const RtpRtcpInterface::Configuration& config);
void AssignSequenceNumber(RtpPacketToSend* packet) override;
// Storage of packets, for retransmissions and padding, if applicable.
RtpPacketHistory packet_history;
// Handles final time timestamping/stats/etc and handover to Transport.
RtpSenderEgress packet_sender;
// If no paced sender configured, this class will be used to pass packets
// from |packet_generator_| to |packet_sender_|.
RtpSenderEgress::NonPacedPacketSender non_paced_sender;
// Handles creation of RTP packets to be sent.
RTPSender packet_generator;
};
std::unique_ptr<RtpSenderContext> rtp_sender_;
RTCPSender rtcp_sender_;
RTCPReceiver rtcp_receiver_;
};
2.2.5.5 RtpSenderEgress forwards the packet to the Transport
After forwarding, the packet arrives at RtpSenderEgress::SendPacket():
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(&pacer_checker_);
RTC_DCHECK(packet);
RTC_DCHECK(packet->packet_type().has_value());
RTC_DCHECK(HasCorrectSsrc(*packet));
const uint32_t packet_ssrc = packet->Ssrc();
const int64_t now_ms = clock_->TimeInMilliseconds();
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
worker_queue_->PostTask(
ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc]() {
BweTestLoggingPlot(now_ms, packet_ssrc);
}));
#endif
  // Save the packet's timestamp, sequence number, etc.
if (need_rtp_packet_infos_ &&
packet->packet_type() == RtpPacketToSend::Type::kVideo) {
worker_queue_->PostTask(ToQueuedTask(
task_safety_,
[this, packet_timestamp = packet->Timestamp(),
is_first_packet_of_frame = packet->is_first_packet_of_frame(),
is_last_packet_of_frame = packet->Marker(),
sequence_number = packet->SequenceNumber()]() {
RTC_DCHECK_RUN_ON(worker_queue_);
// Last packet of a frame, add it to sequence number info map.
const uint32_t timestamp = packet_timestamp - timestamp_offset_;
rtp_sequence_number_map_->InsertPacket(
sequence_number,
RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
is_last_packet_of_frame));
}));
}
  // FEC handling
if (fec_generator_ && packet->fec_protect_packet()) {
// This packet should be protected by FEC, add it to packet generator.
RTC_DCHECK(fec_generator_);
RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
new_fec_params;
{
MutexLock lock(&lock_);
new_fec_params.swap(pending_fec_params_);
}
    // fec_rate and fec_max_frame may have been updated
if (new_fec_params) {
fec_generator_->SetProtectionParameters(new_fec_params->first,
new_fec_params->second);
}
if (packet->is_red()) {
      // The packet is RED-encapsulated (RFC 2198); unwrap it to recover the
      // original RTP packet. (Odd: only FEC should be RED-encapsulated, so
      // why are regular packets RED-encapsulated too?)
      // Copy the whole packet
RtpPacketToSend unpacked_packet(*packet);
const rtc::CopyOnWriteBuffer buffer = packet->Buffer();
// Grab media payload type from RED header.
const size_t headers_size = packet->headers_size();
unpacked_packet.SetPayloadType(buffer[headers_size]);
      // Copy out the copy-on-write payload
// Copy the media payload into the unpacked buffer.
uint8_t* payload_buffer =
unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
std::copy(&packet->payload()[0] + 1,
&packet->payload()[0] + packet->payload_size(), payload_buffer);
      // Run FEC over the packet
fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
} else {
// If not RED encapsulated - we can just insert packet directly.
fec_generator_->AddPacketAndGenerateFec(*packet);
}
}
//
// Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
// the pacer, these modifications of the header below are happening after the
// FEC protection packets are calculated. This will corrupt recovered packets
// at the same place. It's not an issue for extensions, which are present in
// all the packets (their content just may be incorrect on recovered packets).
// In case of VideoTimingExtension, since it's present not in every packet,
// data after rtp header may be corrupted if these packets are protected by
// the FEC.
int64_t diff_ms = now_ms - packet->capture_time_ms();
if (packet->HasExtension<TransmissionOffset>()) {
packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff_ms);
}
if (packet->HasExtension<AbsoluteSendTime>()) {
packet->SetExtension<AbsoluteSendTime>(
AbsoluteSendTime::MsTo24Bits(now_ms));
}
if (packet->HasExtension<VideoTimingExtension>()) {
if (populate_network2_timestamp_) {
packet->set_network2_time_ms(now_ms);
} else {
packet->set_pacer_exit_time_ms(now_ms);
}
}
const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
packet->packet_type() == RtpPacketMediaType::kVideo;
PacketOptions options;
{
MutexLock lock(&lock_);
options.included_in_allocation = force_part_of_allocation_;
}
// Downstream code actually uses this flag to distinguish between media and
// everything else.
options.is_retransmit = !is_media;
if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true;
options.included_in_allocation = true;
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
options.additional_data = packet->additional_data();
if (packet->packet_type() != RtpPacketMediaType::kPadding &&
packet->packet_type() != RtpPacketMediaType::kRetransmission) {
UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
packet_ssrc);
}
  // Forward the packet
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
// Put packet in retransmission history or update pending status even if
// actual sending fails.
if (is_media && packet->allow_retransmission()) {
    // Put it into the retransmission history
packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
now_ms);
} else if (packet->retransmitted_sequence_number()) {
    // Mark the original packet as having been retransmitted
packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
}
if (send_success) {
// |media_has_been_sent_| is used by RTPSender to figure out if it can send
// padding in the absence of transport-cc or abs-send-time.
// In those cases media must be sent first to set a reference timestamp.
media_has_been_sent_ = true;
// TODO(sprang): Add support for FEC protecting all header extensions, add
// media packet to generator here instead.
RTC_DCHECK(packet->packet_type().has_value());
RtpPacketMediaType packet_type = *packet->packet_type();
RtpPacketCounter counter(*packet);
size_t size = packet->size();
worker_queue_->PostTask(
ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc, packet_type,
counter = std::move(counter), size]() {
RTC_DCHECK_RUN_ON(worker_queue_);
          // Update the send rate statistics, etc.
UpdateRtpStats(now_ms, packet_ssrc, packet_type, std::move(counter),
size);
}));
}
}
This function mainly does the following:
- Saves a mapping between the packet's timestamp and its sequence number
- Copies the packet and feeds the copy to the FEC generator (a sketch of the RED unwrapping it performs follows this list)
- Sets time-related header extensions on the packet. A bug (webrtc:7859) is referenced here: FEC is computed in rtp_sender_video, before the pacer, yet these header fields are rewritten after the FEC protection packets were calculated, which can corrupt the corresponding data in recovered packets; hence set_pacer_exit_time_ms(now_ms). (I haven't fully understood this bug: if FEC has already been applied above, the packet arguably should not be modified below. My grasp of the FEC flow needs more depth, something to revisit.)
- Calls SendPacketToNetwork() to forward the packet
- Puts the packet into the history queue packet_history_
- On a successful send, calls UpdateRtpStats() to update send rate statistics, etc.
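For reference, the RED unwrapping in the code above relies on the RFC 2198 single-block layout: when a RED payload carries only its final block, the header is a single byte whose low 7 bits are the media payload type, and the media payload starts at byte 1. A minimal standalone sketch of that unwrapping (illustrative, not WebRTC code):
#include <cstddef>
#include <cstdint>
#include <vector>

// Unwrap a single-block RED payload (RFC 2198): byte 0 carries the F bit
// (0 for the final/only block) plus the 7-bit media payload type; the media
// payload follows immediately.
struct UnwrappedRed {
  uint8_t payload_type;
  std::vector<uint8_t> media_payload;
};

bool UnwrapRed(const uint8_t* red_payload, size_t size, UnwrappedRed* out) {
  if (size < 1 || (red_payload[0] & 0x80) != 0) {
    return false;  // empty, or F bit set: redundant blocks present, not handled
  }
  out->payload_type = red_payload[0] & 0x7F;
  out->media_payload.assign(red_payload + 1, red_payload + size);
  return true;
}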
The forwarding step, SendPacketToNetwork(), looks like this:
bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info) {
int bytes_sent = -1;
if (transport_) {
    // Forward through the transport
bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
? static_cast<int>(packet.size())
: -1;
if (event_log_ && bytes_sent > 0) {
event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
packet, pacing_info.probe_cluster_id));
}
}
if (bytes_sent <= 0) {
RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
return false;
}
return true;
}
It essentially calls transport_->SendRtp() to forward the packet; in this path that resolves to WebRtcVideoChannel::SendRtp().
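transport_ here is the generic webrtc::Transport injection point between the RTP modules and the media channel. Its interface in this era of the code base (api/call/transport.h, quoted from memory, so treat it as a sketch) is roughly:
// api/call/transport.h (sketch)
class Transport {
 public:
  virtual bool SendRtp(const uint8_t* packet,
                       size_t length,
                       const PacketOptions& options) = 0;
  virtual bool SendRtcp(const uint8_t* packet, size_t length) = 0;

 protected:
  virtual ~Transport() {}
};
WebRtcVideoChannel implements this interface, which is how the packet crosses from the RTP stack into the channel layer below.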
2.2.6 MediaChannel
This section covers the part marked in the figure below.
The packet has now been forwarded into the MediaChannel, from which it travels through SrtpTransport and IceTransport and finally out to the network:
bool WebRtcVideoChannel::SendRtp(const uint8_t* data,
size_t len,
const webrtc::PacketOptions& options) {
rtc::CopyOnWriteBuffer packet(data, len, kMaxRtpPacketLen);
rtc::PacketOptions rtc_options;
rtc_options.packet_id = options.packet_id;
if (DscpEnabled()) {
rtc_options.dscp = PreferredDscp();
}
rtc_options.info_signaled_after_sent.included_in_feedback =
options.included_in_feedback;
rtc_options.info_signaled_after_sent.included_in_allocation =
options.included_in_allocation;
  // Forward
return MediaChannel::SendPacket(&packet, rtc_options);
}
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
  // Forward
return DoSendPacket(packet, false, options);
}
bool DoSendPacket(rtc::CopyOnWriteBuffer* packet,
bool rtcp,
const rtc::PacketOptions& options)
RTC_LOCKS_EXCLUDED(network_interface_mutex_) {
webrtc::MutexLock lock(&network_interface_mutex_);
if (!network_interface_)
return false;
    // Dispatch depending on whether this is RTP or RTCP
return (!rtcp) ? network_interface_->SendPacket(packet, options)
: network_interface_->SendRtcp(packet, options);
}
The RTP packet lands in BaseChannel::SendPacket():
bool BaseChannel::SendPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
return SendPacket(false, packet, options);
}
bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
// Until all the code is migrated to use RtpPacketType instead of bool.
RtpPacketType packet_type = rtcp ? RtpPacketType::kRtcp : RtpPacketType::kRtp;
// SendPacket gets called from MediaEngine, on a pacer or an encoder thread.
// If the thread is not our network thread, we will post to our network
// so that the real work happens on our network. This avoids us having to
// synchronize access to all the pieces of the send path, including
// SRTP and the inner workings of the transport channels.
// The only downside is that we can't return a proper failure code if
// needed. Since UDP is unreliable anyway, this should be a non-issue.
  // Thread check
if (!network_thread_->IsCurrent()) {
// Avoid a copy by transferring the ownership of the packet data.
int message_id = rtcp ? MSG_SEND_RTCP_PACKET : MSG_SEND_RTP_PACKET;
SendPacketMessageData* data = new SendPacketMessageData;
data->packet = std::move(*packet);
data->options = options;
network_thread_->Post(RTC_FROM_HERE, this, message_id, data);
return true;
}
RTC_DCHECK_RUN_ON(network_thread());
TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
// Now that we are on the correct thread, ensure we have a place to send this
// packet before doing anything. (We might get RTCP packets that we don't
// intend to send.) If we've negotiated RTCP mux, send RTCP over the RTP
// transport.
if (!rtp_transport_ || !rtp_transport_->IsWritable(rtcp)) {
return false;
}
// Protect ourselves against crazy data.
if (!IsValidRtpPacketSize(packet_type, packet->size())) {
RTC_LOG(LS_ERROR) << "Dropping outgoing " << ToString() << " "
<< RtpPacketTypeToString(packet_type)
<< " packet: wrong size=" << packet->size();
return false;
}
if (!srtp_active()) {
if (srtp_required_) {
// The audio/video engines may attempt to send RTCP packets as soon as the
// streams are created, so don't treat this as an error for RTCP.
// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=6809
if (rtcp) {
return false;
}
// However, there shouldn't be any RTP packets sent before SRTP is set up
// (and SetSend(true) is called).
RTC_LOG(LS_ERROR) << "Can't send outgoing RTP packet for " << ToString()
<< " when SRTP is inactive and crypto is required";
RTC_NOTREACHED();
return false;
}
std::string packet_type = rtcp ? "RTCP" : "RTP";
RTC_DLOG(LS_WARNING) << "Sending an " << packet_type
<< " packet without encryption for " << ToString()
<< ".";
}
// Bon voyage.
return rtcp ? rtp_transport_->SendRtcpPacket(packet, options, PF_SRTP_BYPASS)
: rtp_transport_->SendRtpPacket(packet, options, PF_SRTP_BYPASS);
}
- It first performs a thread check; if the current thread is not the network thread, it re-posts the task to it
- It then forwards the packet via rtp_transport_->SendRtpPacket(), where rtp_transport_ is a webrtc::SrtpTransport
bool SrtpTransport::SendRtpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) {
if (!IsSrtpActive()) {
RTC_LOG(LS_ERROR)
<< "Failed to send the packet because SRTP transport is inactive.";
return false;
}
rtc::PacketOptions updated_options = options;
TRACE_EVENT0("webrtc", "SRTP Encode");
bool res;
uint8_t* data = packet->MutableData();
int len = rtc::checked_cast<int>(packet->size());
// If ENABLE_EXTERNAL_AUTH flag is on then packet authentication is not done
// inside libsrtp for a RTP packet. A external HMAC module will be writing
// a fake HMAC value. This is ONLY done for a RTP packet.
// Socket layer will update rtp sendtime extension header if present in
// packet with current time before updating the HMAC.
#if !defined(ENABLE_EXTERNAL_AUTH)
  // Protect (encrypt) the data
res = ProtectRtp(data, len, static_cast<int>(packet->capacity()), &len);
#else
if (!IsExternalAuthActive()) {
res = ProtectRtp(data, len, static_cast<int>(packet->capacity()), &len);
} else {
updated_options.packet_time_params.rtp_sendtime_extension_id =
rtp_abs_sendtime_extn_id_;
res = ProtectRtp(data, len, static_cast<int>(packet->capacity()), &len,
&updated_options.packet_time_params.srtp_packet_index);
// If protection succeeds, let's get auth params from srtp.
if (res) {
uint8_t* auth_key = nullptr;
int key_len = 0;
res = GetRtpAuthParams(
&auth_key, &key_len,
&updated_options.packet_time_params.srtp_auth_tag_len);
if (res) {
updated_options.packet_time_params.srtp_auth_key.resize(key_len);
updated_options.packet_time_params.srtp_auth_key.assign(
auth_key, auth_key + key_len);
}
}
}
#endif
if (!res) {
int seq_num = -1;
uint32_t ssrc = 0;
cricket::GetRtpSeqNum(data, len, &seq_num);
cricket::GetRtpSsrc(data, len, &ssrc);
RTC_LOG(LS_ERROR) << "Failed to protect RTP packet: size=" << len
<< ", seqnum=" << seq_num << ", SSRC=" << ssrc;
return false;
}
// Update the length of the packet now that we've added the auth tag.
packet->SetSize(len);
return SendPacket(/*rtcp=*/false, packet, updated_options, flags);
}
Its main steps:
- Runs ProtectRtp() over the data to encrypt it
- Calls SendPacket() to hand the packet on to RtpTransport for forwarding
bool RtpTransport::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) {
rtc::PacketTransportInternal* transport = rtcp && !rtcp_mux_enabled_
? rtcp_packet_transport_
: rtp_packet_transport_;
  // The transport here is the DtlsTransport
int ret = transport->SendPacket(packet->cdata<char>(), packet->size(),
options, flags);
if (ret != static_cast<int>(packet->size())) {
if (transport->GetError() == ENOTCONN) {
RTC_LOG(LS_WARNING) << "Got ENOTCONN from transport.";
SetReadyToSend(rtcp, false);
}
return false;
}
return true;
}
Since this trace comes from a local loopback test with DTLS disabled, the packet goes straight out through ice_transport_->SendPacket():
// Called from upper layers to send a media packet.
int DtlsTransport::SendPacket(const char* data,
size_t size,
const rtc::PacketOptions& options,
int flags) {
if (!dtls_active_) {
// Not doing DTLS.
return ice_transport_->SendPacket(data, size, options);// <-
}
switch (dtls_state()) {
case DTLS_TRANSPORT_NEW:
// Can't send data until the connection is active.
// TODO(ekr@rtfm.com): assert here if dtls_ is NULL?
return -1;
case DTLS_TRANSPORT_CONNECTING:
// Can't send data until the connection is active.
return -1;
case DTLS_TRANSPORT_CONNECTED:
if (flags & PF_SRTP_BYPASS) {
RTC_DCHECK(!srtp_ciphers_.empty());
if (!IsRtpPacket(data, size)) {
return -1;
}
return ice_transport_->SendPacket(data, size, options);
} else {
return (dtls_->WriteAll(data, size, NULL, NULL) == rtc::SR_SUCCESS)
? static_cast<int>(size)
: -1;
}
case DTLS_TRANSPORT_FAILED:
// Can't send anything when we're failed.
RTC_LOG(LS_ERROR)
<< ToString()
<< ": Couldn't send packet due to DTLS_TRANSPORT_FAILED.";
return -1;
case DTLS_TRANSPORT_CLOSED:
// Can't send anything when we're closed.
RTC_LOG(LS_ERROR)
<< ToString()
<< ": Couldn't send packet due to DTLS_TRANSPORT_CLOSED.";
return -1;
default:
RTC_NOTREACHED();
return -1;
}
}
Finally, the packet is sent on the selected ICE connection; below this point is the UDP layer:
int P2PTransportChannel::SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags) {
RTC_DCHECK_RUN_ON(network_thread_);
if (flags != 0) {
error_ = EINVAL;
return -1;
}
// If we don't think the connection is working yet, return ENOTCONN
// instead of sending a packet that will probably be dropped.
if (!ReadyToSend(selected_connection_)) {
error_ = ENOTCONN;
return -1;
}
last_sent_packet_id_ = options.packet_id;
rtc::PacketOptions modified_options(options);
modified_options.info_signaled_after_sent.packet_type =
rtc::PacketType::kData;
  // Send the data
int sent = selected_connection_->Send(data, len, modified_options);
if (sent <= 0) {
RTC_DCHECK(sent < 0);
error_ = selected_connection_->GetError();
}
return sent;
}