1. Introduction
This article is the first installment on congestion control in WebRTC. It analyzes the bitrate probing, estimation, and adjustment portions of WebRTC's congestion control, covering the overall framework, the underlying principles, and the classes involved.
WebRTC version: M91
2. Main Body
2.1 Overall framework
Part of WebRTC's rate-control structure is shown in the figure below. After data is received at the socket layer, the transport parses the RTCP packets to obtain the feedback. Call forwards the feedback to the RTCP handling module of the corresponding send stream, and finally RtpTransportControllerSend passes it to GoogCcNetworkController for bitrate estimation. The estimated bitrate (target bitrate), probing strategy (probe config), and congestion window are handed to the pacer, which forwards them to PacingController to control the send rate.
GoogCcNetworkController is the core of the whole bitrate estimation and adjustment process. The classes and flow involved are shown in the figure below; the classes in the red box live under GoogCcNetworkController:
ProbeBitrateEstimator: computes the probing bitrate from feedback. PacingController groups packets into clusters; the transport-cc report tells us which cluster a packet belongs to along with its send and receive information, and the ratio of bytes sent to bytes received indicates whether the link's capacity has been reached, enabling bandwidth probing.
AcknowledgedBitrateEstimator: estimates the current throughput.
BitrateEstimator: computes the current send throughput using a sliding window plus a Kalman filter.
DelayBasedBwe: delay-based bitrate estimation.
TrendlineEstimator: uses linear regression to gauge the current network congestion.
AimdRateControl: adjusts the bitrate in AIMD fashion based on the network state predicted by the TrendlineEstimator.
SendSideBandwidthEstimation: loss-based bitrate estimation; combined with the delay-based estimate it yields the final target bitrate.
ProbeController: probing controller; based on the target bitrate it decides whether to probe next, and at what bitrate.
CongestionWindowPushbackController: sets a time window based on the current RTT and a byte budget for that window based on the current bitrate. When the window's usage grows too large, it lowers the target bitrate given to the encoder, draining the window faster and reducing latency.
AlrDetector: application-limited region detection; detects whether the actual send rate has fallen far below the target bitrate (due to the encoder or similar causes). When rate-limited in this way, special handling is triggered in the bandwidth estimation process.
NetworkStateEstimator, NetworkStatePredictor: these two are still under development; they exist in the code but are unfinished and unused.
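The AIMD adjustment performed by AimdRateControl can be illustrated with a minimal sketch. The class name, constants, and update step below are illustrative assumptions, not WebRTC's actual implementation:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative sketch of the AIMD (additive increase, multiplicative
// decrease) idea behind AimdRateControl: grow the rate by a fixed step
// under normal conditions, back off multiplicatively on overuse, and hold
// while underusing so queues can drain. Constants are made up.
enum class BandwidthUsage { kNormal, kOverusing, kUnderusing };

class AimdSketch {
 public:
  explicit AimdSketch(int64_t start_bps) : rate_bps_(start_bps) {}

  int64_t Update(BandwidthUsage state) {
    switch (state) {
      case BandwidthUsage::kOverusing:
        rate_bps_ = rate_bps_ * 85 / 100;  // multiplicative decrease to 85%
        break;
      case BandwidthUsage::kNormal:
        rate_bps_ += 10000;  // additive increase: +10 kbps per update
        break;
      case BandwidthUsage::kUnderusing:
        break;  // hold: wait for queues to drain before increasing again
    }
    rate_bps_ = std::max<int64_t>(rate_bps_, 10000);  // floor at 10 kbps
    return rate_bps_;
  }

  int64_t rate_bps() const { return rate_bps_; }

 private:
  int64_t rate_bps_;
};
```

The real AimdRateControl additionally scales its step sizes with the RTT and the measured link capacity, but the state machine above is the core shape.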
Next, using GoogCcNetworkController's bitrate estimation process as an example, we will walk through WebRTC's bandwidth control architecture and flow in detail.
2.2 GoogCcNetworkController
GoogCcNetworkController is the core class for bitrate estimation. As shown in the rate-control structure in section 2.1, it belongs to class RtpTransportControllerSend.
2.2.1 When GoogCcNetworkController is created
When the underlying network becomes available, the RtpTransportControllerSend::OnNetworkAvailability() callback is triggered:
void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
<< (network_available ? "Up" : "Down");
NetworkAvailability msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.network_available = network_available;
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (network_available_ == msg.network_available)
return;
network_available_ = msg.network_available;
if (network_available_) {
pacer()->Resume();
} else {
pacer()->Pause();
}
pacer()->UpdateOutstandingData(DataSize::Zero());
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
PostUpdates(controller_->OnNetworkAvailability(msg));
UpdateControlState();
} else {
// No controller_ yet; create one
MaybeCreateControllers();
}
});
for (auto& rtp_sender : video_rtp_senders_) {
rtp_sender->OnNetworkAvailability(network_available);
}
}
When it detects that controller_ has not been created, it calls RtpTransportControllerSend::MaybeCreateControllers() to create it:
void RtpTransportControllerSend::MaybeCreateControllers() {
RTC_DCHECK(!controller_);
RTC_DCHECK(!control_handler_);
if (!network_available_ || !observer_)
return;
control_handler_ = std::make_unique<CongestionControlHandler>();
initial_config_.constraints.at_time =
Timestamp::Millis(clock_->TimeInMilliseconds());
initial_config_.stream_based_config = streams_config_;
// TODO(srte): Use fallback controller if no feedback is available.
// Create the GoogCcNetworkController
if (controller_factory_override_) {
RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
controller_ = controller_factory_override_->Create(initial_config_);
process_interval_ = controller_factory_override_->GetProcessInterval();
} else {
RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
controller_ = controller_factory_fallback_->Create(initial_config_);
process_interval_ = controller_factory_fallback_->GetProcessInterval();
}
// Periodically update the GoogCcNetworkController
UpdateControllerWithTimeInterval();
StartProcessPeriodicTasks();
}
Right after creation, it calls UpdateControllerWithTimeInterval() and StartProcessPeriodicTasks():
void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
RTC_DCHECK(controller_);
ProcessInterval msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
if (add_pacing_to_cwin_)
msg.pacer_queue = pacer()->QueueSizeData();
// Probe and update the bitrate; forward the result to the pacer
PostUpdates(controller_->OnProcessInterval(msg));
}
In UpdateControllerWithTimeInterval():
- GoogCcNetworkController::OnProcessInterval() is called to do the periodic bitrate probing and updating.
- PostUpdates() is called to forward the latest bitrate to the pacer.
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time = pacer()->ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState();
return kPacerQueueUpdateInterval;
});
}
controller_task_.Stop();
if (process_interval_.IsFinite()) {
// Periodically probe and update the bitrate
controller_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
return process_interval_;
});
}
}
In StartProcessPeriodicTasks():
- control_handler_ is updated. control_handler_ is a class that routes the bitrate information computed by the controller back to other modules (analyzed in detail later); UpdateControlState() is called to push the updated information out to them.
- A controller_task_ is created to run UpdateControllerWithTimeInterval() periodically.
Next, the core of the rate-control process is introduced through the two most important functions of the cc-controller: OnProcessInterval() and OnTransportPacketsFeedback(). The former updates the bitrate periodically as time passes, while the latter updates it only when cc-feedback arrives. Both functions touch a wide range of classes; walking every class to the bottom in one pass would hurt the article's logical structure, so each class involved is only mentioned briefly here and covered independently in detail later, where you can look it up.
2.2.2 Periodic probing - OnProcessInterval()
GoogCcNetworkController::OnProcessInterval() is one of the cc-controller's core functions. It is triggered periodically to probe and update the bandwidth:
NetworkControlUpdate GoogCcNetworkController::OnProcessInterval(
ProcessInterval msg) {
NetworkControlUpdate update;
if (initial_config_) {
// Reset the initial bitrates of the loss-based and delay-based estimators
// and the prober; obtain the probe cluster configs (probe_cluster_config)
update.probe_cluster_configs =
ResetConstraints(initial_config_->constraints);
// Get the current pacing send rate, padding, time window, etc.
update.pacer_config = GetPacingRates(msg.at_time);
// After probing completes, allow further probes so ALR can quickly
// recover the bitrate
if (initial_config_->stream_based_config.requests_alr_probing) {
probe_controller_->EnablePeriodicAlrProbing(
*initial_config_->stream_based_config.requests_alr_probing);
}
absl::optional<DataRate> total_bitrate =
initial_config_->stream_based_config.max_total_allocated_bitrate;
if (total_bitrate) {
// Set the max total allocated bitrate (MaxTotalAllocatedBitrate) as the
// probing upper bound, and generate the corresponding
// probe_cluster_config for probing
auto probes = probe_controller_->OnMaxTotalAllocatedBitrate(
total_bitrate->bps(), msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
max_total_allocated_bitrate_ = *total_bitrate;
}
// Release initial_config_ so later calls skip this initialization
initial_config_.reset();
}
// Update the pacing data length in the congestion window
if (congestion_window_pushback_controller_ && msg.pacer_queue) {
congestion_window_pushback_controller_->UpdatePacingQueue(
msg.pacer_queue->bytes());
}
// Update the bitrate
bandwidth_estimation_->UpdateEstimate(msg.at_time);
// Check whether we are currently in ALR
absl::optional<int64_t> start_time_ms =
alr_detector_->GetApplicationLimitedRegionStartTime();
// If in ALR, tell the probe_controller so it can probe for fast recovery
probe_controller_->SetAlrStartTimeMs(start_time_ms);
// Check whether the ALR state calls for a probe; get probe_cluster_configs
auto probes = probe_controller_->Process(msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
if (rate_control_settings_.UseCongestionWindow() &&
last_packet_received_time_.IsFinite() && !feedback_max_rtts_.empty()) {
// Update the congestion window size from the RTT and target rate
UpdateCongestionWindowSize();
}
if (congestion_window_pushback_controller_ && current_data_window_) {
// Reset the congestion window size
congestion_window_pushback_controller_->SetDataWindow(
*current_data_window_);
} else {
update.congestion_window = current_data_window_;
}
// Fetch the updated bitrate, probes, etc., and refresh the bitrates held
// by the ALR detector and probe_controller
MaybeTriggerOnNetworkChanged(&update, msg.at_time);
return update;
}
In GoogCcNetworkController::OnProcessInterval():
- On the first call, initial_config_ is used to set the initial bitrates in DelayBasedBwe, SendSideBandwidthEstimation, and ProbeController. After setting its bitrate, ProbeController returns a probe_cluster_config (probe cluster), which is handed to the pacing_controller; when sending, the pacing_controller paces packets at the bitrate specified there to carry out the probe.
- The max total allocated bitrate (MaxTotalAllocatedBitrate) is set on ProbeController, which uses it as the probing upper bound: once the probed bitrate reaches this value, ordinary probing stops.
- After initialization, SendSideBandwidthEstimation (bandwidth_estimation_) updates the bitrate based on time. Although internally it relies on the loss rate from cc-feedback to estimate the bitrate, in the absence of feedback it still estimates the current RTT over time and updates the bitrate accordingly.
- The current ALR state is fetched from AlrDetector. On every send (OnSentPacket), AlrDetector checks whether the actual send rate has fallen far below the target bitrate, to determine whether the target cannot be reached (limited by the encoder or similar causes), in which case it marks the stream as being in ALR. The ALR state matters a lot: bandwidth estimation fundamentally needs enough packets pushed into the link to observe its behavior, and when ALR indicates this requirement cannot be met, extra measures are needed.
- ProbeController is told about the ALR state. ProbeController normally stops probing once its initial probes complete, but in ALR or after a network change, the network needs to be probed again so the estimate can recover quickly.
- probe_cluster_configs are fetched from ProbeController for any probing that may be needed.
- The data size held by the congestion window controller (CongestionWindowPushbackController) is recomputed from the RTT and congestion state.
- bandwidth_estimation_ may have updated the bitrate; MaybeTriggerOnNetworkChanged() is called to sync the updated bitrate into the ALR detector and probe_controller, and to place the bitrate, probe config, etc. into the returned update.
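The congestion-window computation mentioned above (UpdateCongestionWindowSize) boils down to keeping roughly one RTT, plus some queueing allowance, worth of data in flight at the target bitrate. A minimal sketch of that relationship, with assumed names and parameters:

```cpp
#include <cassert>
#include <cstdint>

// Illustrative sketch of deriving a congestion window from the target rate
// and RTT (the idea behind UpdateCongestionWindowSize): the window holds
// about one RTT of data at the current target bitrate, optionally padded
// with an extra queueing allowance. Function name and units are assumptions.
int64_t CongestionWindowBytes(int64_t target_rate_bps,
                              int64_t rtt_ms,
                              int64_t extra_queue_ms) {
  const int64_t window_ms = rtt_ms + extra_queue_ms;
  // Bits in flight over the window, converted to bytes.
  return target_rate_bps * window_ms / 1000 / 8;
}
```

For example, at 800 kbps with a 100 ms RTT the window is 10 kB; if fewer bytes than that are outstanding, the sender is not congestion-window limited.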
2.2.3 cc-feedback
2.2.3.1 The cc-feedback packet
The other key cc-controller function, OnTransportPacketsFeedback(), is triggered when cc-feedback arrives, so cc-feedback comes first. The design and details of the cc-feedback protocol are covered in R2 (the transport-cc-feedback draft) and R5 (WebRTC research: Transport-cc RTP and RTCP), both very clear and thorough.
A quick look at the packet layout shows what cc-feedback gives us:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT=15 | PT=205 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
0 | SSRC of packet sender |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
4 | SSRC of media source |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
8 | base sequence number | packet status count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
12 | reference time | fb pkt. count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
16 | packet chunk | packet chunk |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| packet chunk | recv delta | recv delta |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| recv delta | recv delta | zero padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
cc-feedback uses PT=205 and FMT=15. The packet body starts at base sequence number:
base sequence number: the transport sequence number of the first RTP packet recorded in this TransportFeedback packet
packet status count: how many RTP packets this TransportFeedback packet reports on
reference time: the base time, in units of 64 ms; summed with the recv deltas below, it yields each packet's receive time
fb pkt. count: the sequence number of this feedback, used to detect loss of cc-feedback itself
These are followed by two arrays describing the packets whose transport numbers increase from the base sequence number:
packet chunk: each packet's arrival status (received / lost)
recv delta: the receive-time delta; it must be summed with the reference time to get the actual receive time.
So from cc-feedback we can obtain each packet's arrival status and arrival time.
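The arrival-time arithmetic above can be sketched with a hypothetical helper, assuming the reference time is in 64 ms units and each recv delta is in 0.25 ms units, with the deltas accumulated from the reference time:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Worked example of recovering per-packet arrival times from a transport-cc
// feedback: reference time is carried in multiples of 64 ms, each recv delta
// in multiples of 0.25 ms (250 us), and deltas accumulate from the base.
// This is an illustrative helper, not WebRTC's parsing code.
std::vector<int64_t> ArrivalTimesUs(int32_t reference_time_64ms,
                                    const std::vector<int16_t>& deltas_250us) {
  const int64_t base_us = static_cast<int64_t>(reference_time_64ms) * 64 * 1000;
  std::vector<int64_t> arrivals;
  int64_t acc_us = 0;
  for (int16_t d : deltas_250us) {
    acc_us += static_cast<int64_t>(d) * 250;  // one delta unit = 0.25 ms
    arrivals.push_back(base_us + acc_us);
  }
  return arrivals;
}
```

So a reference time of 1 (64 ms) with deltas {4, 8} yields arrivals at 65 ms and 67 ms.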
2.2.3.2 transport-sequence-number
For cc-feedback, a note on WebRTC's end-to-end handling.
WebRTC adds a transport-cc number RTP extension to every RTP packet to give each packet a transport-wide sequence number; see the official draft:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0xBE | 0xDE | length=1 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ID | L=1 |transport-wide sequence number | zero padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
The main purpose of this number is to separate media processing (which uses the RTP sequence number) from network processing (which uses the transport number).
In RTPSenderVideo::SendVideo(), when AllocatePacket() generates the RTP packets for each frame's data, some RTP extensions are reserved on the packet by default, among them TransportSequenceNumber.
std::unique_ptr<RtpPacketToSend> RTPSender::AllocatePacket() const {
...
// Reserve extensions, if registered, RtpSender set in SendToNetwork.
packet->ReserveExtension<AbsoluteSendTime>();
packet->ReserveExtension<TransmissionOffset>();
packet->ReserveExtension<TransportSequenceNumber>();  //<----
...
}
- extension register
(This part covers the extension registration process; skip it if you're not interested.)
As the comment in AllocatePacket() above says, these extensions are reserved, and if they are registered, RtpSender will set their values. The registration process traces back to RtpVideoSender: at initialization it applies the rtp_config.extensions it was given to the rtp_rtcp module of every stream.
RtpVideoSender::RtpVideoSender(....) {  // Parameter list elided (it is very long; this is not a variadic constructor)
...
// RTP/RTCP initialization.
for (size_t i = 0; i < rtp_config_.extensions.size(); ++i) {
const std::string& extension = rtp_config_.extensions[i].uri;
// Apply every extension in rtp_config to the rtp_rtcp module of each stream
int id = rtp_config_.extensions[i].id;
RTC_DCHECK(RtpExtension::IsSupportedForVideo(extension));
for (const RtpStreamSender& stream : rtp_streams_) {
// The rtp_rtcp module registers these extensions
stream.rtp_rtcp->RegisterRtpHeaderExtension(extension, id);
}
}
...
}
rtp_rtcp forwards this to packet_generator (which is in fact RTPSender):
void ModuleRtpRtcpImpl2::RegisterRtpHeaderExtension(absl::string_view uri,
int id) {
// Forward to packet_generator
bool registered =
rtp_sender_->packet_generator.RegisterRtpHeaderExtension(uri, id);
RTC_CHECK(registered);
}
RTPSender registers the extension. Note the important variable supports_bwe_extension_, refreshed by HasBweExtension(): it is set to true once the required BWE extensions are registered, and it decides whether padding can be used (during bandwidth probing, when the pending data cannot reach the target send rate, historical packets or empty payloads are sent as bandwidth filler).
bool RTPSender::RegisterRtpHeaderExtension(absl::string_view uri, int id) {
MutexLock lock(&send_mutex_);
bool registered = rtp_header_extension_map_.RegisterByUri(id, uri);  // register the extension
supports_bwe_extension_ = HasBweExtension(rtp_header_extension_map_);
UpdateHeaderSizes();
return registered;
}
// Extensions required for BWE
bool HasBweExtension(const RtpHeaderExtensionMap& extensions_map) {
return extensions_map.IsRegistered(kRtpExtensionTransportSequenceNumber) ||
extensions_map.IsRegistered(kRtpExtensionTransportSequenceNumber02) ||
extensions_map.IsRegistered(kRtpExtensionAbsoluteSendTime) ||
extensions_map.IsRegistered(kRtpExtensionTransmissionTimeOffset);
}
// Padding support requires a BWE extension
bool RTPSender::SupportsPadding() const {
MutexLock lock(&send_mutex_);
return sending_media_ && supports_bwe_extension_;
}
That concludes the extension registration story. Even after walking the whole path, there is no clearly identifiable place matching the comment's claim of "if the extension is registered, set its value", but it did surface some important probing-related machinery.
2.2.3.3 Generating the feedback packet
Finally, during paced sending, when a packet reaches PacketRouter it is checked for the TransportSequenceNumber extension; if present, transport_sequence_number is written into the packet header:
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
...
MutexLock lock(&modules_mutex_);
// Set the transport sequence number
if (packet->HasExtension<TransportSequenceNumber>()) {
packet->SetExtension<TransportSequenceNumber>((++transport_seq_) & 0xFFFF);
}
...
}
Afterwards, as the packet passes through the RtpSenderEgress module, RtpSenderEgress::SendPacket() extracts its transport_sequence_number to generate the feedback record. The whole flow:
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
...
if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true;
options.included_in_allocation = true;
// Add this packet to the feedback bookkeeping
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
...
}
A packet_info is built and the feedback observer is notified to add the packet:
void RtpSenderEgress::AddPacketToTransportFeedback(
uint16_t packet_id,
const RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info) {
if (transport_feedback_observer_) {
size_t packet_size = packet.payload_size() + packet.padding_size();
if (send_side_bwe_with_overhead_) {
packet_size = packet.size();
}
// Build packet_info
RtpPacketSendInfo packet_info;
packet_info.ssrc = ssrc_;
packet_info.transport_sequence_number = packet_id;
packet_info.rtp_sequence_number = packet.SequenceNumber();
packet_info.length = packet_size;
packet_info.pacing_info = pacing_info;
packet_info.packet_type = packet.packet_type();
// Notify the feedback observer to add this packet
transport_feedback_observer_->OnAddPacket(packet_info);
}
}
RtpTransportControllerSend is told that a packet was sent, and calls transport_feedback_adapter_ to generate the feedback record for it:
void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) {
feedback_demuxer_.AddPacket(packet_info);
Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, packet_info, creation_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
// Register the packet's feedback record with the adapter
transport_feedback_adapter_.AddPacket(
packet_info,
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0,
creation_time);
});
}
TransportFeedbackAdapter generates the feedback packet record and stores it in history_:
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
size_t overhead_bytes,
Timestamp creation_time) {
// Build the feedback record
PacketFeedback packet;
packet.creation_time = creation_time;
packet.sent.sequence_number =
seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number);
packet.sent.size = DataSize::Bytes(packet_info.length + overhead_bytes);
packet.sent.audio = packet_info.packet_type == RtpPacketMediaType::kAudio;
packet.network_route = network_route_;
packet.sent.pacing_info = packet_info.pacing_info;
while (!history_.empty() &&
creation_time - history_.begin()->second.creation_time >
kSendTimeHistoryWindow) {
// TODO(sprang): Warn if erasing (too many) old items?
if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
// Insert into history_, keyed by transport_sequence_number with the packet as value
history_.insert(std::make_pair(packet.sent.sequence_number, packet));
}
2.2.3.4 Filling in the feedback packet
When the RTCP packet carrying cc-feedback is received, it is forwarded layer by layer to RTCPReceiver:
void RTCPReceiver::IncomingPacket(rtc::ArrayView<const uint8_t> packet) {
if (packet.empty()) {
RTC_LOG(LS_WARNING) << "Incoming empty RTCP packet";
return;
}
PacketInformation packet_information;
// Parse the RTCP packet
if (!ParseCompoundPacket(packet, &packet_information))
return;
// Dispatch
TriggerCallbacksFromRtcpPacket(packet_information);
}
In RTCPReceiver::IncomingPacket():
- ParseCompoundPacket() parses the report. It is a gem of a function: in it you can find the parsing of every relevant RTCP packet (RR, SR, SDES, NACK, CC-FeedBack, Pli, Fir, etc.). Internally it calls HandleTransportFeedback(), which parses the cc-feedback into a transport_feedback and stores it in the packet_information:
void RTCPReceiver::HandleTransportFeedback(
    const CommonHeader& rtcp_block,
    PacketInformation* packet_information) {
  // Parse rtcp_block into a transport_feedback
  std::unique_ptr<rtcp::TransportFeedback> transport_feedback(
      new rtcp::TransportFeedback());
  if (!transport_feedback->Parse(rtcp_block)) {
    ++num_skipped_packets_;
    return;
  }
  packet_information->packet_type_flags |= kRtcpTransportFeedback;
  packet_information->transport_feedback = std::move(transport_feedback);
}
- TriggerCallbacksFromRtcpPacket() then dispatches the RTCP packet; inside it, the parsed transport_feedback is forwarded to RtpTransportControllerSend:
void RTCPReceiver::TriggerCallbacksFromRtcpPacket(
const PacketInformation& packet_information) {
...
if (transport_feedback_observer_ &&
(packet_information.packet_type_flags & kRtcpTransportFeedback)) {
uint32_t media_source_ssrc =
packet_information.transport_feedback->media_ssrc();
if (media_source_ssrc == local_ssrc ||
registered_ssrcs.find(media_source_ssrc) != registered_ssrcs.end()) {
// Hand the packet to RtpTransportControllerSend
transport_feedback_observer_->OnTransportFeedback(
*packet_information.transport_feedback);
}
}
...
}
RtpTransportControllerSend hands the transport_feedback to TransportFeedbackAdapter for processing, obtaining a feedback_msg:
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
feedback_demuxer_.OnTransportFeedback(feedback);
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
// Process the cc-feedback packet to obtain the feedback_msg
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
feedback_time);
if (feedback_msg && controller_) {
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
}
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
});
}
ProcessTransportFeedback() shows the structure of feedback_msg clearly:
absl::optional<TransportPacketsFeedback>
TransportFeedbackAdapter::ProcessTransportFeedback(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_receive_time) {
if (feedback.GetPacketStatusCount() == 0) {
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
return absl::nullopt;
}
TransportPacketsFeedback msg;
msg.feedback_time = feedback_receive_time;
msg.prior_in_flight = in_flight_.GetOutstandingData(network_route_);
// Fill in the feedback packet records
msg.packet_feedbacks =
ProcessTransportFeedbackInner(feedback, feedback_receive_time);
if (msg.packet_feedbacks.empty())
return absl::nullopt;
auto it = history_.find(last_ack_seq_num_);
if (it != history_.end()) {
msg.first_unacked_send_time = it->second.sent.send_time;
}
msg.data_in_flight = in_flight_.GetOutstandingData(network_route_);
return msg;
}
It contains what we care about most, packet_feedbacks, generated by calling ProcessTransportFeedbackInner():
std::vector<PacketResult>
TransportFeedbackAdapter::ProcessTransportFeedbackInner(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_receive_time) {
// Add timestamp deltas to a local time base selected on first packet arrival.
// This won't be the true time base, but makes it easier to manually inspect
// time stamps.
// A subtle detail here: to keep the timestamps easy to inspect, the
// reference time (base time) from the cc-feedback is not used directly.
// Instead, the local arrival time of the first feedback
// (feedback_receive_time) becomes the base (current_offset_), and as later
// cc-feedback packets arrive, the delta between their reference times is
// accumulated into current_offset_.
if (last_timestamp_.IsInfinite()) {
current_offset_ = feedback_receive_time;
} else {
// TODO(srte): We shouldn't need to do rounding here.
// Compute the delta between the current base time and the previous one
const TimeDelta delta = feedback.GetBaseDelta(last_timestamp_)
.RoundDownTo(TimeDelta::Millis(1));
// Protect against assigning current_offset_ negative value.
if (delta < Timestamp::Zero() - current_offset_) {
// current_offset_ would go negative; reset it to feedback_receive_time
RTC_LOG(LS_WARNING) << "Unexpected feedback timestamp received.";
current_offset_ = feedback_receive_time;
} else {
// current_offset_ is fine; just accumulate the delta
current_offset_ += delta;
}
}
last_timestamp_ = feedback.GetBaseTime();
std::vector<PacketResult> packet_result_vector;
packet_result_vector.reserve(feedback.GetPacketStatusCount());
size_t failed_lookups = 0;
size_t ignored = 0;
TimeDelta packet_offset = TimeDelta::Zero();
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num = seq_num_unwrapper_.Unwrap(packet.sequence_number());
if (seq_num > last_ack_seq_num_) {
// Starts at history_.begin() if last_ack_seq_num_ < 0, since any valid
// sequence number is >= 0.
for (auto it = history_.upper_bound(last_ack_seq_num_);
it != history_.upper_bound(seq_num); ++it) {
in_flight_.RemoveInFlightPacketBytes(it->second);
}
last_ack_seq_num_ = seq_num;
}
// Look the packet up in history_ by transport sequence number so its
// record can be filled in
auto it = history_.find(seq_num);
if (it == history_.end()) {
++failed_lookups;
continue;
}
if (it->second.sent.send_time.IsInfinite()) {
// TODO(srte): Fix the tests that makes this happen and make this a
// DCHECK.
RTC_DLOG(LS_ERROR)
<< "Received feedback before packet was indicated as sent";
continue;
}
PacketFeedback packet_feedback = it->second;
if (packet.received()) {
packet_offset += packet.delta();
// receive_time = base + delta
packet_feedback.receive_time =
current_offset_ + packet_offset.RoundDownTo(TimeDelta::Millis(1));
// Note: Lost packets are not removed from history because they might be
// reported as received by a later feedback.
history_.erase(it);
}
if (packet_feedback.network_route == network_route_) {
PacketResult result;
result.sent_packet = packet_feedback.sent;
result.receive_time = packet_feedback.receive_time;
packet_result_vector.push_back(result);
} else {
++ignored;
}
}
...
return packet_result_vector;
}
ProcessTransportFeedbackInner() is the core function that fills in the transport packet records. It mainly:
- Looks each packet up in history_ by its transport sequence number, then fills in its receive time and arrival status.
- Handles receive_time with a subtle detail: the reference time (base time offset) in the cc-feedback is not used directly. Instead, the moment the first cc-feedback arrives (feedback_receive_time) becomes the initial base time offset (current_offset_); as subsequent cc-feedback reports arrive, the delta between their reference times is accumulated onto current_offset_ as the ongoing base. As the comment says, this makes packet arrival times easier to inspect.
At this point the feedback packets are fully filled in, and they are passed on to the cc module to update the bitrate estimate.
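The current_offset_ bookkeeping described above can be condensed into a small sketch (names are assumptions; times are already converted to ms for simplicity, whereas the real packet carries the reference time in 64 ms units):

```cpp
#include <cassert>
#include <cstdint>

// Worked sketch of the local time base in ProcessTransportFeedbackInner():
// the first feedback's local receive time becomes the base, and each later
// feedback only adds the delta between its reference time and the previous
// one, keeping all receive timestamps in an inspectable local time base.
class FeedbackTimeBaseSketch {
 public:
  // base_time_ms: reference time carried in the feedback.
  // receive_time_ms: local wall clock when the feedback arrived.
  // Returns the updated current_offset_ used as the base for recv deltas.
  int64_t OnFeedback(int64_t base_time_ms, int64_t receive_time_ms) {
    if (!has_last_) {
      current_offset_ms_ = receive_time_ms;  // first feedback: local time
      has_last_ = true;
    } else {
      // Later feedbacks: accumulate only the reference-time delta.
      current_offset_ms_ += base_time_ms - last_base_time_ms_;
    }
    last_base_time_ms_ = base_time_ms;
    return current_offset_ms_;
  }

 private:
  bool has_last_ = false;
  int64_t last_base_time_ms_ = 0;
  int64_t current_offset_ms_ = 0;
};
```

Note how the second feedback's own arrival time is irrelevant: only the 64 ms-granularity reference times drive the offset after the first feedback.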
2.2.3.5 cc-feedback summary
The cc-feedback process is really the process of sending and receiving RTCP. It involves quite a few classes, so it is well worth summarizing:
The diagram splits into two halves. The right side is the generation of the feedback records: RTPSenderVideo registers the cc extension with RTPSender; once RTPSender supports the cc extension it enables PacingController's padding and produces packets carrying TransportSequenceNumber; after forwarding to PacketRouter, the TransportSequenceNumber is set, and as the packet goes out over the network, RtpSenderEgress passes the packet's info to RtpTransportControllerSend, which has TransportFeedbackAdapter generate the feedback record. The left side is reception: RTCPReceiver receives the cc-feedback, parses it into a transport-feedback, and hands it to RtpTransportControllerSend, which fills in the stored feedback records and delivers the final feedback packets to the cc-controller for bandwidth estimation.
2.2.4 Handling cc-feedback - OnTransportPacketsFeedback()
As shown in the diagram in 2.2.3.5, once cc-feedback arrives and the feedback records are filled in, they are forwarded to the cc-controller.
The handler is GoogCcNetworkController::OnTransportPacketsFeedback(), a very long function that uses the feedback to estimate a final bitrate, schedule further probes, and so on.
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
if (report.packet_feedbacks.empty()) {
// TODO(bugs.webrtc.org/10125): Design a better mechanism to safe-guard
// against building very large network queues.
return NetworkControlUpdate();
}
if (congestion_window_pushback_controller_) {
// The congestion_window_pushback_controller updates its outstanding
// data from the feedback
congestion_window_pushback_controller_->UpdateOutstandingData(
report.data_in_flight.bytes());
}
TimeDelta max_feedback_rtt = TimeDelta::MinusInfinity();
TimeDelta min_propagation_rtt = TimeDelta::PlusInfinity();
Timestamp max_recv_time = Timestamp::MinusInfinity();
// Find the latest packet arrival time (feedback.receive_time)
std::vector<PacketResult> feedbacks = report.ReceivedWithSendInfo();
for (const auto& feedback : feedbacks)
max_recv_time = std::max(max_recv_time, feedback.receive_time);
// Collect RTT statistics from the feedback and push them to each component:
// the largest feedback_rtt (from packet send until the feedback arrived) and
// the smallest propagation_rtt (network transit time, excluding time the
// packet spent pending at the remote end)
for (const auto& feedback : feedbacks) {
TimeDelta feedback_rtt =
report.feedback_time - feedback.sent_packet.send_time;
TimeDelta min_pending_time = feedback.receive_time - max_recv_time; // ??
TimeDelta propagation_rtt = feedback_rtt - min_pending_time;
max_feedback_rtt = std::max(max_feedback_rtt, feedback_rtt);
min_propagation_rtt = std::min(min_propagation_rtt, propagation_rtt);
}
// Update the propagation RTT
if (max_feedback_rtt.IsFinite()) {
feedback_max_rtts_.push_back(max_feedback_rtt.ms());
const size_t kMaxFeedbackRttWindow = 32;
// feedback_max_rtts_ is a sliding window of length 32
if (feedback_max_rtts_.size() > kMaxFeedbackRttWindow)
feedback_max_rtts_.pop_front();
// TODO(srte): Use time since last unacknowledged packet.
bandwidth_estimation_->UpdatePropagationRtt(report.feedback_time,
min_propagation_rtt);
}
// Update the RTTs of the loss and delay estimators. Note that
// loss uses feedback_min_rtt, while
// delay uses feedback_max_rtt
if (packet_feedback_only_) {
// Compute the mean feedback_max_rtt
if (!feedback_max_rtts_.empty()) {
// Average the feedback RTTs
int64_t sum_rtt_ms = std::accumulate(feedback_max_rtts_.begin(),
feedback_max_rtts_.end(), 0);
int64_t mean_rtt_ms = sum_rtt_ms / feedback_max_rtts_.size();
// Update the BWE's RTT
if (delay_based_bwe_)
delay_based_bwe_->OnRttUpdate(TimeDelta::Millis(mean_rtt_ms));
}
// Compute feedback_min_rtt and update bandwidth_estimation_'s RTT
TimeDelta feedback_min_rtt = TimeDelta::PlusInfinity();
// This logic duplicates the feedback_max_rtt computation above
for (const auto& packet_feedback : feedbacks) {
TimeDelta pending_time = packet_feedback.receive_time - max_recv_time;
TimeDelta rtt = report.feedback_time -
packet_feedback.sent_packet.send_time - pending_time;
// Value used for predicting NACK round trip time in FEC controller.
feedback_min_rtt = std::min(rtt, feedback_min_rtt);
}
if (feedback_min_rtt.IsFinite()) {
bandwidth_estimation_->UpdateRtt(feedback_min_rtt, report.feedback_time);
}
// Update the loss rate:
// total packets that should have arrived since the last loss update
expected_packets_since_last_loss_update_ +=
report.PacketsWithFeedback().size();
for (const auto& packet_feedback : report.PacketsWithFeedback()) {
if (packet_feedback.receive_time.IsInfinite())
lost_packets_since_last_loss_update_ += 1;
}
// feedback_time has passed the next loss update time; update the loss rate
if (report.feedback_time > next_loss_update_) {
next_loss_update_ = report.feedback_time + kLossUpdateInterval;
bandwidth_estimation_->UpdatePacketsLost(
lost_packets_since_last_loss_update_,
expected_packets_since_last_loss_update_, report.feedback_time);
expected_packets_since_last_loss_update_ = 0;
lost_packets_since_last_loss_update_ = 0;
}
}
// Check whether we are currently in ALR
absl::optional<int64_t> alr_start_time =
alr_detector_->GetApplicationLimitedRegionStartTime();
// Tell the acknowledged-bitrate estimator and probe_controller that we are
// no longer in ALR
if (previously_in_alr_ && !alr_start_time.has_value()) {
int64_t now_ms = report.feedback_time.ms();
acknowledged_bitrate_estimator_->SetAlrEndedTime(report.feedback_time);
probe_controller_->SetAlrEndedTimeMs(now_ms);
}
previously_in_alr_ = alr_start_time.has_value();
// Estimate the receiver-side throughput
acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector(
report.SortedByReceiveTime());
auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate();
// Feed it into bandwidth_estimation_ to update the link capacity (link_capacity)
bandwidth_estimation_->SetAcknowledgedRate(acknowledged_bitrate,
report.feedback_time);
bandwidth_estimation_->IncomingPacketFeedbackVector(report);
for (const auto& feedback : report.SortedByReceiveTime()) {
if (feedback.sent_packet.pacing_info.probe_cluster_id !=
PacedPacketInfo::kNotAProbe) {
// probe_estimator updates the probe bitrate computation from the feedback
probe_bitrate_estimator_->HandleProbeAndEstimateBitrate(feedback);
}
}
if (network_estimator_) {
// Still under development and unused; its purpose is not yet clear
network_estimator_->OnTransportPacketsFeedback(report);
auto prev_estimate = estimate_;
estimate_ = network_estimator_->GetCurrentEstimate();
// TODO(srte): Make OnTransportPacketsFeedback signal whether the state
// changed to avoid the need for this check.
if (estimate_ && (!prev_estimate || estimate_->last_feed_time !=
prev_estimate->last_feed_time)) {
event_log_->Log(std::make_unique<RtcEventRemoteEstimate>(
estimate_->link_capacity_lower, estimate_->link_capacity_upper));
}
}
// Fetch the final result of the probe_estimator updates from the loop above
absl::optional<DataRate> probe_bitrate =
probe_bitrate_estimator_->FetchAndResetLastEstimatedBitrate();
// If the "ignore probes lower than the network estimate" feature is
// enabled, drop probe_bitrate when it is below the estimate
if (ignore_probes_lower_than_network_estimate_ && probe_bitrate &&
estimate_ && *probe_bitrate < delay_based_bwe_->last_estimate() &&
*probe_bitrate < estimate_->link_capacity_lower) {
probe_bitrate.reset();
}
// If the "limit probes to slightly below the throughput estimate" feature
// is enabled, clamp the probe against acknowledged_bitrate (the measured
// link throughput)
if (limit_probes_lower_than_throughput_estimate_ && probe_bitrate &&
acknowledged_bitrate) {
// Limit the backoff to something slightly below the acknowledged
// bitrate. ("Slightly below" because we want to drain the queues
// if we are actually overusing.)
// The acknowledged bitrate shouldn't normally be higher than the delay
// based estimate, but it could happen e.g. due to packet bursts or
// encoder overshoot. We use std::min to ensure that a probe result
// below the current BWE never causes an increase.
DataRate limit =
std::min(delay_based_bwe_->last_estimate(),
*acknowledged_bitrate * kProbeDropThroughputFraction);
probe_bitrate = std::max(*probe_bitrate, limit);
}
NetworkControlUpdate update;
bool recovered_from_overuse = false;
bool backoff_in_alr = false;
// Run the delay-based BWE on the feedback to get a delay-based estimate
DelayBasedBwe::Result result;
result = delay_based_bwe_->IncomingPacketFeedbackVector(
report, acknowledged_bitrate, probe_bitrate, estimate_,
alr_start_time.has_value());
if (result.updated) {
// The estimated bitrate changed
if (result.probe) {
// The BWE was reset from a probe bitrate;
// reset bandwidth_estimation_'s send bitrate too
bandwidth_estimation_->SetSendBitrate(result.target_bitrate,
report.feedback_time);
}
// Since SetSendBitrate now resets the delay-based estimate, we have to
// call UpdateDelayBasedEstimate after SetSendBitrate.
// Update the delay-based estimate held by bandwidth_estimation_
bandwidth_estimation_->UpdateDelayBasedEstimate(report.feedback_time,
result.target_bitrate);
// Update the estimate in the ProbeController, in case we want to probe.
// Propagate the changed bitrate to probe_controller, alr_detector,
// congestion_window, etc.
MaybeTriggerOnNetworkChanged(&update, report.feedback_time);
}
recovered_from_overuse = result.recovered_from_overuse;
backoff_in_alr = result.backoff_in_alr;
if (recovered_from_overuse) {
// 從overuse中恢復了,重設alr start 時間
probe_controller_->SetAlrStartTimeMs(alr_start_time);
// Fetch the parameters for the upcoming bandwidth probes and put them into the update.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
} else if (backoff_in_alr) {
// We backed off while in ALR, so request a new round of probing.
// If we just backed off during ALR, request a new probe.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
}
// No valid RTT could be because send-side BWE isn't used, in which case
// we don't try to limit the outstanding packets.
if (rate_control_settings_.UseCongestionWindow() &&
max_feedback_rtt.IsFinite()) {
// TODO: this congestion window logic still deserves a closer look.
UpdateCongestionWindowSize();
}
if (congestion_window_pushback_controller_ && current_data_window_) {
// If a congestion_window_pushback_controller_ exists, hand it the current
// window so it can push back on the encoder.
congestion_window_pushback_controller_->SetDataWindow(
*current_data_window_);
} else {
// Otherwise, put the window directly into the result.
update.congestion_window = current_data_window_;
}
// Return the result.
return update;
}
Its main steps are:
- Compute propagation_rtt and feedback_rtt for every feedback (the former is the pure network transit time of a packet, while the latter also includes the time the packet spent pending at the receiver). From these, derive max_feedback_rtt, min_propagation_rtt and feedback_min_rtt. Taking min_propagation_rtt as an example:
for (const auto& feedback : feedbacks) {
TimeDelta feedback_rtt =
report.feedback_time - feedback.sent_packet.send_time;
TimeDelta min_pending_time = feedback.receive_time - max_recv_time; // ??
TimeDelta propagation_rtt = feedback_rtt - min_pending_time;
max_feedback_rtt = std::max(max_feedback_rtt, feedback_rtt);
min_propagation_rtt = std::min(min_propagation_rtt, propagation_rtt);
}
feedback_rtt and propagation_rtt are computed as in the loop above. (What still puzzles me: min_pending_time is supposed to be the minimum time a packet waits at the receiver, which should be max_recv_time - feedback.receive_time, but the code has it the other way round, so the result is negative. I filed an issue about this and have not received an answer yet.)
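To make the sign issue concrete, here is a small standalone sketch of the loop above with hypothetical timestamps (illustrative code, not WebRTC's):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct Feedback {
  int64_t send_time_ms;
  int64_t receive_time_ms;
};

// Mirrors the min_propagation_rtt loop above. max_recv_time_ms is the receive
// time of the newest packet covered by this feedback report.
int64_t MinPropagationRttMs(const std::vector<Feedback>& feedbacks,
                            int64_t feedback_time_ms,
                            int64_t max_recv_time_ms) {
  int64_t min_propagation_rtt = INT64_MAX;
  for (const auto& fb : feedbacks) {
    int64_t feedback_rtt = feedback_time_ms - fb.send_time_ms;
    // As in the WebRTC code: receive_time - max_recv_time, which is <= 0.
    int64_t min_pending_time = fb.receive_time_ms - max_recv_time_ms;
    min_propagation_rtt =
        std::min(min_propagation_rtt, feedback_rtt - min_pending_time);
  }
  return min_propagation_rtt;
}
```

With packet A sent at 0 and received at 40, packet B sent at 10 and received at 55, and the feedback arriving at 100: A yields 100 - (40 - 55) = 115 and B yields 90 - 0 = 90, so the minimum is taken from the packet received last, whose pending term is zero.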
- Update min_propagation_rtt into bandwidth_estimation_'s PropagationRtt via UpdatePropagationRtt(); keep the latest 32 feedback_max_rtts in a queue, compute their mean (mean_rtt_ms) and push it into the delay-based estimator via delay_based_bwe_->OnRttUpdate(); update feedback_min_rtt into bandwidth_estimation_'s RTT via UpdateRtt().
- Compute the loss rate and update it into bandwidth_estimation_ via UpdatePacketsLost(); bandwidth_estimation_ adjusts its rate internally based on the loss rate.
- Use alr_detector to check whether we are currently in the ALR state; if not, tell acknowledged_bitrate_estimator and probe_controller that we are not in ALR.
- Use acknowledged_bitrate_estimator to compute the receive-side throughput (acknowledged_bitrate) from the feedback, and update it into bandwidth_estimation_.
- Use the probe bitrate estimator (probe_bitrate_estimator) to compute a probe bitrate (probe_bitrate) from the returned feedback; then feed acknowledged_bitrate, probe_bitrate and the feedback itself into the delay-based estimator (delay_based_bwe) for a rate estimate.
- Feed the delay_based_bwe estimate into bandwidth_estimation_, letting bandwidth_estimation_ combine everything into the final target rate.
- Call MaybeTriggerOnNetworkChanged() to obtain the final rate, push the new rate into alr_detector, probe_controller etc., update the congestion window size from the latest rate and RTT, and set the new congestion window size into congestion_window_pushback_controller.
2.2.5 Updating the network state and results - MaybeTriggerOnNetworkChanged()
MaybeTriggerOnNetworkChanged() is called once the final rate has been estimated; it pushes the new rate into alr_detector and probe_controller, and builds the result returned for pacing_controller to use:
void GoogCcNetworkController::MaybeTriggerOnNetworkChanged(
NetworkControlUpdate* update,
Timestamp at_time) {
// Fetch the loss rate, RTT and target rate from bandwidth_estimation_.
uint8_t fraction_loss = bandwidth_estimation_->fraction_loss();
TimeDelta round_trip_time = bandwidth_estimation_->round_trip_time();
DataRate loss_based_target_rate = bandwidth_estimation_->target_rate();
DataRate pushback_target_rate = loss_based_target_rate;
BWE_TEST_LOGGING_PLOT(1, "fraction_loss_%", at_time.ms(),
(fraction_loss * 100) / 256);
BWE_TEST_LOGGING_PLOT(1, "rtt_ms", at_time.ms(), round_trip_time.ms());
BWE_TEST_LOGGING_PLOT(1, "Target_bitrate_kbps", at_time.ms(),
loss_based_target_rate.kbps());
double cwnd_reduce_ratio = 0.0;
if (congestion_window_pushback_controller_) {
// Update the target rate in the congestion window controller and fetch the new pushed-back rate.
int64_t pushback_rate =
congestion_window_pushback_controller_->UpdateTargetBitrate(
loss_based_target_rate.bps());
// Must not fall below bandwidth_estimation_->GetMinBitrate().
pushback_rate = std::max<int64_t>(bandwidth_estimation_->GetMinBitrate(),
pushback_rate);
pushback_target_rate = DataRate::BitsPerSec(pushback_rate);
if (rate_control_settings_.UseCongestionWindowDropFrameOnly()) {
// If rate control only drops frames, derive the frame-drop ratio from the estimated and pushed-back rates.
cwnd_reduce_ratio = static_cast<double>(loss_based_target_rate.bps() -
pushback_target_rate.bps()) /
loss_based_target_rate.bps();
}
}
// The stable target rate (stable_target_rate) is the minimum of link_capacity, loss_based_target_rate and pushback_target_rate.
DataRate stable_target_rate =
bandwidth_estimation_->GetEstimatedLinkCapacity();
if (loss_based_stable_rate_) {
stable_target_rate = std::min(stable_target_rate, loss_based_target_rate);
} else {
stable_target_rate = std::min(stable_target_rate, pushback_target_rate);
}
// Save the latest values if anything changed.
if ((loss_based_target_rate != last_loss_based_target_rate_) ||
(fraction_loss != last_estimated_fraction_loss_) ||
(round_trip_time != last_estimated_round_trip_time_) ||
(pushback_target_rate != last_pushback_target_rate_) ||
(stable_target_rate != last_stable_target_rate_)) {
last_loss_based_target_rate_ = loss_based_target_rate;
last_pushback_target_rate_ = pushback_target_rate;
last_estimated_fraction_loss_ = fraction_loss;
last_estimated_round_trip_time_ = round_trip_time;
last_stable_target_rate_ = stable_target_rate;
// Update the target rate used by ALR detection.
alr_detector_->SetEstimatedBitrate(loss_based_target_rate.bps());
// Get the expected time until the next BWE overuse period.
TimeDelta bwe_period = delay_based_bwe_->GetExpectedBwePeriod();
TargetTransferRate target_rate_msg;
target_rate_msg.at_time = at_time;
if (rate_control_settings_.UseCongestionWindowDropFrameOnly()) {
// With CongestionWindowDropFrameOnly the encoder rate is adjusted by
// dropping frames only, so use the estimated rate directly as the target.
target_rate_msg.target_rate = loss_based_target_rate;
target_rate_msg.cwnd_reduce_ratio = cwnd_reduce_ratio;
} else {
// Otherwise the encoder bitrate itself is adjusted, so use pushback_target_rate as the target.
target_rate_msg.target_rate = pushback_target_rate;
}
target_rate_msg.stable_target_rate = stable_target_rate;
target_rate_msg.network_estimate.at_time = at_time;
target_rate_msg.network_estimate.round_trip_time = round_trip_time;
target_rate_msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0f; // fraction_loss was scaled by 256 when computed
target_rate_msg.network_estimate.bwe_period = bwe_period;
update->target_rate = target_rate_msg;
// Push the latest rate into probe_controller and fetch any probes to perform.
auto probes = probe_controller_->SetEstimatedBitrate(
loss_based_target_rate.bps(), at_time.ms());
update->probe_cluster_configs.insert(update->probe_cluster_configs.end(),
probes.begin(), probes.end());
// Compute the pacing rate: essentially a 1 s window filled at target_rate.
update->pacer_config = GetPacingRates(at_time);
RTC_LOG(LS_VERBOSE) << "bwe " << at_time.ms() << " pushback_target_bps="
<< last_pushback_target_rate_.bps()
<< " estimate_bps=" << loss_based_target_rate.bps();
}
}
GoogCcNetworkController::MaybeTriggerOnNetworkChanged() does the following:
- First fetch the RTT, loss rate and target rate from bandwidth_estimation.
- Update the congestion window's send rate (pushback_target_rate) from the target rate, then derive a rate reduction ratio (cwnd_reduce_ratio) from the gap between the estimated target rate and the pushed-back rate; this value is used to throttle the encoder.
- Compute a conservative rate (stable_target_rate), taken as the minimum of the following three:
link_capacity: the link capacity estimated by exponential smoothing over the target rate and the current throughput (acknowledged bitrate)
loss_based_target_rate: the link rate estimated from packet loss, capped by the delay-based estimate
pushback_target_rate: loss_based_target_rate scaled down according to how full the current congestion window is
- Update the new target rate into AlrDetector.
- If the congestion-window-drop-frame-only feature is enabled, the encoder is adjusted purely by dropping frames, so loss_based_target_rate is used directly as the target rate; otherwise the pushed-back rate (pushback_target_rate) becomes the target rate.
- Update the latest rate into probe_controller, fetching any probes that may need to be performed.
- Obtain the pacing configuration via GetPacingRates():
PacerConfig GoogCcNetworkController::GetPacingRates(Timestamp at_time) const {
  // Pacing rate is based on target rate before congestion window pushback,
  // because we don't want to build queues in the pacer when pushback occurs.
  // Note: pacing uses last_loss_based_target_rate_, which has not been
  // through the congestion window pushback; the upstream comment means that
  // pushback should not cause packets to queue up inside the pacer.
  DataRate pacing_rate =
      std::max(min_total_allocated_bitrate_, last_loss_based_target_rate_) *
      pacing_factor_;
  // padding_rate is mainly driven by max_padding_rate_, a value computed
  // externally (by bitrate allocation); it must also not exceed the
  // pushed-back rate (last_pushback_target_rate_).
  DataRate padding_rate =
      std::min(max_padding_rate_, last_pushback_target_rate_);
  PacerConfig msg;
  msg.at_time = at_time;
  msg.time_window = TimeDelta::Seconds(1);  // 1 s window
  msg.data_window = pacing_rate * msg.time_window;
  msg.pad_window = padding_rate * msg.time_window;
  return msg;
}
That concludes the cc-controller walkthrough, and it probably still feels foggy. The following subsections cover the classes involved one by one: AlrDetector, ProbeController, ProbeBitrateEstimator, AcknowledgedBitrateEstimator, DelayBasedBwe, SendSideBandwidthEstimation and CongestionWindowPushbackController. After reading those, revisiting the cc-controller rate control flow becomes much easier.
2.3 Application-limited detector - AlrDetector
AlrDetector (Application Limited Region detector) is, as the name suggests, used to detect whether the current send rate is far below the target rate due to the encoder or other application-level limits. Every time data is sent, the cc-controller's OnSentPacket() receives the size and time of the data just sent; this information is passed into AlrDetector, which updates its state and checks whether we are currently in the ALR state.
NetworkControlUpdate GoogCcNetworkController::OnSentPacket(
SentPacket sent_packet) {
// Update the sent bytes and send time into the ALR detector.
alr_detector_->OnBytesSent(sent_packet.size.bytes(),
sent_packet.send_time.ms());
// Tell acknowledged_bitrate_estimator_ whether we are in the ALR state.
acknowledged_bitrate_estimator_->SetAlr(
alr_detector_->GetApplicationLimitedRegionStartTime().has_value());
...
}
AlrDetector::OnBytesSent() is where the detection happens. The idea is simple: an internal alr_budget grows with elapsed time (delta_time_ms * target_rate) and is drained by the bytes actually sent (bytes_sent), which records the budget already used. If the used share of the budget stays below a preset ratio, the current send rate is deemed too low and the ALR state is entered.
void AlrDetector::OnBytesSent(size_t bytes_sent, int64_t send_time_ms) {
if (!last_send_time_ms_.has_value()) {
last_send_time_ms_ = send_time_ms;
// Since the duration for sending the bytes is unknown, return without
// updating alr state.
return;
}
int64_t delta_time_ms = send_time_ms - *last_send_time_ms_;
last_send_time_ms_ = send_time_ms;
// Consume budget for the bytes just sent.
alr_budget_.UseBudget(bytes_sent);
// Grow the budget for the elapsed time.
alr_budget_.IncreaseBudget(delta_time_ms);
bool state_changed = false;
if (alr_budget_.budget_ratio() > conf_.start_budget_level_ratio &&
!alr_started_time_ms_) {
// Budget usage is below the preset ratio: enter the ALR state.
alr_started_time_ms_.emplace(rtc::TimeMillis());
state_changed = true;
} else if (alr_budget_.budget_ratio() < conf_.stop_budget_level_ratio &&
alr_started_time_ms_) {
state_changed = true;
alr_started_time_ms_.reset();
}
if (event_log_ && state_changed) {
event_log_->Log(
std::make_unique<RtcEventAlrState>(alr_started_time_ms_.has_value()));
}
}
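The budget mechanism can be sketched standalone. The class below is a simplified stand-in for WebRTC's IntervalBudget, not the real implementation (the 500 ms window matches WebRTC's; the real AlrDetector additionally scales the target rate by a bandwidth-usage ratio):

```cpp
#include <algorithm>
#include <cstdint>

// Minimal sketch of the interval-budget idea behind AlrDetector: the budget
// fills at the target rate and is drained by the bytes actually sent; a high
// remaining-budget ratio means the application sends well below target (ALR).
class SimpleBudget {
 public:
  explicit SimpleBudget(int target_rate_kbps)
      : target_rate_kbps_(target_rate_kbps),
        max_bytes_(int64_t{target_rate_kbps} * 500 / 8),  // 500 ms window
        bytes_remaining_(0) {}

  void IncreaseBudget(int delta_ms) {
    // kbps * ms / 8 = bytes.
    bytes_remaining_ = std::min<int64_t>(
        bytes_remaining_ + int64_t{target_rate_kbps_} * delta_ms / 8,
        max_bytes_);
  }

  void UseBudget(int bytes_sent) {
    bytes_remaining_ =
        std::max<int64_t>(bytes_remaining_ - bytes_sent, -max_bytes_);
  }

  double budget_ratio() const {
    return static_cast<double>(bytes_remaining_) / max_bytes_;
  }

 private:
  int target_rate_kbps_;
  int64_t max_bytes_;
  int64_t bytes_remaining_;
};
```

At a 1000 kbps target, 100 ms of idle time adds 12500 bytes of budget; if nothing is sent, the ratio climbs toward the start threshold and ALR turns on, and heavy sending drives it back down past the stop threshold.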
2.4 Probe
Probing involves three classes:
ProbeController : decides whether and when probing is needed
BitrateProber : executes the probe in practice, controlling the send rate
ProbeBitrateEstimator : estimates the bitrate from how the probe packets fared, based on the cc-feedback reports
2.4.1 Probe controller - ProbeController
ProbeController is created together with the cc-controller and initialized via Reset(). The variable of most interest in Reset() is state_, which is set to State::kInit,
marking that we are still in the initial phase and no probing has taken place.
ProbeController::ProbeController(const WebRtcKeyValueConfig* key_value_config,
RtcEventLog* event_log)
: enable_periodic_alr_probing_(false),
in_rapid_recovery_experiment_(absl::StartsWith(
key_value_config->Lookup(kBweRapidRecoveryExperiment),
"Enabled")),
limit_probes_with_allocateable_rate_(!absl::StartsWith(
key_value_config->Lookup(kCappedProbingFieldTrialName),
"Disabled")),
event_log_(event_log),
config_(ProbeControllerConfig(key_value_config)) {
Reset(0);
}
void ProbeController::Reset(int64_t at_time_ms) {
network_available_ = true;
state_ = State::kInit; // !
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
time_last_probing_initiated_ms_ = 0;
estimated_bitrate_bps_ = 0;
start_bitrate_bps_ = 0;
max_bitrate_bps_ = 0;
int64_t now_ms = at_time_ms;
last_bwe_drop_probing_time_ms_ = now_ms;
alr_end_time_ms_.reset();
mid_call_probing_waiting_for_result_ = false;
time_of_last_large_drop_ms_ = now_ms;
bitrate_before_last_large_drop_bps_ = 0;
max_total_allocated_bitrate_ = 0;
}
After the cc-controller is set up, probe_controller is first configured and initialized in GoogCcNetworkController::OnProcessInterval():
NetworkControlUpdate GoogCcNetworkController::OnProcessInterval(
ProcessInterval msg) {
NetworkControlUpdate update;
if (initial_config_) {
// Reset the initial rates of the loss-based and delay-based estimators and
// the prober, obtaining the probe cluster configs (probe_cluster_config).
update.probe_cluster_configs =
ResetConstraints(initial_config_->constraints);
// Enable ALR probing in probe_controller.
if (initial_config_->stream_based_config.requests_alr_probing) {
probe_controller_->EnablePeriodicAlrProbing(
*initial_config_->stream_based_config.requests_alr_probing);
}
absl::optional<DataRate> total_bitrate =
initial_config_->stream_based_config.max_total_allocated_bitrate;
if (total_bitrate) {
// Give the prober the max allocated bitrate (MaxTotalAllocatedBitrate) as
// the probing upper bound, generating the corresponding probe_cluster_config.
auto probes = probe_controller_->OnMaxTotalAllocatedBitrate(
total_bitrate->bps(), msg.at_time.ms());
...
}
...
}
...
// Pass the current ALR state to probe_controller_.
absl::optional<int64_t> start_time_ms =
alr_detector_->GetApplicationLimitedRegionStartTime();
probe_controller_->SetAlrStartTimeMs(start_time_ms);
// Let probe_controller periodically check whether probing is due.
auto probes = probe_controller_->Process(msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
// Fetch the updated rates, probes etc., and update the rates in alr and probe_controller.
MaybeTriggerOnNetworkChanged(&update, msg.at_time);
return update;
}
OnProcessInterval() first calls ResetConstraints(), which puts the config's min_data_rate, max_data_rate and starting_rate into probe_controller_:
std::vector<ProbeClusterConfig> GoogCcNetworkController::ResetConstraints(
TargetRateConstraints new_constraints) {
min_target_rate_ = new_constraints.min_data_rate.value_or(DataRate::Zero());
max_data_rate_ =
new_constraints.max_data_rate.value_or(DataRate::PlusInfinity());
starting_rate_ = new_constraints.starting_rate;
ClampConstraints();
bandwidth_estimation_->SetBitrates(starting_rate_, min_data_rate_,
max_data_rate_, new_constraints.at_time);
if (starting_rate_)
delay_based_bwe_->SetStartBitrate(*starting_rate_);
delay_based_bwe_->SetMinBitrate(min_data_rate_);
// Set the initial target_bitrate and obtain the initial probe configs.
return probe_controller_->SetBitrates(
min_data_rate_.bps(), GetBpsOrDefault(starting_rate_, -1),
max_data_rate_.bps_or(-1), new_constraints.at_time.ms());
}
2.4.1.1 SetBitrates()
Probing already starts during ProbeController::SetBitrates(): since this is the first call, the State::kInit branch is taken and InitiateExponentialProbing() is executed.
std::vector<ProbeClusterConfig> ProbeController::SetBitrates(
int64_t min_bitrate_bps,
int64_t start_bitrate_bps,
int64_t max_bitrate_bps,
int64_t at_time_ms) {
if (start_bitrate_bps > 0) {
start_bitrate_bps_ = start_bitrate_bps;
estimated_bitrate_bps_ = start_bitrate_bps;
} else if (start_bitrate_bps_ == 0) {
// No start_bitrate_bps_ yet; default to min_bitrate_bps.
start_bitrate_bps_ = min_bitrate_bps;
}
// The reason we use the variable |old_max_bitrate_pbs| is because we
// need to set |max_bitrate_bps_| before we call InitiateProbing.
int64_t old_max_bitrate_bps = max_bitrate_bps_;
max_bitrate_bps_ = max_bitrate_bps;
switch (state_) {
case State::kInit:
// In kInit, start probing from start_bitrate_bps_.
if (network_available_)
return InitiateExponentialProbing(at_time_ms);
break;
case State::kWaitingForProbingResult:
break;
case State::kProbingComplete:
// If the new max bitrate is higher than both the old max bitrate and the
// estimate then initiate probing.
if (estimated_bitrate_bps_ != 0 &&
old_max_bitrate_bps < max_bitrate_bps_ &&
estimated_bitrate_bps_ < max_bitrate_bps_) {
// The assumption is that if we jump more than 20% in the bandwidth
// estimate or if the bandwidth estimate is within 90% of the new
// max bitrate then the probing attempt was successful.
mid_call_probing_succcess_threshold_ =
std::min(estimated_bitrate_bps_ * 1.2, max_bitrate_bps_ * 0.9);
mid_call_probing_waiting_for_result_ = true;
mid_call_probing_bitrate_bps_ = max_bitrate_bps_;
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.Initiated",
max_bitrate_bps_ / 1000);
return InitiateProbing(at_time_ms, {max_bitrate_bps_}, false);
}
break;
}
return std::vector<ProbeClusterConfig>();
}
InitiateExponentialProbing() multiplies start_bitrate_bps by two scale factors, first_exponential_probe_scale (3.0) and second_exponential_probe_scale (6.0), to obtain the first two probe bitrates, and then passes them to InitiateProbing() to generate the probe configs.
std::vector<ProbeClusterConfig> ProbeController::InitiateExponentialProbing(
int64_t at_time_ms) {
RTC_DCHECK(network_available_);
RTC_DCHECK(state_ == State::kInit);
RTC_DCHECK_GT(start_bitrate_bps_, 0);
// When probing at 1.8 Mbps ( 6x 300), this represents a threshold of
// 1.2 Mbps to continue probing.
// Set the prober's initial rates; the first two probe values are:
// config_.first_exponential_probe_scale (3.0) * start_bitrate_bps_
// config_.second_exponential_probe_scale (6.0) * start_bitrate_bps_
std::vector<int64_t> probes = {static_cast<int64_t>(
config_.first_exponential_probe_scale * start_bitrate_bps_)};
if (config_.second_exponential_probe_scale) {
probes.push_back(config_.second_exponential_probe_scale.Value() *
start_bitrate_bps_);
}
return InitiateProbing(at_time_ms, probes, true);
}
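Assuming the default scales named in the comments above (3.0 and 6.0), the initial probe plan for a given start bitrate can be sketched as:

```cpp
#include <cstdint>
#include <vector>

// Sketch of the initial exponential probe plan, assuming the default scales
// first_exponential_probe_scale = 3.0 and second_exponential_probe_scale = 6.0.
std::vector<int64_t> InitialExponentialProbesBps(int64_t start_bitrate_bps) {
  return {3 * start_bitrate_bps, 6 * start_bitrate_bps};
}
```

For a 300 kbps start bitrate this yields 900 kbps and 1.8 Mbps, matching the "probing at 1.8 Mbps (6x 300)" comment in the code.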
2.4.1.2 Generating probe bitrates - InitiateProbing()
InitiateProbing() is the core function of ProbeController. It generates a probe config for each bitrate passed in; these configs are eventually consumed by the pacing controller's bitrate_prober to drive the packet send rate. InitiateProbing() is called from many places in ProbeController: whenever a network change requires the bitrate to be re-probed, this function is called to generate the probe configs.
/**
 * @description: Generate ProbeClusterConfigs from the given bitrates, for use by bitrate_prober.
 * @param {bitrates_to_probe} the bitrates to probe
 * @return {std::vector<ProbeClusterConfig>} the probe cluster configs
 */
std::vector<ProbeClusterConfig> ProbeController::InitiateProbing(
int64_t now_ms,
std::vector<int64_t> bitrates_to_probe,
bool probe_further) {
// The current max probe bitrate (max_probe_bitrate_bps) is bounded by two values:
// max_bitrate_bps_
// max_total_allocated_bitrate_
int64_t max_probe_bitrate_bps =
max_bitrate_bps_ > 0 ? max_bitrate_bps_ : kDefaultMaxProbingBitrateBps;
if (limit_probes_with_allocateable_rate_ &&
max_total_allocated_bitrate_ > 0) {
// If probing is limited by the allocatable rate and a max allocated bitrate
// (max_total_allocated_bitrate) is set, max_probe_bitrate_bps may not exceed
// 2x max_total_allocated_bitrate_.
// If a max allocated bitrate has been configured, allow probing up to 2x
// that rate. This allows some overhead to account for bursty streams,
// which otherwise would have to ramp up when the overshoot is already in
// progress.
// It also avoids minor quality reduction caused by probes often being
// received at slightly less than the target probe bitrate.
max_probe_bitrate_bps =
std::min(max_probe_bitrate_bps, max_total_allocated_bitrate_ * 2);
}
std::vector<ProbeClusterConfig> pending_probes;
// Each bitrate to probe is sent to the PacingController as a ProbeClusterConfig;
// here a probe config is generated for each initial probe bitrate.
for (int64_t bitrate : bitrates_to_probe) {
RTC_DCHECK_GT(bitrate, 0);
// A bitrate above the max probe bitrate is clamped, and the further-probing flag (probe_further) is cleared.
if (bitrate > max_probe_bitrate_bps) {
bitrate = max_probe_bitrate_bps;
probe_further = false;// !
}
// Generate a config for this bitrate.
ProbeClusterConfig config;
config.at_time = Timestamp::Millis(now_ms);
config.target_data_rate =
DataRate::BitsPerSec(rtc::dchecked_cast<int>(bitrate)); // probe target bitrate
config.target_duration = TimeDelta::Millis(kMinProbeDurationMs); // target probe duration (15 ms)
config.target_probe_count = kMinProbePacketsSent; // target number of probe packets (5)
config.id = next_probe_cluster_id_; // current probe cluster id
next_probe_cluster_id_++;
MaybeLogProbeClusterCreated(event_log_, config);
pending_probes.push_back(config);
}
time_last_probing_initiated_ms_ = now_ms;
if (probe_further) {
// Further probing is enabled: set the probing state to waiting.
state_ = State::kWaitingForProbingResult;
// Compute the minimum bitrate that must be reached to probe further. When the
// estimate is updated via SetEstimatedBitrate(), the new estimate is checked
// against min_bitrate_to_probe_further_bps_; if it is higher, further probing
// is triggered.
min_bitrate_to_probe_further_bps_ =
(*(bitrates_to_probe.end() - 1)) * config_.further_probe_threshold;
} else {
// probe_further == false -> probe_controller_ has completed probing.
state_ = State::kProbingComplete;
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
}
return pending_probes;
}
ProbeController::InitiateProbing() mainly does the following:
- Fetch max_probe_bitrate_bps (the probe bitrate cap) and clamp each requested bitrate that exceeds it.
- Generate a ProbeClusterConfig for each bitrate to be probed; its structure is:
struct ProbeClusterConfig {
  Timestamp at_time = Timestamp::PlusInfinity();
  DataRate target_data_rate = DataRate::Zero();  // target bitrate to probe
  TimeDelta target_duration = TimeDelta::Zero();  // probe duration
  int32_t target_probe_count = 0;  // number of probe packets
  int32_t id = 0;  // probe cluster id, identifying one batch of probes
};
- If the probe_further argument is true, another round of probing follows this one: the state is set to State::kWaitingForProbingResult, and a minimum bitrate for further probing is derived from the last of the probe bitrates.
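The further-probe threshold from the last step can be sketched numerically. The 0.7 scale below is an assumption for config_.further_probe_threshold, consistent with the "probing at 1.8 Mbps gives a threshold of 1.2 Mbps to continue probing" comment in InitiateExponentialProbing():

```cpp
#include <cmath>
#include <cstdint>

// Sketch of min_bitrate_to_probe_further_bps: the last probe bitrate of the
// batch scaled by further_probe_threshold (0.7 assumed here).
int64_t MinBitrateToProbeFurtherBps(int64_t last_probe_bps,
                                    double further_probe_threshold) {
  return std::llround(last_probe_bps * further_probe_threshold);
}
```

A measured bitrate above this threshold means the channel absorbed the probe well enough to justify probing higher.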
2.4.1.3 Periodically checking whether to probe - Process()
After SetBitrates(), GoogCcNetworkController::OnProcessInterval() calls probe_controller_->EnablePeriodicAlrProbing() according to the config, enabling the probe controller's periodic probing while in the ALR state.
void ProbeController::EnablePeriodicAlrProbing(bool enable) {
enable_periodic_alr_probing_ = enable;
}
This flag takes effect in ProbeController::Process(), which runs on every periodic invocation of GoogCcNetworkController::OnProcessInterval().
/**
 * @description: Periodic check: if currently in ALR, decide whether it is time to probe.
 * @param {at_time_ms} the time of this check
 * @return {*}
 */
std::vector<ProbeClusterConfig> ProbeController::Process(int64_t at_time_ms) {
if (at_time_ms - time_last_probing_initiated_ms_ >
kMaxWaitingTimeForProbingResultMs) {
mid_call_probing_waiting_for_result_ = false;
if (state_ == State::kWaitingForProbingResult) {
// The periodic check fired while still in State::kWaitingForProbingResult:
// the cc-feedback has not come back to trigger SetEstimatedBitrate(), so this
// counts as a timeout; end further probing early and keep the current rate.
RTC_LOG(LS_INFO) << "kWaitingForProbingResult: timeout";
state_ = State::kProbingComplete;
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
}
}
if (enable_periodic_alr_probing_ && state_ == State::kProbingComplete) {
// Probing is complete but periodic ALR probing is enabled: generate the ALR probe config.
// Probe bandwidth periodically when in ALR state.
if (alr_start_time_ms_ && estimated_bitrate_bps_ > 0) {
// Currently in ALR: use probes for fast estimation.
// Next probe time = max(ALR start time, last probe time) + interval.
int64_t next_probe_time_ms =
std::max(*alr_start_time_ms_, time_last_probing_initiated_ms_) +
config_.alr_probing_interval->ms();
if (at_time_ms >= next_probe_time_ms) {
// The next probe time has passed; probe directly on top of the current estimate.
return InitiateProbing(at_time_ms,
{static_cast<int64_t>(estimated_bitrate_bps_ *
config_.alr_probe_scale)},
true);
}
}
}
return std::vector<ProbeClusterConfig>();
}
ProbeController::Process() mainly does the following:
- Check whether we are in the kWaitingForProbingResult state; if so, a further probe is in flight but its feedback has not come back, which counts as a timeout, so further probing is ended early.
- If periodic ALR probing is enabled (enable_periodic_alr_probing) and we are currently in ALR, the next probe time is the later of the ALR start time (alr_start_time_ms) and the last probe time (time_last_probing_initiated_ms), plus an interval; once the current time reaches it, probe at the estimated bitrate multiplied by a scale factor.
2.4.1.4 Updating the estimated bitrate - SetEstimatedBitrate()
Besides the functions above, after the cc-controller finishes rate estimation on a received feedback it calls GoogCcNetworkController::MaybeTriggerOnNetworkChanged(), which tells probe_controller that the estimate has changed; probe_controller updates it via ProbeController::SetEstimatedBitrate():
std::vector<ProbeClusterConfig> ProbeController::SetEstimatedBitrate(
int64_t bitrate_bps,
int64_t at_time_ms) {
if (mid_call_probing_waiting_for_result_ &&
bitrate_bps >= mid_call_probing_succcess_threshold_) {
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.Success",
mid_call_probing_bitrate_bps_ / 1000);
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.ProbedKbps",
bitrate_bps / 1000);
mid_call_probing_waiting_for_result_ = false;
}
std::vector<ProbeClusterConfig> pending_probes;
if (state_ == State::kWaitingForProbingResult) {
// Continue probing if probing results indicate channel has greater
// capacity.
RTC_LOG(LS_INFO) << "Measured bitrate: " << bitrate_bps
<< " Minimum to probe further: "
<< min_bitrate_to_probe_further_bps_;
if (min_bitrate_to_probe_further_bps_ != kExponentialProbingDisabled &&
bitrate_bps > min_bitrate_to_probe_further_bps_) {
// Above the minimum further-probe bitrate: keep probing.
pending_probes = InitiateProbing(
at_time_ms,
{static_cast<int64_t>(config_.further_exponential_probe_scale *
bitrate_bps)},
true);
}
}
if (bitrate_bps < kBitrateDropThreshold * estimated_bitrate_bps_) {
// The newly set bitrate_bps is much smaller than estimated_bitrate_bps_:
// a large drop occurred.
time_of_last_large_drop_ms_ = at_time_ms;
bitrate_before_last_large_drop_bps_ = estimated_bitrate_bps_;
}
// Store the new estimate.
estimated_bitrate_bps_ = bitrate_bps;
return pending_probes;
}
In ProbeController::SetEstimatedBitrate():
- Check whether we are in kWaitingForProbingResult, which means further probing is expected; if the newly set estimate is above the minimum further-probe bitrate (min_bitrate_to_probe_further_bps), multiply it by a scale factor and generate a probe config from it.
- Check whether the new rate is much lower than the old one; if so, mark that a large drop (large_drop) occurred. If the probe controller has the rapid-recovery experiment enabled, then even when the state is already kProbingComplete, RequestProbe() will generate a probe config to check whether a real, persistent large drop happened.
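The large-drop check is a simple threshold comparison; the 0.66 below is an assumption standing in for kBitrateDropThreshold, not a value confirmed by this article:

```cpp
#include <cstdint>

// Sketch of the large-drop check in SetEstimatedBitrate(): the new estimate
// counts as a large drop when it falls below a fraction of the old estimate.
bool IsLargeDrop(int64_t new_bps, int64_t old_bps, double drop_threshold) {
  return new_bps < drop_threshold * old_bps;
}
```

When this fires, the time and the pre-drop bitrate are recorded so that RequestProbe() can later probe at the pre-drop rate.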
2.4.1.5 Deciding whether to probe - RequestProbe()
RequestProbe() is called from GoogCcNetworkController::OnTransportPacketsFeedback(), mainly when rapid recovery is needed, or when the estimate backed off in ALR and needs to be corrected with a probe.
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
if (recovered_from_overuse) {
// Recovered from overuse: reset the ALR start time.
probe_controller_->SetAlrStartTimeMs(alr_start_time);
// Fetch the parameters for the upcoming bandwidth probes and put them into the update.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
} else if (backoff_in_alr) {
// The rate dropped while in ALR (delay_based backed off from the estimate rather than the acked rate): send a probe.
// If we just backed off during ALR, request a new probe.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
}
...
}
Its principle is simple: probe at the bitrate recorded in SetEstimatedBitrate() just before the large drop, to check whether the drop was real, thereby helping rapid recovery.
std::vector<ProbeClusterConfig> ProbeController::RequestProbe(
int64_t at_time_ms) {
// Called once we have returned to normal state after a large drop in
// estimated bandwidth. The current response is to initiate a single probe
// session (if not already probing) at the previous bitrate.
//
// If the probe session fails, the assumption is that this drop was a
// real one from a competing flow or a network change.
bool in_alr = alr_start_time_ms_.has_value();
bool alr_ended_recently =
(alr_end_time_ms_.has_value() &&
at_time_ms - alr_end_time_ms_.value() < kAlrEndedTimeoutMs);
// In ALR, or ALR just ended, or rapid recovery is enabled.
if (in_alr || alr_ended_recently || in_rapid_recovery_experiment_) {
if (state_ == State::kProbingComplete) {
// Fetch the bitrate from before the large drop, to probe whether the drop was real.
uint32_t suggested_probe_bps =
kProbeFractionAfterDrop * bitrate_before_last_large_drop_bps_;
uint32_t min_expected_probe_result_bps =
(1 - kProbeUncertainty) * suggested_probe_bps;
int64_t time_since_drop_ms = at_time_ms - time_of_last_large_drop_ms_;
int64_t time_since_probe_ms = at_time_ms - last_bwe_drop_probing_time_ms_;
if (min_expected_probe_result_bps > estimated_bitrate_bps_ &&
time_since_drop_ms < kBitrateDropTimeoutMs &&
time_since_probe_ms > kMinTimeBetweenAlrProbesMs) {
RTC_LOG(LS_INFO) << "Detected big bandwidth drop, start probing.";
// Track how often we probe in response to bandwidth drop in ALR.
RTC_HISTOGRAM_COUNTS_10000(
"WebRTC.BWE.BweDropProbingIntervalInS",
(at_time_ms - last_bwe_drop_probing_time_ms_) / 1000);
last_bwe_drop_probing_time_ms_ = at_time_ms;
return InitiateProbing(at_time_ms, {suggested_probe_bps}, false);
}
}
}
return std::vector<ProbeClusterConfig>();
}
2.4.2 The prober - BitrateProber
2.4.2.1 Creating a probe cluster - CreateProbeCluster()
BitrateProber consumes the ProbeClusterConfigs created by ProbeController. Newly generated ProbeClusterConfigs are forwarded by RtpTransportControllerSend::PostUpdates() to the PacingController:
void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
if (update.congestion_window) {
pacer()->SetCongestionWindow(*update.congestion_window);
}
if (update.pacer_config) {
pacer()->SetPacingRates(update.pacer_config->data_rate(),
update.pacer_config->pad_rate());
}
for (const auto& probe : update.probe_cluster_configs) {
// Forward the probe cluster config.
pacer()->CreateProbeCluster(probe.target_data_rate, probe.id);
}
if (update.target_rate) {
control_handler_->SetTargetRate(*update.target_rate);
UpdateControlState();
}
}
void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) {
prober_.CreateProbeCluster(bitrate, CurrentTime(), cluster_id);
}
which finally forwards it to BitrateProber:
void BitrateProber::CreateProbeCluster(DataRate bitrate,
Timestamp now,
int cluster_id) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
RTC_DCHECK_GT(bitrate, DataRate::Zero());
total_probe_count_++;
// Drop expired clusters.
while (!clusters_.empty() &&
now - clusters_.front().created_at > kProbeClusterTimeout) {
clusters_.pop();
total_failed_probe_count_++;
}
// Create a probe cluster and push it into clusters_.
ProbeCluster cluster;
cluster.created_at = now;
cluster.pace_info.probe_cluster_min_probes = config_.min_probe_packets_sent;
cluster.pace_info.probe_cluster_min_bytes =
(bitrate * config_.min_probe_duration.Get()).bytes();
RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0);
cluster.pace_info.send_bitrate_bps = bitrate.bps();
cluster.pace_info.probe_cluster_id = cluster_id;
clusters_.push(cluster);
RTC_LOG(LS_INFO) << "Probe cluster (bitrate:min bytes:min packets): ("
<< cluster.pace_info.send_bitrate_bps << ":"
<< cluster.pace_info.probe_cluster_min_bytes << ":"
<< cluster.pace_info.probe_cluster_min_probes << ")";
// If we are already probing, continue to do so. Otherwise set it to
// kInactive and wait for OnIncomingPacket to start the probing.
// If already probing, stay active; otherwise become inactive and wait for an incoming packet to activate.
if (probing_state_ != ProbingState::kActive)
probing_state_ = ProbingState::kInactive;
}
In BitrateProber::CreateProbeCluster():
- All probes are managed as probe clusters.
- Whenever a probe cluster is to be created, the clusters_ queue is first walked and expired clusters are removed.
- A probe cluster is built from the given bitrate and cluster_id and pushed into the cluster queue.
2.4.2.2 Computing the current probe send size - RecommendedMinProbeSize()
When PacingController::ProcessPackets() dequeues packets for sending, it calls BitrateProber::RecommendedMinProbeSize() to get the amount of data the current probe requires. In the send loop below, if the amount sent falls short of the probe size, PacketRouter::GeneratePadding() is called to generate padding packets that make up the probe rate; how padding packets are generated deserves its own discussion (TODO).
void PacingController::ProcessPackets() {
....
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
// Fetch the current probe cluster.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
// Fetch the probe size from the prober.
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
....
while (!paused_) {
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
// The queue is empty: check how much of the probe size is still missing.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
// Generate padding packets to fill up the probe rate.
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quit send loop.
break;
}
for (auto& packet : padding_packets) {
// Enqueue the padding packets.
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
....
}
....
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
// Update the prober with the amount of data sent.
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
}
Before BitrateProber::RecommendedMinProbeSize() runs, BitrateProber::CurrentCluster() has in fact already executed; it updates the prober's cluster queue, discarding expired clusters.
absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return absl::nullopt;
}
// If the probe at the head of the queue is overdue, discard it.
if (config_.abort_delayed_probes && next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay.Get()) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
" (next_ms:"
<< next_probe_time_.ms() << ", now_ms: " << now.ms()
<< "), discarding probe cluster.";
clusters_.pop();
if (clusters_.empty()) {
probing_state_ = ProbingState::kSuspended;
return absl::nullopt;
}
}
PacedPacketInfo info = clusters_.front().pace_info;
info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
return info;
}
Only then does BitrateProber::RecommendedMinProbeSize() run. As can be seen, the recommended probe size is the amount of data carried in two min_probe_delta intervals at the probe bitrate. The reason is that PacingController's sending involves process and task scheduling: a queued packet is not guaranteed to trigger a send immediately, and under high CPU load sends are delayed, so the probe interval is doubled to balance that factor out. In addition, each time PacingController sends packets it fetches the next probe time from BitrateProber to keep the probing bitrate on target.
// Probe size is recommended based on the probe bitrate required. We choose
// a minimum of twice |kMinProbeDeltaMs| interval to allow scheduling to be
// feasible.
DataSize BitrateProber::RecommendedMinProbeSize() const {
if (clusters_.empty()) {
return DataSize::Zero();
}
// Probe bitrate of the cluster at the head of the queue
DataRate send_rate =
DataRate::BitsPerSec(clusters_.front().pace_info.send_bitrate_bps);
return 2 * send_rate * config_.min_probe_delta;
}
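As a concrete sketch of the arithmetic (not the actual webrtc types; the 1 ms default for min_probe_delta is an assumption of this sketch):

```cpp
#include <cstdint>

// Minimal sketch of BitrateProber::RecommendedMinProbeSize(): send at least
// the bytes that two min_probe_delta intervals carry at the probe bitrate,
// so task-scheduling jitter cannot starve the probe.
// min_probe_delta_ms = 1 is an assumed default, not taken from the article.
int64_t RecommendedMinProbeSizeBytes(int64_t send_bitrate_bps,
                                     int64_t min_probe_delta_ms = 1) {
  // bytes = bitrate[bit/s] * (2 * delta[ms] / 1000) / 8
  return send_bitrate_bps * 2 * min_probe_delta_ms / 1000 / 8;
}
```

At 1 Mbps this recommends 250 bytes of probe data per scheduling slot; doubling the interval is cheaper than risking a probe that under-shoots its cluster bitrate.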
2.4.2.3 Updating the probe's sent data - ProbeSent()
The next probe time (next_probe_time_) is updated whenever a probe packet is sent: ProbeSent() calls CalculateNextProbeTime() to refresh it.
void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK(!size.IsZero());
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
if (cluster->sent_probes == 0) {
RTC_DCHECK(cluster->started_at.IsInfinite());
cluster->started_at = now;
}
cluster->sent_bytes += size.bytes<int>();
cluster->sent_probes += 1;
// Update the next probe time
next_probe_time_ = CalculateNextProbeTime(*cluster);
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
RTC_HISTOGRAM_COUNTS_100000("WebRTC.BWE.Probing.ProbeClusterSizeInBytes",
cluster->sent_bytes);
RTC_HISTOGRAM_COUNTS_100("WebRTC.BWE.Probing.ProbesPerCluster",
cluster->sent_probes);
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.Probing.TimePerProbeCluster",
(now - cluster->started_at).ms());
clusters_.pop();
}
if (clusters_.empty()) {
probing_state_ = ProbingState::kSuspended;
}
}
}
The idea behind CalculateNextProbeTime() is simple: next probe time = the time the cluster started probing + the time the already-sent data should take to drain at the probe bitrate.
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0);
RTC_CHECK(cluster.started_at.IsFinite());
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
DataRate send_bitrate =
DataRate::BitsPerSec(cluster.pace_info.send_bitrate_bps);
TimeDelta delta = sent_bytes / send_bitrate;
// Next probe time = cluster start time + time consumed by the data sent so far
return cluster.started_at + delta;
}
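A minimal sketch of that arithmetic, with times in milliseconds (hypothetical values in the usage note):

```cpp
#include <cstdint>

// Sketch of BitrateProber::CalculateNextProbeTime():
// next probe time = cluster start + time the already-sent bytes should have
// occupied at the probe bitrate.
int64_t NextProbeTimeMs(int64_t started_at_ms, int64_t sent_bytes,
                        int64_t send_bitrate_bps) {
  // delta_ms = bytes * 8 bits * 1000 / (bitrate in bits per second)
  int64_t delta_ms = sent_bytes * 8 * 1000 / send_bitrate_bps;
  return started_at_ms + delta_ms;
}
```

If 1250 bytes have been sent on a 1 Mbps cluster that started at t = 100 ms, the next probe is due at t = 110 ms; sending earlier would overshoot the cluster's target bitrate.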
2.4.3 Probe estimator - ProbeBitrateEstimator
Once the probe packets reach the receiver, it builds cc-feedback and sends it back, from which ProbeBitrateEstimator computes the actual probe bitrate, as can be seen in GoogCcNetworkController::OnTransportPacketsFeedback():
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
for (const auto& feedback : report.SortedByReceiveTime()) {
if (feedback.sent_packet.pacing_info.probe_cluster_id !=
PacedPacketInfo::kNotAProbe) {
// probe_bitrate_estimator_ updates the probe-bitrate computation from the returned feedback
probe_bitrate_estimator_->HandleProbeAndEstimateBitrate(feedback);
}
}
...
// Fetch the final result of the probe_bitrate_estimator_ updates in the loop above
absl::optional<DataRate> probe_bitrate =
probe_bitrate_estimator_->FetchAndResetLastEstimatedBitrate();
...
}
The packets obtained via report.SortedByReceiveTime() in the for loop are all packets that were actually received; packets that were never received have an infinite receive time in the feedback and are filtered out automatically:
std::vector<PacketResult> TransportPacketsFeedback::SortedByReceiveTime()
const {
std::vector<PacketResult> res;
for (const PacketResult& fb : packet_feedbacks) {
// Only return packets that were actually received
if (fb.receive_time.IsFinite()) {
res.push_back(fb);
}
}
std::sort(res.begin(), res.end(), PacketResult::ReceiveTimeOrder());
return res;
}
The feedback packets are then used as the argument to ProbeBitrateEstimator::HandleProbeAndEstimateBitrate().
2.4.3.1 Computing the probe result - HandleProbeAndEstimateBitrate()
ProbeBitrateEstimator::HandleProbeAndEstimateBitrate() computes the probe bitrate from a feedback packet:
absl::optional<DataRate> ProbeBitrateEstimator::HandleProbeAndEstimateBitrate(
const PacketResult& packet_feedback) {
// Get the probe cluster id from the feedback packet
int cluster_id = packet_feedback.sent_packet.pacing_info.probe_cluster_id;
RTC_DCHECK_NE(cluster_id, PacedPacketInfo::kNotAProbe);
// Erase clusters that expired earlier
EraseOldClusters(packet_feedback.receive_time);
// Look up the cluster to update (or create it from the feedback)
AggregatedCluster* cluster = &clusters_[cluster_id];
// Update the send times
if (packet_feedback.sent_packet.send_time < cluster->first_send) {
cluster->first_send = packet_feedback.sent_packet.send_time;
}
if (packet_feedback.sent_packet.send_time > cluster->last_send) {
cluster->last_send = packet_feedback.sent_packet.send_time;
cluster->size_last_send = packet_feedback.sent_packet.size;
}
// Update the receive times
if (packet_feedback.receive_time < cluster->first_receive) {
cluster->first_receive = packet_feedback.receive_time;
cluster->size_first_receive = packet_feedback.sent_packet.size;
}
// Packets that were never received were already filtered out upstream (SortedByReceiveTime())
if (packet_feedback.receive_time > cluster->last_receive) {
cluster->last_receive = packet_feedback.receive_time;
}
// Accumulate the sent packet size
cluster->size_total += packet_feedback.sent_packet.size;
cluster->num_probes += 1;
RTC_DCHECK_GT(
packet_feedback.sent_packet.pacing_info.probe_cluster_min_probes, 0);
RTC_DCHECK_GT(packet_feedback.sent_packet.pacing_info.probe_cluster_min_bytes,
0);
// Bandwidth probing imposes minimums on the count and size of returned feedback:
// at least probe_cluster_min_probes * 0.8 probes must be reported back,
// covering at least probe_cluster_min_bytes * 0.8 bytes
int min_probes =
packet_feedback.sent_packet.pacing_info.probe_cluster_min_probes *
kMinReceivedProbesRatio;
DataSize min_size =
DataSize::Bytes(
packet_feedback.sent_packet.pacing_info.probe_cluster_min_bytes) *
kMinReceivedBytesRatio;
if (cluster->num_probes < min_probes || cluster->size_total < min_size)
return absl::nullopt;
// Compute the send/receive intervals and sanity-check them;
// the probe bitrate is only computed when they are valid
TimeDelta send_interval = cluster->last_send - cluster->first_send;
TimeDelta receive_interval = cluster->last_receive - cluster->first_receive;
if (send_interval <= TimeDelta::Zero() || send_interval > kMaxProbeInterval ||
    receive_interval <= TimeDelta::Zero() ||
    receive_interval > kMaxProbeInterval) {
RTC_LOG(LS_INFO) << "Probing unsuccessful, invalid send/receive interval";
return absl::nullopt;
}
// send_interval does not cover the transmission of the last packet, so the
// last packet is excluded when computing the send rate
RTC_DCHECK_GT(cluster->size_total, cluster->size_last_send);
// Compute the send rate
DataSize send_size = cluster->size_total - cluster->size_last_send;
DataRate send_rate = send_size / send_interval;
// Likewise, receive_interval does not cover the reception of the first packet
RTC_DCHECK_GT(cluster->size_total, cluster->size_first_receive);
DataSize receive_size = cluster->size_total - cluster->size_first_receive;
// Compute the receive rate
DataRate receive_rate = receive_size / receive_interval;
// If receive_rate is far larger than send_rate, part of the feedback has
// likely not arrived yet; skip the estimate
double ratio = receive_rate / send_rate;
if (ratio > kMaxValidRatio) {
RTC_LOG(LS_INFO) << "Probing unsuccessful, receive/send ratio too high";
return absl::nullopt;
}
RTC_LOG(LS_INFO) << "Probing successful";
// Take the smaller of the send and receive rates as the probe result
DataRate res = std::min(send_rate, receive_rate);
// If the receive rate is far below the send rate, the link's true capacity
// has been reached; the result is then set slightly below receive_rate
if (receive_rate < kMinRatioForUnsaturatedLink * send_rate) {
RTC_DCHECK_GT(send_rate, receive_rate);
res = kTargetUtilizationFraction * receive_rate;
}
if (event_log_) {
event_log_->Log(
std::make_unique<RtcEventProbeResultSuccess>(cluster_id, res.bps()));
}
estimated_data_rate_ = res;
return estimated_data_rate_;
}
In ProbeBitrateEstimator::HandleProbeAndEstimateBitrate():
- As feedback packets stream in, it maintains a send interval [first_send_time, last_send_time] and a receive interval [first_receive_time, last_receive_time] for the cluster, and accumulates the sent size (send_size) and received size (receive_size); dividing size by interval yields the send and receive rates.
- The minimum requirements on the intervals and on the packet count give some assurance of the result's accuracy.
- Packets that were never received were filtered out earlier, as noted above, so receive_size needs no special handling here; arguably their sizes should still be added to send_size, and why the code does not do that is unclear to me.
- The smaller of the send rate and the receive rate is taken as the final probe bitrate.
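The rate computation can be condensed into the following sketch; the constants 0.9 (saturation ratio, kMinRatioForUnsaturatedLink) and 0.95 (kTargetUtilizationFraction) match the role they play in the logic above but should be treated as assumptions of this sketch rather than quoted values:

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the core of HandleProbeAndEstimateBitrate(): compute send and
// receive rates over the cluster's intervals and take the smaller; if the
// link looks saturated, aim slightly below the measured receive rate.
double ProbeBitrateBps(int64_t size_total_bytes,
                       int64_t size_last_send_bytes,
                       int64_t size_first_receive_bytes,
                       double send_interval_s,
                       double receive_interval_s) {
  // The send interval does not cover the last packet's transmission,
  // the receive interval does not cover the first packet's reception.
  double send_rate =
      (size_total_bytes - size_last_send_bytes) * 8.0 / send_interval_s;
  double receive_rate =
      (size_total_bytes - size_first_receive_bytes) * 8.0 / receive_interval_s;
  double res = std::min(send_rate, receive_rate);
  if (receive_rate < 0.9 * send_rate) {
    // Link saturated: its true capacity was reached; back off a little.
    res = 0.95 * receive_rate;
  }
  return res;
}
```

With equal 72 ms intervals and 9000 effective bytes each way, both rates come out at 1 Mbps; halving the send interval while doubling the receive interval triggers the saturation branch.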
2.4.3.2 Fetching the probe result - FetchAndResetLastEstimatedBitrate()
The final bitrate is obtained through ProbeBitrateEstimator::FetchAndResetLastEstimatedBitrate(). One detail: the estimate is reset once it is fetched, so the next call gets nothing. This function is only invoked when the cc-controller receives a cc-feedback, precisely so that the current, real-time probe bitrate feeds into the subsequent bitrate estimation; if there is none, none is used.
absl::optional<DataRate>
ProbeBitrateEstimator::FetchAndResetLastEstimatedBitrate() {
absl::optional<DataRate> estimated_data_rate = estimated_data_rate_;
estimated_data_rate_.reset();
return estimated_data_rate;
}
2.5 Throughput - AcknowledgedBitrateEstimator
Compared with probing, which only runs at special moments (when the link starts, or when the bitrate collapses abnormally), AcknowledgedBitrateEstimator continuously computes the current throughput. It is invoked from GoogCcNetworkController::OnTransportPacketsFeedback():
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
// Compute the throughput from the feedback
acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector(
report.SortedByReceiveTime());
auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate();
...
}
which then calls AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector().
2.5.1 Accumulating packet sizes - IncomingPacketFeedbackVector()
IncomingPacketFeedbackVector() mainly determines the acknowledged size of each feedback packet and hands it to BitrateEstimator to update the throughput:
void AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector(
const std::vector<PacketResult>& packet_feedback_vector) {
RTC_DCHECK(std::is_sorted(packet_feedback_vector.begin(),
packet_feedback_vector.end(),
PacketResult::ReceiveTimeOrder()));
for (const auto& packet : packet_feedback_vector) {
if (alr_ended_time_ && packet.sent_packet.send_time > *alr_ended_time_) {
bitrate_estimator_->ExpectFastRateChange();
alr_ended_time_.reset();
}
// acknowledged_estimate is the size of this sent packet plus the sizes of
// the packets before it that were sent without feedback tracking.
// When a packet is sent, it is checked for the TransportSequenceNumber
// extension; if enabled, cc-feedback is on and the packet gets its own
// sent_packet/size record. Sizes of packets sent without it are accumulated
// until the next tracked packet appears, into that packet's prior_unacked_data
DataSize acknowledged_estimate = packet.sent_packet.size;
acknowledged_estimate += packet.sent_packet.prior_unacked_data;
bitrate_estimator_->Update(packet.receive_time, acknowledged_estimate,
in_alr_);
}
}
A detail here: a feedback packet carries a value called prior_unacked_data, recording the packets sent before it that had no TransportSequenceNumber. Such packets generate no feedback entries, but their sizes are accumulated onto the prior_unacked_data of the nearest following tracked packet; see 2.5.2 for details.
2.5.2 included_in_feedback
packet->included_in_feedback is set in RtpSenderEgress::SendPacket(); it is set when the TransportSequenceNumber extension is enabled on the packet:
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
//........
options.is_retransmit = !is_media;
if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true; // Mark the packet as included in feedback
options.included_in_allocation = true;
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
options.additional_data = packet->additional_data();
if (packet->packet_type() != RtpPacketMediaType::kPadding &&
packet->packet_type() != RtpPacketMediaType::kRetransmission) {
UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
packet_ssrc);
}
// Forward the packet
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
......
}
This flag travels inside options down the call stack below, from SendPacketToNetwork() to AsyncUDPSocket::SendTo(); once the send completes, a SentPacket is constructed and passed back up the class hierarchy to report the send to the upper layers:
rtc::AsyncUDPSocket::SendTo()
cricket::UDPPort::SendTo()
cricket::ProxyConnection::Send()
cricket::P2PTransportChannel::SendPacket()
cricket::DtlsTransport::SendPacket()
webrtc::RtpTransport::SendPacket()
webrtc::SrtpTransport::SendRtpPacket()
cricket::BaseChannel::SendPacket()
bool RtpSenderEgress::SendPacketToNetwork() // the send itself
int AsyncUDPSocket::SendTo(const void* pv,
size_t cb,
const SocketAddress& addr,
const rtc::PacketOptions& options) {
// Construct the SentPacket
rtc::SentPacket sent_packet(options.packet_id, rtc::TimeMillis(),
options.info_signaled_after_sent);
CopySocketInformationToPacketInfo(cb, *this, true, &sent_packet.info);
int ret = socket_->SendTo(pv, cb, addr);
SignalSentPacket(this, sent_packet);
return ret;
}
The constructed SentPacket is returned to the upper layers, reaching TransportFeedbackAdapter:
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket()
void RtpTransportControllerSend::OnSentPacket()
void BaseChannel::SignalSentPacket_n()
webrtc::RtpTransport::OnSentPacket()
cricket::DtlsTransport::OnSentPacket()
cricket::P2PTransportChannel::OnSentPacket()
cricket::UDPPort::OnSentPacket()
rtc::AsyncUDPSocket::SendTo()
RtpTransportControllerSend::OnSentPacket() is as follows:
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
task_queue_.PostTask([this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
if (packet_msg && controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
transport_feedback_adapter_.ProcessSentPacket() is where included_in_feedback is used: untracked sizes are accumulated and attached to the most recent tracked sent_packet:
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
const rtc::SentPacket& sent_packet) {
auto send_time = Timestamp::Millis(sent_packet.send_time_ms);
// TODO(srte): Only use one way to indicate that packet feedback is used.
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
int64_t unwrapped_seq_num =
seq_num_unwrapper_.Unwrap(sent_packet.packet_id);
auto it = history_.find(unwrapped_seq_num);
if (it != history_.end()) {
bool packet_retransmit = it->second.sent.send_time.IsFinite();
it->second.sent.send_time = send_time;
last_send_time_ = std::max(last_send_time_, send_time);
// TODO(srte): Don't do this on retransmit.
if (!pending_untracked_size_.IsZero()) {
// Attach the accumulated untracked size to the most recent tracked sent_packet
if (send_time < last_untracked_send_time_)
RTC_LOG(LS_WARNING)
<< "appending acknowledged data for out of order packet. (Diff: "
<< ToString(last_untracked_send_time_ - send_time) << " ms.)";
it->second.sent.prior_unacked_data += pending_untracked_size_;
pending_untracked_size_ = DataSize::Zero();
}
if (!packet_retransmit) {
if (it->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.AddInFlightPacketBytes(it->second);
it->second.sent.data_in_flight = GetOutstandingData();
return it->second.sent;
}
}
} else if (sent_packet.info.included_in_allocation) {
if (send_time < last_send_time_) {
RTC_LOG(LS_WARNING) << "ignoring untracked data for out of order packet.";
}
// Accumulate the untracked size
pending_untracked_size_ +=
DataSize::Bytes(sent_packet.info.packet_size_bytes);
last_untracked_send_time_ = std::max(last_untracked_send_time_, send_time);
}
return absl::nullopt;
}
2.5.3 Updating throughput - Update()
Update() refreshes the throughput from each feedback packet's size and arrival time:
void BitrateEstimator::Update(Timestamp at_time, DataSize amount, bool in_alr) {
int rate_window_ms = noninitial_window_ms_.Get();
// We use a larger window at the beginning to get a more stable sample that
// we can use to initialize the estimate.
if (bitrate_estimate_kbps_ < 0.f)
rate_window_ms = initial_window_ms_.Get();
bool is_small_sample = false;
// Compute the instantaneous bitrate sample
float bitrate_sample_kbps = UpdateWindow(at_time.ms(), amount.bytes(),
rate_window_ms, &is_small_sample);
if (bitrate_sample_kbps < 0.0f)
return;
if (bitrate_estimate_kbps_ < 0.0f) {
// This is the very first sample we get. Use it to initialize the estimate.
bitrate_estimate_kbps_ = bitrate_sample_kbps;
return;
}
// Optionally use higher uncertainty for very small samples to avoid dropping
// estimate and for samples obtained in ALR.
float scale = uncertainty_scale_;
if (is_small_sample && bitrate_sample_kbps < bitrate_estimate_kbps_) {
scale = small_sample_uncertainty_scale_;
} else if (in_alr && bitrate_sample_kbps < bitrate_estimate_kbps_) {
// In ALR the target bitrate is not fully used, so the actual samples should carry a larger variance
// Optionally use higher uncertainty for samples obtained during ALR.
scale = uncertainty_scale_in_alr_;
}
// Define the sample uncertainty as a function of how far away it is from the
// current estimate. With low values of uncertainty_symmetry_cap_ we add more
// uncertainty to increases than to decreases. For higher values we approach
// symmetry.
// sample_uncertainty here measures the deviation between the estimated and
// observed bitrates; the larger the deviation, the larger the sample's
// variance and the lower its credibility
float sample_uncertainty =
scale * std::abs(bitrate_estimate_kbps_ - bitrate_sample_kbps) /
(bitrate_estimate_kbps_ +
std::min(bitrate_sample_kbps,
uncertainty_symmetry_cap_.Get().kbps<float>()));
float sample_var = sample_uncertainty * sample_uncertainty;
// Update a bayesian estimate of the rate, weighting it lower if the sample
// uncertainty is large.
// The bitrate estimate uncertainty is increased with each update to model
// that the bitrate changes over time.
float pred_bitrate_estimate_var = bitrate_estimate_var_ + 5.f;
// This corresponds to the posterior-mean update of a Kalman filter:
// posterior mean: exp[k]+ = exp[k]ˉ + K * (y[k] - exp[k]ˉ)
// where K = var[k]ˉ / (var[k]ˉ + sample_var)
// (var and sample_var are the prediction and measurement error variances)
bitrate_estimate_kbps_ = (sample_var * bitrate_estimate_kbps_ +
pred_bitrate_estimate_var * bitrate_sample_kbps) /
(sample_var + pred_bitrate_estimate_var);
bitrate_estimate_kbps_ =
std::max(bitrate_estimate_kbps_, estimate_floor_.Get().kbps<float>());
// This corresponds to the posterior-variance update of the same Kalman filter:
// posterior variance: var[k] = (1 - K) * var[k]ˉ
// where K = var[k]ˉ / (var[k]ˉ + sample_var)
bitrate_estimate_var_ = sample_var * pred_bitrate_estimate_var /
(sample_var + pred_bitrate_estimate_var);
BWE_TEST_LOGGING_PLOT(1, "acknowledged_bitrate", at_time.ms(),
bitrate_estimate_kbps_ * 1000);
}
In BitrateEstimator::Update():
- The size of the incoming feedback packet is fed into UpdateWindow() to compute the current bitrate sample (bitrate_sample_kbps).
- The current sample (bitrate_sample_kbps) is treated as the measurement and the previous estimate (bitrate_estimate_kbps_) as the prediction, and a Bayesian filter corrects the observed bitrate, with a quantity sample_uncertainty, derived from the gap between measurement and prediction, acting as the sample standard deviation.
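The Bayesian blend can be isolated into a pure function; this is a sketch, with the 10.0 scale and the +5 variance inflation mirroring the shape of the code above and the uncertainty symmetry cap taken as 0 (both are assumptions of this sketch):

```cpp
#include <cmath>

// One Bayesian update step: blend the prior estimate with a new sample,
// weighting the sample down the further it deviates from the prior.
float PosteriorEstimateKbps(float prior_kbps, float prior_var,
                            float sample_kbps, float scale = 10.0f) {
  // Sample uncertainty grows with the estimate/sample gap (symmetry cap = 0).
  float sample_uncertainty =
      scale * std::fabs(prior_kbps - sample_kbps) / prior_kbps;
  float sample_var = sample_uncertainty * sample_uncertainty;
  // Inflate the prediction variance: the true rate drifts between updates.
  float pred_var = prior_var + 5.0f;
  return (sample_var * prior_kbps + pred_var * sample_kbps) /
         (sample_var + pred_var);
}
```

A sample equal to the prior leaves the estimate unchanged, while a distant sample is heavily discounted: a 1000 kbps sample against a confident 500 kbps prior moves the estimate only to roughly 524 kbps.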
2.5.4 Computing throughput - UpdateWindow()
UpdateWindow() computes the current bitrate as shown below. The idea is simple: maintain a time window of size rate_window_ms, put the bytes whose feedback arrives into the latest window, and divide to get the throughput.
float BitrateEstimator::UpdateWindow(int64_t now_ms,
int bytes,
int rate_window_ms,
bool* is_small_sample) {
RTC_DCHECK(is_small_sample != nullptr);
// rate_window_ms (configured window size)
// |**********************|------------------------------|
// |-----------------------------------------------------|
// prev_time_ms_   current_window_ms_ (current window)   now_ms
// Reset if time moves backwards
if (now_ms < prev_time_ms_) {
prev_time_ms_ = -1;
sum_ = 0;
current_window_ms_ = 0;
}
if (prev_time_ms_ >= 0) {
// Grow the current window
current_window_ms_ += now_ms - prev_time_ms_;
// Reset if nothing has been received for more than a full window.
//        rate_window_ms (configured window size)
//        |***************************************|
// .......|-----------------------------------------------------|
//     prev_time_ms_                                       now_ms
// |......----------------current_window_ms_--------------------|
//                                |*********************| configured rate_window_ms
//                                |---------------------| trimmed window
// Nothing has arrived for over a full window: reset the accumulated size to 0
// and trim the window by taking it modulo rate_window_ms. Because sum_ is
// zeroed and then incremented by bytes below, the window keeps the partial
// remainder instead of being reset straight to 0
if (now_ms - prev_time_ms_ > rate_window_ms) {
sum_ = 0; // Reset
current_window_ms_ %= rate_window_ms;
}
}
prev_time_ms_ = now_ms;
float bitrate_sample = -1.0f;
if (current_window_ms_ >= rate_window_ms) {
// A full window has elapsed; compute the bitrate within it
*is_small_sample = sum_ < small_sample_threshold_->bytes(); // little data in the window
bitrate_sample = 8.0f * sum_ / static_cast<float>(rate_window_ms);
current_window_ms_ -= rate_window_ms; // subtract one window
sum_ = 0;
}
sum_ += bytes;
return bitrate_sample;
}
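The windowing reduces to the following sketch (field names shortened; the behavior mirrors the code above):

```cpp
#include <cstdint>

// Sketch of BitrateEstimator::UpdateWindow(): accumulate bytes, and once a
// full window has elapsed, emit the bitrate of that window in kbps.
struct RateWindow {
  int64_t prev_time_ms = -1;
  int64_t current_window_ms = 0;
  int64_t sum_bytes = 0;
};

// Returns the bitrate sample in kbps, or -1 if the window is not full yet.
float UpdateWindow(RateWindow& w, int64_t now_ms, int bytes,
                   int64_t rate_window_ms) {
  if (now_ms < w.prev_time_ms) {  // time moved backwards: reset
    w.prev_time_ms = -1;
    w.sum_bytes = 0;
    w.current_window_ms = 0;
  }
  if (w.prev_time_ms >= 0) {
    w.current_window_ms += now_ms - w.prev_time_ms;
    if (now_ms - w.prev_time_ms > rate_window_ms) {
      // Nothing arrived for a full window: drop the stale sum.
      w.sum_bytes = 0;
      w.current_window_ms %= rate_window_ms;
    }
  }
  w.prev_time_ms = now_ms;
  float sample_kbps = -1.0f;
  if (w.current_window_ms >= rate_window_ms) {
    sample_kbps = 8.0f * w.sum_bytes / static_cast<float>(rate_window_ms);
    w.current_window_ms -= rate_window_ms;
    w.sum_bytes = 0;
  }
  w.sum_bytes += bytes;
  return sample_kbps;
}
```

For example, 1000 bytes at t=0 and 1000 bytes at t=50 produce no sample; the next feedback at t=100 closes the 100 ms window and yields 8 * 2000 / 100 = 160 kbps.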
2.6 Delay-based bitrate estimation - DelayBasedBwe
DelayBasedBwe performs delay-based bitrate estimation. Before walking through the source, here is a brief account of the principle behind it.
2.6.1 GCC congestion control principles
The principle of GCC congestion control is laid out in the paper: Analysis and Design of the Google Congestion Control for Web Real-time Communication (WebRTC). Although the paper computes the inter-group delay gradient with a Kalman filter (not yet trendline), that is only a small point; the paper covers the design, composition, and principles of webrtc's congestion control thoroughly. Much material online summarizes the same ground, but rarely in depth: it only explains the surface formulas (inter-group delay gradient computation, threshold detection, threshold update, send-rate update) without exploring the thinking behind them, which easily misleads readers without networking background into believing that understanding the procedure means understanding the whole system.
2.6.1.1 Congestion control based on the delay gradient
GCC's delay-based congestion control is a system built from three parts: [send bitrate] - [queuing-delay gradient] - [adaptive threshold], as shown below:
The send bitrate drives the length of the queues inside network devices, which in turn determines the size of the queuing-delay gradient (delay gradient); comparing the gradient against the threshold (threshold detector) then decides whether the next send bitrate rises or falls. It looks like a chicken-and-egg loop, so the details deserve explanation.
The queuing-delay gradient is the change in queuing delay per unit time that two packet groups experience in transit, caused by changes in device queue lengths. Borrowing from the older Kalman-filter-based delay congestion control in webrtc: in the figure below, \(T_{i-1}\) and \(T_{i}\) are the packet groups of two consecutive video frames, and their transfer delay difference is:
The delay difference between these two consecutive frames is composed of \(\Delta L_{i}\) (the size difference between the groups), \(C_{i}\) (the transmission rate), \(m_{i}\) (the queuing-delay difference between the groups), and \(z_{i}\) (random noise).
The key term is \(m_{i}\) (the queuing-delay difference). Every packet the program sends is first placed in a queue at each forwarding device before being processed and sent on. Depending on the device's capability: when its processing/forwarding speed is no smaller than its receiving speed, the queue stays very small or even empty and the queuing delay is essentially 0; when it processes more slowly than it receives, the queue grows ever larger and the queuing delay becomes high.
When the device's maximum capability is exceeded, sharp increases/decreases in the send bitrate cause the queue length to change sharply as well, and with it the queuing delay. We use the magnitude of the queuing-delay gradient to express how strongly the queuing delay is changing, so the gradient is directly related to changes in the send bitrate.
Solving congestion is, at its core, balancing the send rate against the link's maximum load. The maximum load capability is uncertain and cannot be probed directly; the queue state directly reflects the link's current load, but the queue length cannot be measured either, so the queuing-delay gradient is taken indirectly as the target metric: let it keep growing gradually, bring it down once it meets a threshold, then let it grow again. This guarantees:
1. The network's saturation load is fully reached: only then does the queue size start changing positively and the gradient stop being always 0, at which point the link's maximum load capability has certainly been exceeded.
2. Overload is prevented: when the gradient expands positively beyond a threshold, the link's load is already fully used and the queue has grown to some degree, i.e., the link is overloaded. The send bitrate is then lowered below the link's maximum load, the queue gradually drains, and once it is stably empty the delay gradient slowly converges back to zero, ready for the next round of growth.
When trying to understand the algorithm it is tempting to treat the final bitrate as the goal and the queuing-delay gradient as the means. It is the other way round: through continuous bitrate adjustment, the algorithm steers the queuing-delay gradient positive and past a threshold, thereby reaching the network device's maximum load.
Now the mathematics behind the above, to see concisely and intuitively how the bitrate drives the queuing-delay gradient (the mathematical definition of the gradient given here matters; whenever later reasoning drifts, come back and recheck what the queuing gradient actually is). The queuing-delay gradient is defined as the derivative of the queuing delay, \(T_q{'}(t)\), as follows:
where \(q(t)\) is the queue length, \(q^{'}(t)\) the derivative of the queue growth, \(C\) the link's maximum capacity, i.e., its maximum processing load, and \(r(t)\) the queue's arrival rate, i.e., the send bitrate.
Equations (2.2.4)-(2.2.6) can be skipped; they only spell out the meaning of \(q^{'}(t)\) and expose its relation to the send rate r(t), hence the verbose derivation.
Equation (2.2.3), read intuitively, says: **queuing-delay gradient = change in queue length / maximum processing load**.
Equation (2.2.7) finally shows directly that the queuing-delay gradient \(T_q{'}(t)\) is governed by the relation between the arrival rate r(t) and the link's maximum capacity C: the higher r(t) climbs, the larger its gap over the capacity, the less the link can keep up, and the higher the gradient \(T_q{'}(t)\). But it also reveals a limitation: suppose r(t) > C and then stops growing; the gradient then stops changing, yet the queue keeps growing over time, raising the transfer delay until the network congests, which is clearly not what we want. So the arrival rate r(t) must keep climbing, pushing the gradient \(T_q{'}(t)\) up to a threshold, at which point the queue is deemed to have grown too long over time and needs relief; the rate is then reduced, the queue shrinks, and once it has shrunk enough the climb resumes, over and over, as shown below:
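Since the equation images did not survive extraction, here is a reconstruction of (2.2.3)-(2.2.7) from the definitions above; it is standard queuing reasoning, with the numbering following the text:

```latex
T_q(t) = \frac{q(t)}{C}
\qquad\Rightarrow\qquad
T_q'(t) = \frac{q'(t)}{C} \tag{2.2.3}
```

```latex
q(t) = \int_0^t \bigl(r(\tau) - C\bigr)\,d\tau,\ q(t)\ge 0
\qquad\Rightarrow\qquad
q'(t) = r(t) - C \quad \text{while } q(t) > 0 \tag{2.2.4--2.2.6}
```

```latex
T_q'(t) = \frac{r(t) - C}{C} = \frac{r(t)}{C} - 1 \tag{2.2.7}
```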
2.6.1.2 Delay-based bitrate estimation in webrtc
- Computing the queuing-delay gradient
Since M55, webrtc estimates the queuing-delay gradient with the trendline algorithm. Compared with the earlier Kalman-filter-based algorithm, the biggest differences are:
1. Packet groups are no longer video frames; instead, consecutive packets whose inter-send gap is at most 5 ms form a group, with a group spanning no more than 100 ms.
2. The estimation error of the gradient is no longer corrected with a Kalman filter; linear regression estimates the queuing-delay gradient directly, balancing out noise and other random factors.
Let \(T_{i}\) be the send time of the first packet of each group and \(t_i\) the arrival time of its last packet; equation (2.2.1) then gives the inter-group transfer delay difference.
Once 20 such packet groups have been collected, plot them with (arrival time - first packet's arrival time) on the x axis and accumulated delay on the y axis. At any moment these delay differences correspond to some queuing gradient, but observation error, noise, and other random factors introduce deviations, so the slope is computed by linear regression to balance out those disturbances; the computed slope is the queuing-delay gradient.
The actual computation goes as follows:
The x coordinate \(x_i\) of each packet group is defined as arrival time minus the first packet's arrival time, where \(t_i\) is the arrival time and \(first\_arrival\) is the arrival time of the first packet after the system starts.
The y coordinate \(y_i\) is defined as the accumulated delay variation \(\sum_{k = 0}^{i}{d_k}\), smoothed once with the exponential backoff filter of equation (2.2.3.3).
With \(x_i, y_i\) fixed, the linear-regression formulas (2.2.3.4)-(2.2.3.6) yield the current queuing-delay gradient \(T_{q_i}\).
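The regression itself is ordinary least squares; a sketch of the slope computation, equivalent in form to (2.2.3.4)-(2.2.3.6):

```cpp
#include <cstddef>
#include <vector>

// Ordinary-least-squares slope over the last N points, where
// x_i = relative arrival time and y_i = smoothed accumulated delay.
// slope = sum((x - avg_x)(y - avg_y)) / sum((x - avg_x)^2)
double LinearFitSlope(const std::vector<double>& x,
                      const std::vector<double>& y) {
  const size_t n = x.size();
  double sum_x = 0, sum_y = 0;
  for (size_t i = 0; i < n; ++i) {
    sum_x += x[i];
    sum_y += y[i];
  }
  const double avg_x = sum_x / n;
  const double avg_y = sum_y / n;
  double numerator = 0, denominator = 0;
  for (size_t i = 0; i < n; ++i) {
    numerator += (x[i] - avg_x) * (y[i] - avg_y);
    denominator += (x[i] - avg_x) * (x[i] - avg_x);
  }
  return numerator / denominator;  // the queuing-delay gradient estimate
}
```

A steadily growing accumulated delay yields a positive slope (the queue is filling); a flat series yields a slope of 0 (the queue is stable or empty).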
- Detecting the network state from the gradient
Given the queuing-delay gradient \(T_{q_i}\), the next step is detection, much as described in 2.2.1: check whether the gradient lies within a preset empirical threshold interval \([-\gamma, \gamma]\). Since the gradient is computed from the current n (20) points, whose spacing is fairly stable (within 100 ms), **webrtc multiplies the gradient by the number of points, obtaining a count-scaled gradient value that is compared against a count-based empirical threshold (explained in detail in the later source-code analysis)**: a value above \(\gamma\) indicates overuse, a value within \([-\gamma, \gamma]\) normal, and a value below \(-\gamma\) underuse.
After each comparison, the threshold \(\gamma\) is exponentially smoothed toward the current delay gradient. Exponential smoothing is a common technique in time-series analysis; here the smoothing factor also includes \(\Delta t_{i} = t_{i} - t_{i-1}\), the interval between this threshold update and the previous one, making the threshold adjustment more responsive.
\(k_{i}\) is not a fixed value. When the gradient's absolute value exceeds the threshold, \(k_i=0.0087\) is small: the instantaneous jump of the gradient \(|NT_{q_{i}}|\) past the threshold is violent (queue growth starts from zero length), so the historical value is favored. When the gradient's absolute value is below the threshold, \(k_{i}\) is larger, letting the threshold converge quickly toward the current gradient, which is then the more informative reference.
One point worth raising here: why update the threshold with the delay gradient at all? Exponential smoothing normally weights the same variable across different times, whereas the gradient and the threshold are different variables; does updating the threshold this way undermine its validity? Go back to the fundamental goal: what do we need? To just barely and fully use the link's maximum load capability, the moment when the queue is just starting to accumulate packets or its packets are about to drain. If the network's shared nature and other random disturbances could be excluded, the gradient turning positive would by itself mean the link's maximum load has been reached, and no buffering threshold would be needed; but that is impossible, which is why a threshold is used instead of comparing directly against 0. The observed gradient, however, usually corresponds to the moment the queue starts accumulating or is about to drain, so the observation is the more informative signal that the queue length sits at the edge of 0, and the preset threshold should converge toward the observed gradient: when \(|NT_{q_{i}}| < \gamma_{i-1}\), \(|NT_{q_{i}}|\) changes slowly, representing the gradient as the queue just starts filling/draining, and the threshold should converge toward it quickly; when \(|NT_{q_{i}}| > \gamma_{i-1}\), the gradient represents just barely exceeding what the link can carry, and the threshold should grow in that direction. Take the 5 points in the figure below as an example:
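Written out, the adaptive threshold update sketched above takes this form (the two gain values are the defaults found in webrtc's trendline overuse detector; treat the exact numbers as reference points, not part of this article):

```latex
\gamma_i = \gamma_{i-1} + k_i\,\Delta t_i\,\bigl(|NT_{q_i}| - \gamma_{i-1}\bigr),
\qquad
k_i =
\begin{cases}
0.0087 & |NT_{q_i}| > \gamma_{i-1} \\
0.039  & \text{otherwise}
\end{cases}
```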
At point 1, as the send bitrate slowly climbs without yet exceeding the link capacity, the queue sits on the edge of accumulating packets, and over this stretch the threshold converges toward the gradient.
At point 2, the send bitrate exceeds the link capacity, the queue is now clearly accumulating and no longer empty, the gradient changes visibly with a steep instantaneous rise, and the threshold approaches the gradient observed just as the queue length takes off.
At point 3, after the gradient exceeds the threshold, the send bitrate is adjusted with a cliff-like drop, the gradient plunges likewise, and the queue gradually drains.
At point 4, the gradient is detected below the threshold, the send bitrate rises again, and the gradient slowly recovers.
At point 5, the logic of point 2 repeats: the send bitrate exceeds the link capacity and the gradient starts rising steeply again.
- Adjusting the send bitrate from the network state
With the current network state (overuse, normal, underuse) obtained by comparing the gradient against the threshold, the bitrate can be adjusted; webrtc drives the adjustment with a state machine, shown below:
The send-bitrate adjustments for Decrease, Hold, and Increase are as follows, where \(A_{r}(t_i)\) is the estimated bitrate at time \(t_i\) and \(R_r(t_i)\) the measured bitrate at \(t_i\); the control strategy is AIMD (slow increase, fast decrease).
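A sketch of the AIMD branch structure; beta = 0.85 matches the usual multiplicative-decrease factor, while the fixed additive step stands in for the response-time-scaled increase of the real AimdRateControl, so both constants are assumptions of this sketch:

```cpp
#include <cstdint>

enum class RateControlState { kHold, kIncrease, kDecrease };

// One AIMD adjustment step: multiplicative decrease toward the measured
// throughput on overuse, additive increase otherwise, unchanged on hold.
int64_t AdjustBitrateBps(RateControlState state,
                         int64_t current_bps,
                         int64_t measured_bps,
                         int64_t additive_step_bps = 8000) {
  switch (state) {
    case RateControlState::kDecrease:
      // beta = 0.85: drop below the measured throughput to drain the queue.
      return measured_bps * 85 / 100;
    case RateControlState::kIncrease:
      return current_bps + additive_step_bps;
    case RateControlState::kHold:
    default:
      return current_bps;
  }
}
```

The asymmetry is the point: increases are gentle so the queue fills slowly, while a decrease cuts straight below measured throughput so the queue can actually drain.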
The initial up/down trajectory of the bitrate is: Normal(Increase) -> Overuse(Increase) -> Overuse(Decrease) -> UnderUse(Hold) -> Normal(Hold) -> Normal(Increase)
Note in particular the red-marked UnderUse(Hold): why keep holding while in UnderUse? Because continuing to drain lets the queue empty out and the gradient slowly return to 0.
As for the yellow-marked UnderUse(Hold) and Overuse(Decrease), I still have not figured out what situation produces them; it is unclear whether they result from anomalies in the previous state estimate.
With that, the delay-based bandwidth estimation principle is covered.
2.6.1.3 Loss-based bitrate estimation in webrtc
2.6.2 IncomingPacketFeedbackVector()
With the probe bitrate (probe_bitrate) and throughput (acknowledged_bitrate) introduced earlier computed, DelayBasedBwe can estimate the bitrate from the feedback packets. Its IncomingPacketFeedbackVector() is called in GoogCcNetworkController::OnTransportPacketsFeedback(); starting from that function, the current network state (normal, underuse, overuse) is estimated from the feedback, and the bitrate is then adjusted with AIMD according to the state.
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
....
DelayBasedBwe::Result result;
result = delay_based_bwe_->IncomingPacketFeedbackVector(
report, acknowledged_bitrate, probe_bitrate, estimate_,
alr_start_time.has_value());
....
}
IncomingPacketFeedbackVector() runs IncomingPacketFeedback() on every feedback packet:
DelayBasedBwe::Result DelayBasedBwe::IncomingPacketFeedbackVector(
const TransportPacketsFeedback& msg,
absl::optional<DataRate> acked_bitrate,
absl::optional<DataRate> probe_bitrate,
absl::optional<NetworkStateEstimate> network_estimate,
bool in_alr) {
RTC_DCHECK_RUNS_SERIALIZED(&network_race_);
auto packet_feedback_vector = msg.SortedByReceiveTime();
// TODO(holmer): An empty feedback vector here likely means that
// all acks were too late and that the send time history had
// timed out. We should reduce the rate when this occurs.
if (packet_feedback_vector.empty()) {
RTC_LOG(LS_WARNING) << "Very late feedback received.";
return DelayBasedBwe::Result();
}
if (!uma_recorded_) {
RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram,
BweNames::kSendSideTransportSeqNum,
BweNames::kBweNamesMax);
uma_recorded_ = true;
}
bool delayed_feedback = true;
bool recovered_from_overuse = false;
BandwidthUsage prev_detector_state = active_delay_detector_->State();
for (const auto& packet_feedback : packet_feedback_vector) {
delayed_feedback = false;
// Run the trendline update for each packet
IncomingPacketFeedback(packet_feedback, msg.feedback_time);
if (prev_detector_state == BandwidthUsage::kBwUnderusing &&
active_delay_detector_->State() == BandwidthUsage::kBwNormal) {
recovered_from_overuse = true;
}
prev_detector_state = active_delay_detector_->State();
}
if (delayed_feedback) {
// TODO(bugs.webrtc.org/10125): Design a better mechanism to safe-guard
// against building very large network queues.
return Result();
}
rate_control_.SetInApplicationLimitedRegion(in_alr);
rate_control_.SetNetworkStateEstimate(network_estimate);
// update
return MaybeUpdateEstimate(acked_bitrate, probe_bitrate,
std::move(network_estimate),
recovered_from_overuse, in_alr, msg.feedback_time);
}
The whole of IncomingPacketFeedback() is below; it is long but easy to follow:
void DelayBasedBwe::IncomingPacketFeedback(const PacketResult& packet_feedback,
Timestamp at_time) {
// Reset if the stream has timed out.
if (last_seen_packet_.IsInfinite() ||
at_time - last_seen_packet_ > kStreamTimeOut) {
// If this packet is too far from the previous one, the stream has timed
// out; reset the delay_detector and the arrival-delta calculators.
// Note there are two kinds of arrival-delta calculator: InterArrivalDelta
// and InterArrival. With the WebRTC-Bwe-NewInterArrivalDelta field trial
// enabled, InterArrivalDelta is used; otherwise InterArrival
if (use_new_inter_arrival_delta_) {
video_inter_arrival_delta_ =
std::make_unique<InterArrivalDelta>(kSendTimeGroupLength);
audio_inter_arrival_delta_ =
std::make_unique<InterArrivalDelta>(kSendTimeGroupLength);
} else {
video_inter_arrival_ = std::make_unique<InterArrival>(
kTimestampGroupTicks, kTimestampToMs, true);
audio_inter_arrival_ = std::make_unique<InterArrival>(
kTimestampGroupTicks, kTimestampToMs, true);
}
// reset delay detector
video_delay_detector_.reset(
new TrendlineEstimator(key_value_config_, network_state_predictor_));
audio_delay_detector_.reset(
new TrendlineEstimator(key_value_config_, network_state_predictor_));
active_delay_detector_ = video_delay_detector_.get();
}
last_seen_packet_ = at_time;
// As an alternative to ignoring small packets, we can separate audio and
// video packets for overuse detection.
DelayIncreaseDetectorInterface* delay_detector_for_packet =
video_delay_detector_.get();
if (separate_audio_.enabled) {
// If audio and video bitrates are estimated separately, pick the
// delay_detector matching the packet type (audio/video)
if (packet_feedback.sent_packet.audio) {
delay_detector_for_packet = audio_delay_detector_.get();
audio_packets_since_last_video_++;
if (audio_packets_since_last_video_ > separate_audio_.packet_threshold &&
packet_feedback.receive_time - last_video_packet_recv_time_ >
separate_audio_.time_threshold) {
active_delay_detector_ = audio_delay_detector_.get();
}
} else {
audio_packets_since_last_video_ = 0;
last_video_packet_recv_time_ =
std::max(last_video_packet_recv_time_, packet_feedback.receive_time);
active_delay_detector_ = video_delay_detector_.get();
}
}
DataSize packet_size = packet_feedback.sent_packet.size;
if (use_new_inter_arrival_delta_) {
TimeDelta send_delta = TimeDelta::Zero();
TimeDelta recv_delta = TimeDelta::Zero();
int size_delta = 0;
// Pick the InterArrivalDelta matching the packet type (audio/video)
InterArrivalDelta* inter_arrival_for_packet =
(separate_audio_.enabled && packet_feedback.sent_packet.audio)
? audio_inter_arrival_delta_.get()
: video_inter_arrival_delta_.get();
// Compute the deltas between the previous and current packet groups
bool calculated_deltas = inter_arrival_for_packet->ComputeDeltas(
packet_feedback.sent_packet.send_time, packet_feedback.receive_time,
at_time, packet_size.bytes(), &send_delta, &recv_delta, &size_delta);
// trendline update and estimate
delay_detector_for_packet->Update(
recv_delta.ms(), send_delta.ms(),
packet_feedback.sent_packet.send_time.ms(),
packet_feedback.receive_time.ms(), packet_size.bytes(),
calculated_deltas);
} else {
// Pick the InterArrival matching the packet type (audio/video)
InterArrival* inter_arrival_for_packet =
(separate_audio_.enabled && packet_feedback.sent_packet.audio)
? audio_inter_arrival_.get()
: video_inter_arrival_.get();
uint32_t send_time_24bits =
static_cast<uint32_t>(
((static_cast<uint64_t>(packet_feedback.sent_packet.send_time.ms())
<< kAbsSendTimeFraction) +
500) /
1000) &
0x00FFFFFF;
// Shift up send time to use the full 32 bits that inter_arrival works with,
// so wrapping works properly.
uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
uint32_t timestamp_delta = 0;
int64_t recv_delta_ms = 0;
int size_delta = 0;
// Compute the deltas between the previous and current packet groups
bool calculated_deltas = inter_arrival_for_packet->ComputeDeltas(
timestamp, packet_feedback.receive_time.ms(), at_time.ms(),
packet_size.bytes(), &timestamp_delta, &recv_delta_ms, &size_delta);
double send_delta_ms =
(1000.0 * timestamp_delta) / (1 << kInterArrivalShift);
delay_detector_for_packet->Update(
recv_delta_ms, send_delta_ms,
packet_feedback.sent_packet.send_time.ms(),
packet_feedback.receive_time.ms(), packet_size.bytes(),
calculated_deltas);
}
}
In DelayBasedBwe::IncomingPacketFeedback():
- An object called inter_arrival computes the deltas between two packet groups, which are then fed into a delay_detector for trendline estimation.
- At the top, if the current packet's feedback time is too far from the previous one's, the delay_detector is reset.
- If audio and video bitrates are estimated separately, the matching delay_detector is selected.
- inter_arrival's ComputeDeltas() computes the send-time delta (send_delta), receive-time delta (recv_delta), and size delta (size_delta) between the two preceding packet groups.
- delay_detector's Update() uses the deltas to estimate the current congestion state.
2.6.2.1 Computing inter-group delay deltas - ComputeDeltas()
ComputeDeltas() is straightforward: it places the current packet into a group, recording the group's first send time (first_send), size (size), and the arrival time of its last packet (complete_time); when the current packet no longer falls into the same group as the previous packets, a new group is created, and the various deltas are computed between the previous group and the one before it.
bool InterArrivalDelta::ComputeDeltas(Timestamp send_time,
Timestamp arrival_time,
Timestamp system_time,
size_t packet_size,
TimeDelta* send_time_delta,
TimeDelta* arrival_time_delta,
int* packet_size_delta) {
bool calculated_deltas = false;
if (current_timestamp_group_.IsFirstPacket()) {
// We don't have enough data to update the filter, so we store it until we
// have two frames of data to process.
current_timestamp_group_.send_time = send_time;
current_timestamp_group_.first_send_time = send_time;
current_timestamp_group_.first_arrival = arrival_time;
} else if (current_timestamp_group_.first_send_time > send_time) {
// Reordered packet.
return false;
} else if (NewTimestampGroup(arrival_time, send_time)) {
// Decide from the times whether a new group starts; if so, compute the
// deltas as current_group - prev_group
// First packet of a later send burst, the previous packets sample is ready.
if (prev_timestamp_group_.complete_time.IsFinite()) {
*send_time_delta =
current_timestamp_group_.send_time - prev_timestamp_group_.send_time;
*arrival_time_delta = current_timestamp_group_.complete_time -
prev_timestamp_group_.complete_time;
TimeDelta system_time_delta = current_timestamp_group_.last_system_time -
prev_timestamp_group_.last_system_time;
if (*arrival_time_delta - system_time_delta >=
kArrivalTimeOffsetThreshold) {
RTC_LOG(LS_WARNING)
<< "The arrival time clock offset has changed (diff = "
<< arrival_time_delta->ms() - system_time_delta.ms()
<< " ms), resetting.";
Reset();
return false;
}
if (*arrival_time_delta < TimeDelta::Zero()) {
// The group of packets has been reordered since receiving its local
// arrival timestamp.
++num_consecutive_reordered_packets_;
if (num_consecutive_reordered_packets_ >= kReorderedResetThreshold) {
RTC_LOG(LS_WARNING)
<< "Packets between send burst arrived out of order, resetting."
<< " arrival_time_delta" << arrival_time_delta->ms()
<< " send time delta " << send_time_delta->ms();
Reset();
}
return false;
} else {
num_consecutive_reordered_packets_ = 0;
}
*packet_size_delta = static_cast<int>(current_timestamp_group_.size) -
static_cast<int>(prev_timestamp_group_.size);
calculated_deltas = true;
}
// new and swap group
prev_timestamp_group_ = current_timestamp_group_;
// The new timestamp is now the current frame.
current_timestamp_group_.first_send_time = send_time;
current_timestamp_group_.send_time = send_time;
current_timestamp_group_.first_arrival = arrival_time;
current_timestamp_group_.size = 0;
} else {
current_timestamp_group_.send_time =
std::max(current_timestamp_group_.send_time, send_time);
}
// Accumulate the frame size.
current_timestamp_group_.size += packet_size;
current_timestamp_group_.complete_time = arrival_time;
current_timestamp_group_.last_system_time = system_time;
return calculated_deltas;
}
NewTimestampGroup()判斷需要創建新組的過程如下所示:
// Assumes that |timestamp| is not reordered compared to
// |current_timestamp_group_|.
bool InterArrivalDelta::NewTimestampGroup(Timestamp arrival_time,
Timestamp send_time) const {
if (current_timestamp_group_.IsFirstPacket()) {
return false;
} else if (BelongsToBurst(arrival_time, send_time)) {
return false;
} else {
    // The packet's send time is more than send_time_group_length_ past the
    // group's first send time.
return send_time - current_timestamp_group_.first_send_time >
send_time_group_length_;
}
}
/**
 * @description: Check whether a packet belongs to the current burst/group.
 * @param {*}
 * @return {*}
 */
bool InterArrivalDelta::BelongsToBurst(Timestamp arrival_time,
Timestamp send_time) const {
RTC_DCHECK(current_timestamp_group_.complete_time.IsFinite());
  // Delta against the receive time (at the remote end) of the group's last packet.
TimeDelta arrival_time_delta =
arrival_time - current_timestamp_group_.complete_time;
  // Delta against the send time of the group's last packet.
TimeDelta send_time_delta = send_time - current_timestamp_group_.send_time;
if (send_time_delta.IsZero())
return true;
  // Propagation-time delta.
TimeDelta propagation_delta = arrival_time_delta - send_time_delta;
if (propagation_delta < TimeDelta::Zero() &&
arrival_time_delta <= kBurstDeltaThreshold &&
arrival_time - current_timestamp_group_.first_arrival < kMaxBurstDuration)
    // A negative propagation delta means the packets were queued on the path
    // and arrived closer together than they were sent. If, additionally, the
    // arrival gap to the group's last packet is below kBurstDeltaThreshold
    // (5 ms) and the gap to the group's first arrival is under
    // kMaxBurstDuration (100 ms), treat it as the same burst.
return true;
return false;
}
In short, packets sent within a 5 ms window form a group, and a burst can absorb later packets as long as consecutive arrivals are within 5 ms of each other and the burst as a whole lasts less than 100 ms.
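The grouping rule can be condensed into a standalone predicate. This is a simplified sketch, not webrtc code: `Group` is a hypothetical struct, all times are plain milliseconds, and the thresholds mirror the constants in the source (send_time_group_length_, kBurstDeltaThreshold, kMaxBurstDuration).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flattened view of a packet group; times in milliseconds.
struct Group {
  int64_t first_send_ms;
  int64_t last_send_ms;
  int64_t first_arrival_ms;
  int64_t last_arrival_ms;  // complete_time of the group
};

// Mirrors NewTimestampGroup()/BelongsToBurst(): a packet stays in the current
// group if it belongs to the same burst, or if its send time is within 5 ms
// of the group's first send time.
bool StartsNewGroup(const Group& g, int64_t send_ms, int64_t arrival_ms) {
  const int64_t kSendTimeGroupLengthMs = 5;  // send_time_group_length_
  const int64_t kBurstDeltaThresholdMs = 5;  // kBurstDeltaThreshold
  const int64_t kMaxBurstDurationMs = 100;   // kMaxBurstDuration

  int64_t arrival_delta = arrival_ms - g.last_arrival_ms;
  int64_t send_delta = send_ms - g.last_send_ms;
  // Queued packets arrive closer together than they were sent, so the
  // propagation delta of a burst is negative.
  int64_t propagation_delta = arrival_delta - send_delta;
  bool same_burst = send_delta == 0 ||
                    (propagation_delta < 0 &&
                     arrival_delta <= kBurstDeltaThresholdMs &&
                     arrival_ms - g.first_arrival_ms < kMaxBurstDurationMs);
  if (same_burst)
    return false;
  return send_ms - g.first_send_ms > kSendTimeGroupLengthMs;
}
```

For example, a packet sent 3 ms after the group's first packet stays in the group, while one sent 10 ms later with a positive propagation delta starts a new one.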
2.6.2.2 Computing the gradient with linear regression - trendline::Update()
TrendlineEstimator::Update() calls UpdateTrendline() to run the network estimate, but only once inter-group deltas are available (that is, a new group has formed and the deltas between the previous group and the one before it have been computed). Here arrival_time_ms is the receive_time of the last packet of that group, i.e. its arrival time at the receiver.
void TrendlineEstimator::Update(double recv_delta_ms,
double send_delta_ms,
int64_t send_time_ms,
int64_t arrival_time_ms,
size_t packet_size,
bool calculated_deltas) {
if (calculated_deltas) {
    // Run the trendline filter.
UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
packet_size);
}
if (network_state_predictor_) {
    // Not enabled.
hypothesis_predicted_ = network_state_predictor_->Update(
send_time_ms, arrival_time_ms, hypothesis_);
}
}
The details of UpdateTrendline() are as follows:
void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
double send_delta_ms,
int64_t send_time_ms,
int64_t arrival_time_ms,
size_t packet_size) {
  // Compute the queueing-delay delta.
const double delta_ms = recv_delta_ms - send_delta_ms;
++num_of_deltas_;
num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
if (first_arrival_time_ms_ == -1)
first_arrival_time_ms_ = arrival_time_ms;
// Exponential backoff filter.
accumulated_delay_ += delta_ms;
BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
accumulated_delay_);
  // Exponentially smooth the accumulated network delay.
smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
(1 - smoothing_coef_) * accumulated_delay_;
BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
smoothed_delay_);
// Maintain packet window
  // Form a point (arrival time, smoothed delay) and push it into delay_hist.
delay_hist_.emplace_back(
static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
smoothed_delay_, accumulated_delay_);
if (settings_.enable_sort) {
for (size_t i = delay_hist_.size() - 1;
i > 0 &&
delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
--i) {
std::swap(delay_hist_[i], delay_hist_[i - 1]);
}
}
  // Keep at most window_size (20) points.
if (delay_hist_.size() > settings_.window_size)
delay_hist_.pop_front();
// Simple linear regression.
double trend = prev_trend_;
if (delay_hist_.size() == settings_.window_size) {
    // Need the full window of 20 points.
// Update trend_ if it is possible to fit a line to the data. The delay
// trend can be seen as an estimate of (send_rate - capacity)/capacity.
// 0 < trend < 1 -> the delay increases, queues are filling up
// trend == 0 -> the delay does not change
// trend < 0 -> the delay decreases, queues are being emptied
    // Run the linear regression to obtain the slope.
trend = LinearFitSlope(delay_hist_).value_or(trend);
if (settings_.enable_cap) {
      // enable_cap splits the points into a beginning part and an end part,
      // picks the lowest point from each, and computes a slope cap from them.
      // Since the threshold adapts toward the trend, capping the trend also
      // keeps the adaptive threshold from inflating; note below that the cap
      // is only applied to filter out overuse detections.
absl::optional<double> cap = ComputeSlopeCap(delay_hist_, settings_);
// We only use the cap to filter out overuse detections, not
// to detect additional underuses.
if (trend >= 0 && cap.has_value() && trend > cap.value()) {
trend = cap.value();
}
}
}
BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
  // Compare the queueing-delay gradient against the threshold to classify the
  // current network state.
Detect(trend, send_delta_ms, arrival_time_ms);
}
In TrendlineEstimator::UpdateTrendline():
- The core is running a trendline over the inter-group queueing delays to obtain the queueing-delay gradient.
- The function first computes the queueing-delay delta (delta_ms) per equation 2.2.1, then exponentially smooths the accumulated delay into smoothed_delay.
- A point is formed with the arrival time of the group's last packet at the receiver as x and the smoothed delay as y, then pushed into delay_hist, which keeps at most 20 points.
- Once 20 points are available, LinearFitSlope() runs a least-squares regression over them to obtain the slope, i.e. the queueing-delay gradient.
absl::optional<double> LinearFitSlope(
    const std::deque<TrendlineEstimator::PacketTiming>& packets) {
  RTC_DCHECK(packets.size() >= 2);
  // Compute the "center of mass".
  double sum_x = 0;
  double sum_y = 0;
  for (const auto& packet : packets) {
    sum_x += packet.arrival_time_ms;
    sum_y += packet.smoothed_delay_ms;
  }
  double x_avg = sum_x / packets.size();
  double y_avg = sum_y / packets.size();
  // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
  double numerator = 0;
  double denominator = 0;
  for (const auto& packet : packets) {
    double x = packet.arrival_time_ms;
    double y = packet.smoothed_delay_ms;
    numerator += (x - x_avg) * (y - y_avg);
    denominator += (x - x_avg) * (x - x_avg);
  }
  if (denominator == 0)
    return absl::nullopt;
  return numerator / denominator;
}
- There is also an enable_cap feature, not enabled by default. It splits the points into a beginning part and an end part (by default the first 7 and the last 7 of the 20), picks the lowest point from each via ComputeSlopeCap(), and uses the slope between the two as an upper bound on the queueing-delay gradient. A plausible reading is that this bounds the gradient, and therefore the gradient threshold that adapts toward it, keeping overuse detection responsive.
absl::optional<double> ComputeSlopeCap(
    const std::deque<TrendlineEstimator::PacketTiming>& packets,
    const TrendlineEstimatorSettings& settings) {
  RTC_DCHECK(1 <= settings.beginning_packets &&
             settings.beginning_packets < packets.size());
  RTC_DCHECK(1 <= settings.end_packets &&
             settings.end_packets < packets.size());
  RTC_DCHECK(settings.beginning_packets + settings.end_packets <=
             packets.size());
  // Lowest point among the beginning packets.
  TrendlineEstimator::PacketTiming early = packets[0];
  for (size_t i = 1; i < settings.beginning_packets; ++i) {
    if (packets[i].raw_delay_ms < early.raw_delay_ms)
      early = packets[i];
  }
  // Lowest point among the end packets.
  size_t late_start = packets.size() - settings.end_packets;
  TrendlineEstimator::PacketTiming late = packets[late_start];
  for (size_t i = late_start + 1; i < packets.size(); ++i) {
    if (packets[i].raw_delay_ms < late.raw_delay_ms)
      late = packets[i];
  }
  if (late.arrival_time_ms - early.arrival_time_ms < 1) {
    return absl::nullopt;
  }
  // Slope between the two lowest points, plus an uncertainty margin.
  return (late.raw_delay_ms - early.raw_delay_ms) /
             (late.arrival_time_ms - early.arrival_time_ms) +
         settings.cap_uncertainty;
}
- Finally, Detect() compares the current queueing-delay gradient against the threshold to classify the network state (overuse, normal, underuse).
2.6.2.3 Comparing the gradient - trendline::Detect()
void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
if (num_of_deltas_ < 2) {
hypothesis_ = BandwidthUsage::kBwNormal;
return;
}
  // Instead of using trend directly, rescale it to a per-kMinNumDeltas (60)
  // sample basis to reduce noise; the threshold (threshold_) it is compared
  // with below is presumably also tuned on that 60-sample basis.
const double modified_trend =
std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
prev_modified_trend_ = modified_trend;
BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
if (modified_trend > threshold_) {
if (time_over_using_ == -1) {
// Initialize the timer. Assume that we've been
// over-using half of the time since the previous
// sample.
time_over_using_ = ts_delta / 2;
} else {
// Increment timer
time_over_using_ += ts_delta;
}
overuse_counter_++;
if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
      // Overuse has lasted beyond the time threshold: reset the counters and
      // set hypothesis_ to kBwOverusing.
if (trend >= prev_trend_) {
time_over_using_ = 0;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwOverusing;
}
}
} else if (modified_trend < -threshold_) {
    // Below the lower threshold: underusing.
time_over_using_ = -1;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwUnderusing;
} else {
    // Between the thresholds: normal.
time_over_using_ = -1;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwNormal;
}
prev_trend_ = trend;
  // Adapt the queueing-gradient threshold.
UpdateThreshold(modified_trend, now_ms);
}
In TrendlineEstimator::Detect():
- The queueing-delay gradient is compared against the gradient threshold to classify the network state.
- The gradient is rescaled to a 60-sample basis; because the raw value is tiny it is also multiplied by a gain (threshold_gain_), yielding modified_trend. The threshold it is compared with was presumably tuned experimentally on the same basis.
- UpdateThreshold() is called to adapt the gradient threshold.
void TrendlineEstimator::UpdateThreshold(double modified_trend,
int64_t now_ms) {
if (last_update_ms_ == -1)
last_update_ms_ = now_ms;
if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
// Avoid adapting the threshold to big latency spikes, caused e.g.,
// by a sudden capacity drop.
last_update_ms_ = now_ms;
return;
}
  // Move the gradient threshold toward the current gradient.
  // |modified_trend| < threshold_ ? k = 0.039 (k_down_) : k = 0.0087 (k_up_)
const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
  // Factor in elapsed time for responsiveness.
const int64_t kMaxTimeDeltaMs = 100;
int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
  // Exponential smoothing toward the measured gradient.
threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
last_update_ms_ = now_ms;
}
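The adaptation step can be condensed into a few lines. This is a sketch, not webrtc code, using the same constants (k_up_ = 0.0087, k_down_ = 0.039, kMaxAdaptOffsetMs = 15, clamp to [6, 600]) and assuming the caller has already capped time_delta_ms at 100 ms:

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// One step of threshold adaptation as in UpdateThreshold(); returns the new
// threshold. time_delta_ms is assumed to be already capped at 100 ms.
double AdaptThreshold(double threshold, double modified_trend,
                      double time_delta_ms) {
  const double kMaxAdaptOffsetMs = 15.0;
  // Ignore big latency spikes, e.g. from a sudden capacity drop.
  if (std::fabs(modified_trend) > threshold + kMaxAdaptOffsetMs)
    return threshold;
  const double k = std::fabs(modified_trend) < threshold ? 0.039 : 0.0087;
  threshold += k * (std::fabs(modified_trend) - threshold) * time_delta_ms;
  return std::min(std::max(threshold, 6.0), 600.0);
}
```

Because k_down_ is larger than k_up_, the threshold converges downward quickly but inflates only slowly, so a few outlier gradients do not desensitize overuse detection.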
This completes the chain: packet groups -> queueing-delay deltas -> trendline over the deltas to obtain the queueing-delay gradient -> comparison against the adaptive threshold to classify the network state (normal, underuse, overuse) -> threshold adaptation. What remains is the AIMD rate adjustment. Back at the end of DelayBasedBwe::IncomingPacketFeedbackVector(), MaybeUpdateEstimate() is called to start the AIMD step.
2.6.2.4 Rate adjustment - MaybeUpdateEstimate()
The rate adjustment proceeds as follows:
DelayBasedBwe::Result DelayBasedBwe::MaybeUpdateEstimate(
    absl::optional<DataRate> acked_bitrate,
    absl::optional<DataRate> probe_bitrate,
    absl::optional<NetworkStateEstimate> state_estimate,
bool recovered_from_overuse,
bool in_alr,
Timestamp at_time) {
Result result;
// Currently overusing the bandwidth.
if (active_delay_detector_->State() == BandwidthUsage::kBwOverusing) {
    // bw_state: overusing
    if (has_once_detected_overuse_ && in_alr && alr_limited_backoff_enabled_) {
      // Backing off under ALR is special-cased to address issue 10144:
      // in ALR the sender cannot push enough traffic for the estimate to grow
      // and relies entirely on periodic ALR probes. On a spurious overuse
      // spike, running the usual AIMD on acked_bitrate would drag the rate far
      // below estimate_bitrate; the ack rate is not trustworthy here, so back
      // off from estimate_bitrate and quickly send a probe to validate it.
if (rate_control_.TimeToReduceFurther(at_time, prev_bitrate_)) {
      // Time to reduce the rate again.
result.updated =
UpdateEstimate(at_time, prev_bitrate_, &result.target_bitrate);
result.backoff_in_alr = true;
}
} else if (acked_bitrate &&
rate_control_.TimeToReduceFurther(at_time, *acked_bitrate)) {
      // Normal case: acked_bitrate is available, use it directly for the decrease.
result.updated =
UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
} else if (!acked_bitrate && rate_control_.ValidEstimate() &&
rate_control_.InitialTimeToReduceFurther(at_time)) {
      // No acked_bitrate measured yet; use elapsed time alone to decide
      // whether the rate can be reduced again.
// Overusing before we have a measured acknowledged bitrate. Reduce send
// rate by 50% every 200 ms.
// TODO(tschumim): Improve this and/or the acknowledged bitrate estimator
// so that we (almost) always have a bitrate estimate.
      // Halve the estimated rate.
rate_control_.SetEstimate(rate_control_.LatestEstimate() / 2, at_time);
result.updated = true;
result.probe = false;
result.target_bitrate = rate_control_.LatestEstimate();
}
has_once_detected_overuse_ = true;
} else {
    // bw_state: normal or underusing
if (probe_bitrate) {
      // With a probe result there is no need for slow growth: adopt the probe
      // bitrate directly and feed it into rate_control.
result.probe = true;
result.updated = true;
result.target_bitrate = *probe_bitrate;
rate_control_.SetEstimate(*probe_bitrate, at_time);
} else {
      // Otherwise fall back to the regular rate adjustment.
result.updated =
UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
result.recovered_from_overuse = recovered_from_overuse;
}
}
BandwidthUsage detector_state = active_delay_detector_->State();
if ((result.updated && prev_bitrate_ != result.target_bitrate) ||
detector_state != prev_state_) {
DataRate bitrate = result.updated ? result.target_bitrate : prev_bitrate_;
BWE_TEST_LOGGING_PLOT(1, "target_bitrate_bps", at_time.ms(), bitrate.bps());
if (event_log_) {
      event_log_->Log(std::make_unique<RtcEventBweUpdateDelayBased>(
bitrate.bps(), detector_state));
}
    // Remember the previous estimate and state.
prev_bitrate_ = bitrate;
prev_state_ = detector_state;
}
return result;
}
In DelayBasedBwe::MaybeUpdateEstimate():
- It reads the current network state. If it is overuse while in ALR, the sender cannot push enough traffic for the estimate to grow, and a spurious overuse peak must not trigger the usual AIMD on acked_bitrate (which would drag the rate far below estimate_bitrate, since the ack rate is not trustworthy here); instead it backs off from estimate_bitrate and quickly sends a probe to validate the estimate.
- On a normal overuse, if a throughput measurement (acknowledged bitrate) exists, the rate is decreased based on it; if not, we are still in the initial probing phase and rate_control only holds a preset initial rate, which is simply cut by 50%.
- On normal or underuse, a probe_bitrate, if present, is used for recovery (being an instantaneous measurement it is presumably more accurate); otherwise the current throughput is used.
- The updated bitrate is recorded and the result returned.
The rate update itself looks like this:
bool DelayBasedBwe::UpdateEstimate(Timestamp at_time,
    absl::optional<DataRate> acked_bitrate,
DataRate* target_rate) {
const RateControlInput input(active_delay_detector_->State(), acked_bitrate);
*target_rate = rate_control_.Update(&input, at_time);
return rate_control_.ValidEstimate();
}
Update() checks whether the AIMD controller has an initial bitrate set, then calls ChangeBitrate() to adjust the rate:
DataRate AimdRateControl::Update(const RateControlInput* input,
Timestamp at_time) {
RTC_CHECK(input);
// Set the initial bit rate value to what we're receiving the first half
// second.
// TODO(bugs.webrtc.org/9379): The comment above doesn't match to the code.
  // The AIMD initial bitrate has not been set yet.
if (!bitrate_is_initialized_) {
const TimeDelta kInitializationTime = TimeDelta::Seconds(5);
RTC_DCHECK_LE(kBitrateWindowMs, kInitializationTime.ms());
if (time_first_throughput_estimate_.IsInfinite()) {
      // Record the arrival time of the first throughput estimate.
if (input->estimated_throughput)
time_first_throughput_estimate_ = at_time;
} else if (at_time - time_first_throughput_estimate_ >
kInitializationTime &&
input->estimated_throughput) {
      // More than kInitializationTime (5 s) has passed since the first
      // throughput estimate; take the current throughput as the current bitrate.
current_bitrate_ = *input->estimated_throughput;
bitrate_is_initialized_ = true;
}
}
  // Adjust the bitrate.
ChangeBitrate(*input, at_time);
return current_bitrate_;
}
ChangeBitrate() rotates the state machine described in 2.6.1.2 based on the congestion state, and raises or lowers the rate based on the current throughput:
/**
 * @description: Run AIMD on the current throughput and bw_state to adjust the bitrate.
 * @param {*}
 * @return {*}
 */
void AimdRateControl::ChangeBitrate(const RateControlInput& input,
Timestamp at_time) {
  absl::optional<DataRate> new_bitrate;
DataRate estimated_throughput =
input.estimated_throughput.value_or(latest_estimated_throughput_);
if (input.estimated_throughput)
latest_estimated_throughput_ = *input.estimated_throughput;
  // bitrate_is_initialized_ tells whether current_bitrate_ has been set.
  // That happens in two places:
  // 1. At startup, SetStartBitrate() initializes current_bitrate_ externally.
  // 2. On the first detected kBwOverusing, the link's capacity has been
  //    reached and that value initializes the AIMD bitrate.
  // So if current_bitrate_ is not yet set, skip normal/increase and return;
  // on kBwOverusing, fall through to initialize current_bitrate_ and decrease.
if (!bitrate_is_initialized_ &&
input.bw_state != BandwidthUsage::kBwOverusing)
return;
  // Rotate the state machine to decide the rate-control state
  // (increase, hold, decrease).
ChangeState(input, at_time);
// We limit the new bitrate based on the troughput to avoid unlimited bitrate
// increases. We allow a bit more lag at very low rates to not too easily get
// stuck if the encoder produces uneven outputs.
  // Cap the new bitrate at 1.5 * throughput to avoid unbounded increases.
  // At very low rates allow some extra slack so uneven encoder output does
  // not get the estimate stuck.
const DataRate troughput_based_limit =
1.5 * estimated_throughput + DataRate::KilobitsPerSec(10);
switch (rate_control_state_) {
case RateControlState::kRcHold:
break;
case RateControlState::kRcIncrease:
      // If the throughput greatly exceeds the link capacity estimate
      // (link_capacity_, roughly an exponential average of
      // min(throughput, estimated bitrate)), the estimate is stale: reset it.
if (estimated_throughput > link_capacity_.UpperBound())
link_capacity_.Reset();
      // In ALR, do not grow the rate: there is not enough traffic to verify
      // that a higher rate would be sustainable. Also, if the estimate was
      // earlier pushed above the current throughput-based cap (e.g. via a
      // probe bitrate) and we are still in the increase state, that earlier
      // growth was validated and no further throughput-based increase is made.
if (current_bitrate_ < troughput_based_limit &&
!(send_side_ && in_alr_ && no_bitrate_increase_in_alr_)) {
DataRate increased_bitrate = DataRate::MinusInfinity();
if (link_capacity_.has_estimate()) {
          // link_capacity_ was not reset above, meaning the measured
          // throughput does not deviate much from the estimated capacity;
          // additive increase is appropriate. Compute the additive increment.
DataRate additive_increase =
AdditiveRateIncrease(at_time, time_last_bitrate_change_);
          // Add the increment to the current bitrate.
increased_bitrate = current_bitrate_ + additive_increase;
} else {
          // Otherwise link_capacity_ was reset and the capacity estimate is
          // unreliable; apply multiplicative increase to current_bitrate_.
          // Compute the multiplicative increment.
DataRate multiplicative_increase = MultiplicativeRateIncrease(
at_time, time_last_bitrate_change_, current_bitrate_);
          // Compute the increased bitrate.
increased_bitrate = current_bitrate_ + multiplicative_increase;
}
        // Clamp the adjusted bitrate to the throughput-based cap.
new_bitrate = std::min(increased_bitrate, troughput_based_limit);
}
time_last_bitrate_change_ = at_time;
break;
case RateControlState::kRcDecrease: {
DataRate decreased_bitrate = DataRate::PlusInfinity();
      // Decrease: scale the current throughput by beta_ (0.85).
decreased_bitrate = estimated_throughput * beta_;
if (decreased_bitrate > current_bitrate_ && !link_capacity_fix_) {
// TODO(terelius): The link_capacity estimate may be based on old
// throughput measurements. Relying on them may lead to unnecessary
// BWE drops.
        // throughput * 0.85 is still above the current target; fall back to
        // the historical capacity estimate link_capacity_, which may be stale
        // and too low, risking an unnecessary drop.
if (link_capacity_.has_estimate()) {
decreased_bitrate = beta_ * link_capacity_.estimate();
}
}
if (estimate_bounded_backoff_ && network_estimate_) {
        // estimate_bounded_backoff enabled: back off to the larger of
        // estimated_throughput * beta_ and link_capacity_lower * beta_.
        // The former is the measured acknowledged throughput; the latter is
        // the lower bound from another estimator (inactive while
        // network_estimator remains unused).
decreased_bitrate = std::max(
decreased_bitrate, network_estimate_->link_capacity_lower * beta_);
}
// Avoid increasing the rate when over-using.
if (decreased_bitrate < current_bitrate_) {
new_bitrate = decreased_bitrate;
}
      // Record this decrease as last_decrease_.
if (bitrate_is_initialized_ && estimated_throughput < current_bitrate_) {
if (!new_bitrate.has_value()) {
last_decrease_ = DataRate::Zero();
} else {
last_decrease_ = current_bitrate_ - *new_bitrate;
}
}
if (estimated_throughput < link_capacity_.LowerBound()) {
// The current throughput is far from the estimated link capacity. Clear
// the estimate to allow an immediate update in OnOveruseDetected.
        // The throughput is far below the link-capacity estimate's lower
        // bound: the variance is too high for the exponential smoothing
        // inside link_capacity_ to stay meaningful, so reset it.
link_capacity_.Reset();
}
      // After the first decrease the capacity ceiling has been observed;
      // mark the estimate as initialized.
bitrate_is_initialized_ = true;
      // Exponentially smooth link_capacity_ with estimated_throughput.
link_capacity_.OnOveruseDetected(estimated_throughput);
// Stay on hold until the pipes are cleared.
rate_control_state_ = RateControlState::kRcHold;
time_last_bitrate_change_ = at_time;
time_last_bitrate_decrease_ = at_time;
break;
}
default:
assert(false);
}
  // Clamp new_bitrate into [min_configured_bitrate_, upper_bound].
current_bitrate_ = ClampBitrate(new_bitrate.value_or(current_bitrate_));
}
In AimdRateControl::ChangeBitrate():
- It first reads the input throughput. As seen earlier, this is not necessarily the actual current throughput; depending on the situation it may be the previous estimate or a probe bitrate.
- It checks whether the rate controller has an initial bitrate. If not, nothing happens for normal/underuse; only on overusing is the measured value treated as the detected link capacity and used to initialize the bitrate.
- ChangeState() rotates the state machine on the input network state to decide whether to hold, decrease, or increase:
void AimdRateControl::ChangeState(const RateControlInput& input,
Timestamp at_time) {
switch (input.bw_state) {
case BandwidthUsage::kBwNormal:
if (rate_control_state_ == RateControlState::kRcHold) {
time_last_bitrate_change_ = at_time;
rate_control_state_ = RateControlState::kRcIncrease;
}
break;
case BandwidthUsage::kBwOverusing:
if (rate_control_state_ != RateControlState::kRcDecrease) {
rate_control_state_ = RateControlState::kRcDecrease;
}
break;
case BandwidthUsage::kBwUnderusing:
rate_control_state_ = RateControlState::kRcHold;
break;
default:
assert(false);
}
}
- A throughput-based cap (troughput_based_limit) is computed from the current throughput to prevent unbounded growth.
- For increase: if the throughput deviates too much from the link capacity estimate (an exponential average of past throughput), the estimate is reset. If the sender is in ALR, or the rate was already raised past the current throughput cap for other reasons, no increase is made. Otherwise, if link_capacity has a value (throughput and capacity agree), AdditiveRateIncrease() adds to the current rate; if not, MultiplicativeRateIncrease() multiplies it.
- For decrease: the new rate is throughput * 0.85. If that is still above the previously adjusted rate, link_capacity * 0.85 is used instead. With estimate_bounded_backoff enabled, the decreased rate becomes max(decreased_bitrate, network_estimate_->link_capacity_lower * beta_). Afterwards bitrate_is_initialized_ is set to true, marking initialization complete.
The additive increase AdditiveRateIncrease() follows the GCC draft: per response time, it adds roughly the rate needed to fit one more packet at the current bitrate:
DataRate AimdRateControl::AdditiveRateIncrease(Timestamp at_time,
Timestamp last_time) const {
  // Time elapsed since the last rate change.
double time_period_seconds = (at_time - last_time).seconds<double>();
  // Rate (bps per second) needed to add one packet per response_time,
  // multiplied by the elapsed interval to get the final increment.
double data_rate_increase_bps =
GetNearMaxIncreaseRateBpsPerSecond() * time_period_seconds;
return DataRate::BitsPerSec(data_rate_increase_bps);
}
/**
 * @description: Compute how much rate must be added per second to fit one
 * more packet within one response time.
 * See: https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#page-10
 * The response time is the full round trip from a request to the processed
 * response arriving back.
 * @param {*}
 * @return {*}
 */
*/
double AimdRateControl::GetNearMaxIncreaseRateBpsPerSecond() const {
  // Compute the size of one packet at the current bitrate (current_bitrate_)
  // and frame rate.
  RTC_DCHECK(!current_bitrate_.IsZero());
  const TimeDelta kFrameInterval = TimeDelta::Seconds(1) / 30;  // 30 fps
  DataSize frame_size = current_bitrate_ * kFrameInterval;  // one frame at the current bitrate
  const DataSize kPacketSize = DataSize::Bytes(1200);  // assume 1200-byte packets
  double packets_per_frame = std::ceil(frame_size / kPacketSize);  // packets per frame
  DataSize avg_packet_size = frame_size / packets_per_frame;  // average packet size
// Approximate the over-use estimator delay to 100 ms.
  // rtt_ defaults to 200 ms but is updated from received RTCP via SetRtt().
TimeDelta response_time = rtt_ + TimeDelta::Millis(100);
if (in_experiment_)
response_time = response_time * 2;
  // Rate to add per second: packet_size / response_time.
double increase_rate_bps_per_second =
(avg_packet_size / response_time).bps<double>();
double kMinIncreaseRateBpsPerSecond = 4000;
  // Increase by at least 4 kbps per second.
return std::max(kMinIncreaseRateBpsPerSecond, increase_rate_bps_per_second);
}
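Plugging numbers into the function above makes the magnitude concrete. This is a worked-example sketch, not webrtc code, recomputing the same formula for a given bitrate and rtt: at 1 Mbps and 30 fps a frame is ~4167 bytes, i.e. 4 packets of ~1042 bytes each; with rtt = 200 ms the response time is 300 ms, giving an additive ramp of roughly 1042 * 8 / 0.3 ≈ 27.8 kbps per second.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Recomputes GetNearMaxIncreaseRateBpsPerSecond() for a given bitrate (bps)
// and rtt (ms), with the same 30 fps / 1200-byte-packet assumptions.
double NearMaxIncreaseBpsPerSecond(double bitrate_bps, double rtt_ms) {
  double frame_size_bytes = bitrate_bps / 8.0 / 30.0;            // 30 fps
  double packets_per_frame = std::ceil(frame_size_bytes / 1200.0);
  double avg_packet_size_bytes = frame_size_bytes / packets_per_frame;
  double response_time_s = (rtt_ms + 100.0) / 1000.0;            // +100 ms detector delay
  double rate = avg_packet_size_bytes * 8.0 / response_time_s;   // bps per second
  return std::max(rate, 4000.0);                                 // floor: 4 kbps/s
}
```

At very low bitrates the per-packet formula would ramp slower than 4 kbps/s, which is why the floor exists.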
The multiplicative increase MultiplicativeRateIncrease() is simpler: the previous estimate is multiplied by a coefficient alpha (1.08):
DataRate AimdRateControl::MultiplicativeRateIncrease(
Timestamp at_time,
Timestamp last_time,
DataRate current_bitrate) const {
double alpha = 1.08;
if (last_time.IsFinite()) {
auto time_since_last_update = at_time - last_time;
alpha = pow(alpha, std::min(time_since_last_update.seconds<double>(), 1.0));
}
DataRate multiplicative_increase =
std::max(current_bitrate * (alpha - 1.0), DataRate::BitsPerSec(1000));
return multiplicative_increase;
}
This concludes delay-based bandwidth estimation.
2.7 Loss-based estimation - SendSideBandwidthEstimation
SendSideBandwidthEstimation estimates the rate from the packet loss rate, as described in 2.7.1; it is also roughly the last stage before the final target rate is settled.
2.7.1 Loss-based estimation in webrtc
Loss-based estimation is very simple, following the rule below: when the loss rate is under 0.02, the previous rate is multiplied by 1.08; up to 0.1 the rate is held; above 0.1 it is multiplied by (1 - 0.5 * loss):

    target(i) = 1.08 * target(i-1)              loss < 0.02
    target(i) = target(i-1)                     0.02 <= loss <= 0.1
    target(i) = target(i-1) * (1 - 0.5 * loss)  loss > 0.1
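The rule can be sketched in a few lines. This is a simplified sketch, not webrtc code; the real UpdateEstimate() additionally rate-limits decreases, anchors increases at min_bitrate_history_, and adds 1 kbps to avoid stalling at low rates (reproduced here):

```cpp
#include <cassert>
#include <cmath>

// Piecewise loss-based target update: loss in [0, 1], bitrate in bps.
double LossBasedTarget(double bitrate_bps, double loss) {
  if (loss < 0.02)
    return bitrate_bps * 1.08 + 1000.0;  // +1 kbps so low rates don't stall
  if (loss <= 0.10)
    return bitrate_bps;                  // hold
  return bitrate_bps * (1.0 - 0.5 * loss);
}
```

For example, at 100 kbps: 2% loss holds the rate, 20% loss drops it to 90 kbps, and clean feedback raises it to 109 kbps.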
2.7.2 Updating the loss statistics - UpdatePacketsLost()
Whenever a cc-feedback or an RR report arrives, a loss rate can be computed; the cc-controller then calls SendSideBandwidthEstimation::UpdatePacketsLost() to update the loss rate and re-run the rate estimate.
void SendSideBandwidthEstimation::UpdatePacketsLost(int64_t packets_lost,
int64_t number_of_packets,
Timestamp at_time) {
last_loss_feedback_ = at_time;
if (first_report_time_.IsInfinite())
first_report_time_ = at_time;
// Check sequence number diff and weight loss report
if (number_of_packets > 0) {
int64_t expected =
expected_packets_since_last_loss_update_ + number_of_packets;
// Don't generate a loss rate until it can be based on enough packets.
    // Don't update the loss rate until at least 20 packets have been observed.
if (expected < kLimitNumPackets) {
// Accumulate reports.
      expected_packets_since_last_loss_update_ = expected;   // expected packet count
      lost_packets_since_last_loss_update_ += packets_lost;  // lost packet count
return;
}
has_decreased_since_last_fraction_loss_ = false;
    // Scale by 256 (Q8) so the fraction isn't lost to integer division.
int64_t lost_q8 = (lost_packets_since_last_loss_update_ + packets_lost)
<< 8;
    // Loss rate for the current interval.
last_fraction_loss_ = std::min<int>(lost_q8 / expected, 255);
// Reset accumulators.
lost_packets_since_last_loss_update_ = 0;
expected_packets_since_last_loss_update_ = 0;
last_loss_packet_report_ = at_time;
    // Update the target bitrate.
UpdateEstimate(at_time);
}
UpdateUmaStatsPacketsLost(at_time, packets_lost);
}
In SendSideBandwidthEstimation::UpdatePacketsLost():
- The current loss rate (last_fraction_loss_) is updated from the expected and lost packet counts; it is stored scaled by 256 (Q8).
- UpdateEstimate() is called to update the target bitrate from the loss rate.
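The Q8 bookkeeping above fits in one hypothetical helper (a sketch mirroring the computation of last_fraction_loss_, not webrtc code):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Fraction lost in Q8 (0..255), the same fixed-point format RTCP receiver
// reports use: lost/expected scaled by 256 and capped at 255.
int FractionLossQ8(int64_t lost, int64_t expected) {
  if (expected <= 0)
    return 0;
  return std::min<int64_t>((lost << 8) / expected, 255);
}
```

So 5 lost out of 100 expected gives 12 (≈4.7% in Q8), and the value saturates at 255 for total loss.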
2.7.3 Updating the rate from loss - UpdateEstimate()
void SendSideBandwidthEstimation::UpdateEstimate(Timestamp at_time) {
  // This function is called from two places:
  // 1. on incoming feedback;
  // 2. RtpTransportControllerSend's periodic ProcessInterval(), which pushes
  //    pacer state here for an update.
if (rtt_backoff_.CorrectedRtt(at_time) > rtt_backoff_.rtt_limit_) {
    // The projected rtt at the current time exceeds the configured limit:
    // treat the network as congested and reduce the rate immediately.
if (at_time - time_last_decrease_ >= rtt_backoff_.drop_interval_ &&
current_target_ > rtt_backoff_.bandwidth_floor_) {
      // Enough time has passed since the last decrease and the target is
      // still above the floor: decrease.
time_last_decrease_ = at_time;
      // Lower the target:
      // new_bitrate = max(current_target * 0.8, 5 kbps)
DataRate new_bitrate =
std::max(current_target_ * rtt_backoff_.drop_fraction_,
rtt_backoff_.bandwidth_floor_.Get());
      // Update link_capacity_.
link_capacity_.OnRttBackoff(new_bitrate, at_time);
      // Update the target bitrate.
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
// TODO(srte): This is likely redundant in most cases.
    // (In what situation would the target actually change here?)
ApplyTargetLimits(at_time);
return;
}
// We trust the REMB and/or delay-based estimate during the first 2 seconds if
// we haven't had any packet loss reported, to allow startup bitrate probing.
  // In the first 2 s without any loss report, trust the REMB and/or the
  // delay-based estimate.
if (last_fraction_loss_ == 0 && IsInStartPhase(at_time)) {
DataRate new_bitrate = current_target_;
// TODO(srte): We should not allow the new_bitrate to be larger than the
// receiver limit here.
    // As the TODO above says, these two should arguably be capped at
    // receiver_limit_ / delay_based_limit_ rather than maxed with them.
if (receiver_limit_.IsFinite())
new_bitrate = std::max(receiver_limit_, new_bitrate); //remb
if (delay_based_limit_.IsFinite())
new_bitrate = std::max(delay_based_limit_, new_bitrate);//trendline
if (loss_based_bandwidth_estimation_.Enabled()) {
      // Loss-based estimation enabled: initialize it with new_bitrate.
loss_based_bandwidth_estimation_.Initialize(new_bitrate);
}
    // new_bitrate was raised by delay_based_limit_ (the delay-based
    // estimate): clear min_bitrate_history and record the new value.
if (new_bitrate != current_target_) {
      // Clear min_bitrate_history.
min_bitrate_history_.clear();
if (loss_based_bandwidth_estimation_.Enabled()) {
        // Loss-based estimation is enabled and was initialized with
        // new_bitrate above, so record new_bitrate in the history.
min_bitrate_history_.push_back(std::make_pair(at_time, new_bitrate));
} else {
        // Loss-based estimation disabled: the history records
        // current_target_ instead, which is somewhat puzzling.
min_bitrate_history_.push_back(
std::make_pair(at_time, current_target_));
}
      // Update the target bitrate.
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
}
  // Update min_bitrate_history so its front holds the bitrate from when BWE
  // last started increasing.
UpdateMinHistory(at_time);
if (last_loss_packet_report_.IsInfinite()) {
// No feedback received.
// TODO(srte): This is likely redundant in most cases.
ApplyTargetLimits(at_time);
return;
}
if (loss_based_bandwidth_estimation_.InUse()) {
    // Estimate a new rate from loss and the delay-based estimate.
DataRate new_bitrate = loss_based_bandwidth_estimation_.Update(
at_time, min_bitrate_history_.front().second, delay_based_limit_,
last_round_trip_time_);
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
  // Time since the last loss report.
TimeDelta time_since_loss_packet_report = at_time - last_loss_packet_report_;
  // The last loss report is still fresh: update using its loss rate
  // last_fraction_loss_.
if (time_since_loss_packet_report < 1.2 * kMaxRtcpFeedbackInterval) {
// We only care about loss above a given bitrate threshold.
    // last_fraction_loss_ was scaled by 256; scale it back.
float loss = last_fraction_loss_ / 256.0f;
// We only make decisions based on loss when the bitrate is above a
// threshold. This is a crude way of handling loss which is uncorrelated
// to congestion.
if (current_target_ < bitrate_threshold_ || loss <= low_loss_threshold_) {
      // Target below the threshold, or loss < 2%: grow the current target
      // (current_target_) by 8%.
// kBweIncreaseInterval.
// Note that by remembering the bitrate over the last second one can
// rampup up one second faster than if only allowed to start ramping
// at 8% per second rate now. E.g.:
// If sending a constant 100kbps it can rampup immediately to 108kbps
// whenever a receiver report is received with lower packet loss.
// If instead one would do: current_target_ *= 1.08^(delta time),
// it would take over one second since the lower packet loss to achieve
// 108kbps.
      // Note the 8% growth is anchored at the minimum bitrate from the last
      // second (min_bitrate_history_.front()) rather than compounded as
      // current_target_ *= 1.08^(delta time); as the comment above explains,
      // remembering the rate over the last second lets the estimate ramp up
      // a full second faster after a low-loss report.
DataRate new_bitrate = DataRate::BitsPerSec(
min_bitrate_history_.front().second.bps() * 1.08 + 0.5);
// Add 1 kbps extra, just to make sure that we do not get stuck
// (gives a little extra increase at low rates, negligible at higher
// rates).
      // Add an extra 1 kbps to make sure the rate does not get stuck.
new_bitrate += DataRate::BitsPerSec(1000);
      // Update the current target bitrate.
UpdateTargetBitrate(new_bitrate, at_time);
return;
} else if (current_target_ > bitrate_threshold_) {
      // The current target is above the configured threshold.
if (loss <= high_loss_threshold_) {
// Loss between 2% - 10%: Do nothing.
} else {
// Loss > 10%: Limit the rate decreases to once a kBweDecreaseInterval
// + rtt.
if (!has_decreased_since_last_fraction_loss_ &&
(at_time - time_last_decrease_) >=
(kBweDecreaseInterval + last_round_trip_time_)) {
time_last_decrease_ = at_time;
            // Decrease the rate:
            // current_target_ = current_target_ * (1 - 0.5 * loss_rate)
            // The 512 below appears because last_fraction_loss_ is scaled
            // by 256.
DataRate new_bitrate = DataRate::BitsPerSec(
(current_target_.bps() *
static_cast<double>(512 - last_fraction_loss_)) /
512.0);
has_decreased_since_last_fraction_loss_ = true;
            // Update the current target bitrate.
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
}
}
}
// TODO(srte): This is likely redundant in most cases.
ApplyTargetLimits(at_time);
}
In SendSideBandwidthEstimation::UpdateEstimate():
- The rate is mainly adjusted from the loss rate (last_fraction_loss_) per the rule in 2.7.1.
- Because this function is also invoked periodically without feedback, it first projects a current rtt via rtt_backoff_.CorrectedRtt() to detect an rtt timeout in the absence of feedback; if one is detected, the rate is lowered directly to max(current_target * 0.8, 5 kbps) and applied with UpdateTargetBitrate(). The rtt projection works as follows:
/**
 * @description: Given at_time as the current rtt-feedback time, project a
 * plausible current rtt from the last measured propagation rtt plus how long
 * the last sent packet group may have been in flight.
 * @param {at_time} current time
 * @return {*}
 */
TimeDelta RttBasedBackoff::CorrectedRtt(Timestamp at_time) const {
  // Time since the propagation rtt was last updated.
  TimeDelta time_since_rtt = at_time - last_propagation_rtt_update_;
  TimeDelta timeout_correction = time_since_rtt;
  // Avoid timeout when no packets are being sent.
  // Time since the last packet was sent.
  TimeDelta time_since_packet_sent = at_time - last_packet_sent_;
  // Effectively last_packet_sent_ - last_propagation_rtt_update_.
  timeout_correction =
      std::max(time_since_rtt - time_since_packet_sent, TimeDelta::Zero());
  return timeout_correction + last_propagation_rtt_;
}
- Next it checks whether estimation is still in the initial phase; if so, and a REMB-based estimate (receiver_limit) or a trendline/delay-based estimate (delay_based_limit) is available, the larger of the two is chosen.
- It maintains a queue of <time, bitrate> pairs (min_bitrate_history_), whose front holds the minimum bitrate over the past 1 s.
void SendSideBandwidthEstimation::UpdateMinHistory(Timestamp at_time) {
  // Queue layout, o: old, n: new
  //   queue:                                    n, n, n, n, o, o, o, o, o
  //   trimmed to the last 1 s:                  n, n, n, n, o
  //   pop back entries > target, then insert:   current_target, n, o
  // Remove old data points from history.
  // Since history precision is in ms, add one so it is able to increase
  // bitrate if it is off by as little as 0.5ms.
  // Keep the window within kBweIncreaseInterval (1000 ms), the span over
  // which a BWE increase may happen.
  while (!min_bitrate_history_.empty() &&
         at_time - min_bitrate_history_.front().first + TimeDelta::Millis(1) >
             kBweIncreaseInterval) {
    min_bitrate_history_.pop_front();
  }
  // Typical minimum sliding-window algorithm: Pop values higher than current
  // bitrate before pushing it.
  while (!min_bitrate_history_.empty() &&
         current_target_ <= min_bitrate_history_.back().second) {
    min_bitrate_history_.pop_back();
  }
  min_bitrate_history_.push_back(std::make_pair(at_time, current_target_));
}
- Then the loss bands are applied: when the loss rate is below 2%, the bitrate is increased by 8%. The increase starts from the minimum bitrate of the last second and takes the interval between two feedback checks into account, roughly in the style of last_bitrate * 1.08; UpdateTargetBitrate() is then called.
- When the loss rate is between 2% and 10%, the current bitrate is held. When it exceeds 10%, the bitrate is decreased as bitrate * (1 - 0.5 * lossrate), and UpdateTargetBitrate() is called.
2.7.4 Updating the target bitrate - UpdateTargetBitrate()
DataRate SendSideBandwidthEstimation::GetUpperLimit() const {
  // upper_limit is min(delay_based_limit_, receiver_limit_).
  DataRate upper_limit = delay_based_limit_;
  if (!receiver_limit_caps_only_)
    upper_limit = std::min(upper_limit, receiver_limit_);
  upper_limit = std::min(upper_limit, max_bitrate_configured_);
  return upper_limit;
}
void SendSideBandwidthEstimation::UpdateTargetBitrate(DataRate new_bitrate,
                                                      Timestamp at_time) {
  // The new bitrate must not exceed the delay-based estimate
  // (delay_based_limit_).
  new_bitrate = std::min(new_bitrate, GetUpperLimit());
  if (new_bitrate < min_bitrate_configured_) {
    // Below the configured minimum: clamp to the minimum target bitrate.
    MaybeLogLowBitrateWarning(new_bitrate, at_time);
    new_bitrate = min_bitrate_configured_;
  }
  // Update the current target bitrate.
  current_target_ = new_bitrate;
  MaybeLogLossBasedEvent(at_time);
  // Update the link capacity estimate (capacity_estimate_bps_).
  link_capacity_.OnRateUpdate(acknowledged_rate_, current_target_, at_time);
}
There is a subtle detail in how the target is updated here: at the start of UpdateTargetBitrate(), new_bitrate is clamped from above by the delay-based estimate and from below by the configured minimum bitrate (5 kbps by default; this minimum can be reconfigured). The implication is worth spelling out: WebRTC's loss-based estimator only tolerates loss below 10%. Once loss stays above 10%, the bitrate keeps shrinking until it reaches that minimum. In weak-network conditions such as Wi-Fi through walls or poor signal, loss above 10% is common even though the link is nowhere near its maximum capacity. It is not clear why WebRTC is designed this way, and this is a potential point for optimization.
2.8 Congestion window pushback controller - CongestionWindowPushbackController
This class applies a congestion window to further adjust the target bitrate up or down. Its core idea is very simple: track the utilization of the send window over a period of time and use that to decide whether the bitrate should be raised or lowered, as shown below:
uint32_t CongestionWindowPushbackController::UpdateTargetBitrate(
    uint32_t bitrate_bps) {
  if (!current_data_window_ || current_data_window_->IsZero())
    return bitrate_bps;
  int64_t total_bytes = outstanding_bytes_;
  if (add_pacing_)
    total_bytes += pacing_bytes_;
  // Compute the window fill ratio.
  double fill_ratio =
      total_bytes / static_cast<double>(current_data_window_->bytes());
  if (fill_ratio > 1.5) {
    encoding_rate_ratio_ *= 0.9;
  } else if (fill_ratio > 1) {
    encoding_rate_ratio_ *= 0.95;
  } else if (fill_ratio < 0.1) {
    encoding_rate_ratio_ = 1.0;
  } else {
    encoding_rate_ratio_ *= 1.05;
    encoding_rate_ratio_ = std::min(encoding_rate_ratio_, 1.0);
  }
  // Re-scale the bitrate.
  uint32_t adjusted_target_bitrate_bps =
      static_cast<uint32_t>(bitrate_bps * encoding_rate_ratio_);
  // Do not adjust below the minimum pushback bitrate but do obey if the
  // original estimate is below it.
  bitrate_bps = adjusted_target_bitrate_bps < min_pushback_target_bitrate_bps_
                    ? std::min(bitrate_bps, min_pushback_target_bitrate_bps_)
                    : adjusted_target_bitrate_bps;
  return bitrate_bps;
}
In CongestionWindowPushbackController::UpdateTargetBitrate():
- When the window fill ratio exceeds 1.5, the encoding rate ratio is multiplied by 0.9; above 1.0, by 0.95; below 0.1, the ratio is reset to 1.0; within [0.1, 1.0] it is multiplied by 1.05 to ramp back up gradually, capped at 1.0.
- The fill ratio is computed from total_bytes, which is the current pacer queue size plus the bytes already sent into the network (not yet acknowledged). The data window itself is computed in the cc controller: it uses the RTT as the time window and rtt * last_loss_based_target_rate (the target bitrate) as the data window, as shown below. Notably, total_bytes originally did not include the pacer queue size; it was purely the in-flight bytes, i.e. the data expected to arrive within the next RTT, which explains why dividing by one RTT's worth of data yields a meaningful window fill ratio.
void GoogCcNetworkController::UpdateCongestionWindowSize() {
  // Take the smallest of the per-feedback-report max RTTs.
  TimeDelta min_feedback_max_rtt = TimeDelta::Millis(
      *std::min_element(feedback_max_rtts_.begin(), feedback_max_rtts_.end()));
  const DataSize kMinCwnd = DataSize::Bytes(2 * 1500);
  // time_window = rtt + a configured extra time.
  TimeDelta time_window =
      min_feedback_max_rtt +
      TimeDelta::Millis(
          rate_control_settings_.GetCongestionWindowAdditionalTimeMs());
  // data_window = target_rate * time_window
  DataSize data_window = last_loss_based_target_rate_ * time_window;
  if (current_data_window_) {
    data_window =
        std::max(kMinCwnd, (data_window + current_data_window_.value()) / 2);
  } else {
    data_window = std::max(kMinCwnd, data_window);
  }
  current_data_window_ = data_window;
}
2.9 Summary
Looking back at the diagram in 2.1, things should now be much clearer:
- First, the probe bitrate and the current throughput are computed.
- Based on the probe bitrate and throughput, delay-based estimation runs: linear regression (trendline) estimates the network state, and the bitrate is adjusted in AIMD fashion.
- With the delay-based estimate in hand, the bitrate is adjusted by the loss rate to produce the loss-based estimate, which is capped by the delay-based estimate.
- The new target bitrate is fed to the probe controller, which decides whether to probe next and at what bitrate.
- Finally, the target bitrate is further adjusted through the congestion window.
This article, the first part of the WebRTC congestion control series, analyzed the bitrate estimation and adjustment process; the next part will cover how the estimated bitrate is allocated and used by the encoder, FEC, and pacer in WebRTC.
3. Ref
R1. WebRTC congestion control paper: Analysis and Design of the Google Congestion Control for Web Real-time Communication (WebRTC)
R2. transport-cc-feedback draft: https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#page-10
R3. WebRTC GCC擁塞控制算法詳解 (一朵喇叭花壓海棠): https://blog.csdn.net/sonysuqin/article/details/106186374
R4. congestion_controller、remote bitrate estimator、pacing模塊淺析 (吃好,睡好,身體好): https://blog.csdn.net/weixin_29405665/article/details/110420315
R5. WebRTC研究:Transport-cc之RTP及RTCP (劍痴乎): https://blog.jianchihu.net/webrtc-research-transport-cc-rtp-rtcp.html