pgm不太能用,沒有想象中的可靠,重傳機制貌似仍然使用組播重傳,丟包率80%的網絡感覺沒啥改進,如果有所好轉延遲估計也是個不小的問題。
后聽說rtp也有nack機制,webrtc基於rtp實現了重傳在一定程度上保證可靠性。
在各路大神的指引下找到了rfc4585,看到了這么一段
RTCP擴展反饋報文,有一種nack報文
當FMT=1並且PT=205時,代表此報文是個NACK報文
Name | Value | Brief Description |
---|---|---|
RTPFB | 205 | Transport layer FB message |
PSFB | 206 | Pyload-specific FB message |
0: unassigned
1: Generic NACK
2-30: unassigned
31: reserved for future expansion of the identifier number space
The Generic NACK message is identified by PT=RTPFB and FMT=1.
FCI字段會有如下圖所示的數據
PID:表示Packet ID,用於表明當前接收端丟失的數據包的序號,是接收端期待收到的下一個數據包
BLP:表示bitmask of following lost lost packets,占兩個字節,16位,表示接着PID后面的16個數據包的丟包情況。
rtp協議本身不會幫你重傳。應用應該自己解析rtcp做處理
webrtc關於nack的實現
我突然想起來,我入職的時候下過webrtc的源碼,還沒刪除(可能是太大了,刪太慢了就沒刪),於是就把源碼拿出來看了看webrtc對於這個部分的實現
這個部分的代碼量也不多,很好懂,大概就是發送端的rtcp receiver接收到rtcp數據包,解析發現是個nack,告訴rtp發送端重新發送接收端請求重傳的數據包
bool RTCPReceiver::IncomingPacket(const uint8_t* packet, size_t packet_size) {
if (packet_size == 0) {
LOG(LS_WARNING) << "Incoming empty RTCP packet";
return false;
}
PacketInformation packet_information;
if (!ParseCompoundPacket(packet, packet + packet_size, &packet_information))
return false;
TriggerCallbacksFromRTCPPacket(packet_information);
return true;
}
上述代碼是rtcp receiver接收到rtcp數據包后的初步判斷,ParseCompoundPacket
函數用於解析rtcp數據包,將關鍵信息摘出儲存到PacketInformation
結構體中傳遞給觸發回調,TriggerCallbacksFromRTCPPacket
函數用於觸發收到rtcp數據包回調。
下面是ParseCompoundPacket
結構體的實現
struct RTCPReceiver::PacketInformation {
uint32_t packet_type_flags = 0; // RTCPPacketTypeFlags bit field.
uint32_t remote_ssrc = 0;
std::vector<uint16_t> nack_sequence_numbers;
ReportBlockList report_blocks;
int64_t rtt_ms = 0;
uint8_t sli_picture_id = 0;
uint64_t rpsi_picture_id = 0;
uint32_t receiver_estimated_max_bitrate_bps = 0;
std::unique_ptr<rtcp::TransportFeedback> transport_feedback;
};
nack_sequence_numbers
已經是解析過后的接收端沒有收到的數據包的序號了,解析過程也很簡單,是個拆包過的成就不再展開描述了。
void RTCPReceiver::TriggerCallbacksFromRTCPPacket(
const PacketInformation& packet_information) {
...
if (!receiver_only_ && (packet_information.packet_type_flags & kRtcpNack)) {
if (!packet_information.nack_sequence_numbers.empty()) {
LOG(LS_VERBOSE) << "Incoming NACK length: "
<< packet_information.nack_sequence_numbers.size();
_rtpRtcp.OnReceivedNack(packet_information.nack_sequence_numbers);
}
...
}
TriggerCallbacksFromRTCPPacket
函數會根據解析的數據包信息判斷出當前rtcp數據包類型是nack,觸發回調,該回調並不會直接到rtp sender而是到rtp-rtcp module由這個module調用rtp sender,這個module是rtp和rtcp的中心組件(和webrtc結構有關),也起到了解耦的作用
這個中間調用的代碼量不多
void ModuleRtpRtcpImpl::OnReceivedNack(
const std::vector<uint16_t>& nack_sequence_numbers) {
for (uint16_t nack_sequence_number : nack_sequence_numbers) {
send_loss_stats_.AddLostPacket(nack_sequence_number);
}
if (!rtp_sender_.StorePackets() ||
nack_sequence_numbers.size() == 0) {
return;
}
// Use RTT from RtcpRttStats class if provided.
int64_t rtt = rtt_ms();
if (rtt == 0) {
rtcp_receiver_.RTT(rtcp_receiver_.RemoteSSRC(), NULL, &rtt, NULL, NULL);
}
rtp_sender_.OnReceivedNack(nack_sequence_numbers, rtt);
}
一開始做了一些記錄,記錄丟包情況,然后rtt是用來做流控的,收到nack當次並不一定會重傳,會用到rtt做判斷。
下面是rtp sender的代碼用於重傳數據包
void RTPSender::OnReceivedNack(
const std::vector<uint16_t>& nack_sequence_numbers,
int64_t avg_rtt) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
"RTPSender::OnReceivedNACK", "num_seqnum",
nack_sequence_numbers.size(), "avg_rtt", avg_rtt);
for (uint16_t seq_no : nack_sequence_numbers) {
const int32_t bytes_sent = ReSendPacket(seq_no, 5 + avg_rtt);
if (bytes_sent < 0) {
// Failed to send one Sequence number. Give up the rest in this nack.
LOG(LS_WARNING) << "Failed resending RTP packet " << seq_no
<< ", Discard rest of packets";
break;
}
}
}
TRACE_EVENT
是google調試使用的機制,不用管它,這個函數會循環重發丟失隊列中的數據包,但是不一定發送成功,數據包緩存是有限制的,如果要重新發送的數據包已經不再緩存中了,總不能變出來吧?
int32_t RTPSender::ReSendPacket(uint16_t packet_id, int64_t min_resend_time) {
std::unique_ptr<RtpPacketToSend> packet =
packet_history_.GetPacketAndSetSendTime(packet_id, min_resend_time, true);
if (!packet) {
// Packet not found.
return 0;
}
// Check if we're overusing retransmission bitrate.
// TODO(sprang): Add histograms for nack success or failure reasons.
RTC_DCHECK(retransmission_rate_limiter_);
if (!retransmission_rate_limiter_->TryUseRate(packet->size()))
return -1;
if (paced_sender_) {
// Convert from TickTime to Clock since capture_time_ms is based on
// TickTime.
int64_t corrected_capture_tims_ms =
packet->capture_time_ms() + clock_delta_ms_;
paced_sender_->InsertPacket(RtpPacketSender::kNormalPriority,
packet->Ssrc(), packet->SequenceNumber(),
corrected_capture_tims_ms,
packet->payload_size(), true);
return packet->size();
}
bool rtx = (RtxStatus() & kRtxRetransmitted) > 0;
int32_t packet_size = static_cast<int32_t>(packet->size());
if (!PrepareAndSendPacket(std::move(packet), rtx, true,
PacketInfo::kNotAProbe))
return -1;
return packet_size;
}
- 重發數據包操作會先檢查歷史緩存中有沒有數據包,如果沒有,繼續外層循環,重發下一個包。
- 如果有帶寬限制,需要看當前分給重發機制的帶寬是否已經被用完,用完了就停止循環重發操作。
min_resend_time
時間用於檢測。如果之前有請求過重傳同樣序號的數據包,在短時間內是不會再重傳的