webrtc點對點會話建立過程:https://blog.csdn.net/zhuiyuanqingya/article/details/84108763
本地Candidate收集
本地的 IceCandidate 收集過程起始於 PeerConnection::SetLocalDescription,其中會啟動收集:
// Call-site excerpt: asks the transport controller to start candidate
// gathering (if needed).
// MaybeStartGathering needs to be called after posting
// MSG_SET_SESSIONDESCRIPTION_SUCCESS, so that we don't signal any candidates
// before signaling that SetLocalDescription completed.
transport_controller_->MaybeStartGathering();
MaybeStartGathering具體實現是:
// Starts candidate gathering on the ICE transport of every DTLS transport
// returned by GetDtlsTransports().
//
// The work must run on network_thread_: when called from any other thread the
// function re-enters itself on network_thread_ through Invoke<void> and then
// returns without doing anything else on the calling thread.
void JsepTransportController::MaybeStartGathering() {
// Not on the network thread: hop over and run this same function there.
if (!network_thread_->IsCurrent()) {
network_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { MaybeStartGathering(); });
return;
}
// On the network thread: forward the request to each underlying ICE
// transport.
for (auto& dtls : GetDtlsTransports()) {
dtls->ice_transport()->MaybeStartGathering();
}
}
在這裡,收集函數會切換到 network_thread_ 上執行,然後對每個 DtlsTransport 的 ICE transport 啟動收集過程,轉到 P2PTransportChannel::MaybeStartGathering():
// Excerpt from the body of P2PTransportChannel::MaybeStartGathering.
// pooled_session is obtained earlier in the function (not shown in this
// excerpt).
if (pooled_session) {
// A pooled session is available: adopt it instead of creating a new one.
AddAllocatorSession(std::move(pooled_session));
// pooled_session has been moved from; keep a raw pointer to the session that
// was just appended to allocator_sessions_.
PortAllocatorSession* raw_pooled_session =
allocator_sessions_.back().get();
// Process the pooled session's existing candidates/ports, if they exist.
OnCandidatesReady(raw_pooled_session,
raw_pooled_session->ReadyCandidates());
for (PortInterface* port : allocator_sessions_.back()->ReadyPorts()) {
OnPortReady(raw_pooled_session, port);
}
// If the pooled session already finished allocating, report completion too.
if (allocator_sessions_.back()->CandidatesAllocationDone()) {
OnCandidatesAllocationDone(raw_pooled_session);
}
} else {
// No pooled session: create a fresh allocator session from the transport
// name, component and ICE ufrag/pwd, then start port gathering on it.
AddAllocatorSession(allocator_->CreateSession(
transport_name(), component(), ice_parameters_.ufrag,
ice_parameters_.pwd));
allocator_sessions_.back()->StartGettingPorts();
}
然后啟動port收集BasicPortAllocatorSession::StartGettingPorts()
// Begins port gathering for this session: marks the session as GATHERING,
// makes sure a packet socket factory exists, and posts MSG_CONFIG_START to
// this object on network_thread_ (handled in OnMessage).
void BasicPortAllocatorSession::StartGettingPorts() {
// Thread check (RTC_DCHECK_RUN_ON) against network_thread_.
RTC_DCHECK_RUN_ON(network_thread_);
state_ = SessionState::GATHERING;
// No socket factory was supplied: create a BasicPacketSocketFactory bound to
// network_thread_, keep ownership in owned_socket_factory_, and use it.
if (!socket_factory_) {
owned_socket_factory_.reset(
new rtc::BasicPacketSocketFactory(network_thread_));
socket_factory_ = owned_socket_factory_.get();
}
// The actual work starts when the posted message is dispatched.
network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START);
RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
<< turn_port_prune_policy_;
}
此時 Post 的消息會透過 MessageQueue::Post 放入消息隊列,之後由 this(即 BasicPortAllocatorSession)的 OnMessage 接收並處理,消息依次為:MSG_CONFIG_START → MSG_CONFIG_READY → MSG_ALLOCATE → MSG_SEQUENCEOBJECTS_CREATED
// Message handler for this session: dispatches on message->message_id to the
// matching step handler.
//
// message: the posted message; for MSG_CONFIG_READY its pdata is cast to
//          PortConfiguration*.
// Any other message id hits RTC_NOTREACHED().
void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
switch (message->message_id) {
case MSG_CONFIG_START:
GetPortConfigurations();
break;
case MSG_CONFIG_READY:
// The payload carries the port configuration for this message.
OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
break;
case MSG_ALLOCATE:
OnAllocate();
break;
case MSG_SEQUENCEOBJECTS_CREATED:
OnAllocationSequenceObjectsCreated();
break;
case MSG_CONFIG_STOP:
OnConfigStop();
break;
default:
RTC_NOTREACHED();
}
}
在 OnAllocate 中啟動端口分配:遍歷所有網絡設備(Network 對象),為每個設備創建 AllocationSequence 對象,並調用其 Init、Start 函數來分配 port。在 Init 中會創建 udp_socket_(CreateUdpSocket);在啟用 bundle 的情況下,StunPort 會和普通 UDP 端口共用同一個 socket。
// Starts this allocation sequence: switches state_ to kRunning, posts the
// first MSG_ALLOCATION_PHASE to itself on the session's network thread
// (handled in AllocationSequence::OnMessage), and records the network's
// current best IP.
void AllocationSequence::Start() {
state_ = kRunning;
session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
}
發送post消息后會進入AllocationSequence
對象本身的OnMessage
:
// Runs one allocation phase per MSG_ALLOCATION_PHASE message.
//
// The current phase_ selects which ports are created (UDP + STUN, relay, or
// TCP). While the sequence is still kRunning, phase_ is advanced and the next
// phase is scheduled after the allocator's step_delay(); once the TCP phase
// has set state_ to kCompleted, pending phase messages are cleared and
// SignalPortAllocationComplete is emitted.
//
// msg: must carry message_id MSG_ALLOCATION_PHASE and be delivered on the
//      session's network thread (both are DCHECKed).
void AllocationSequence::OnMessage(rtc::Message* msg) {
RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
// Human-readable phase names, indexed by phase_, used only for logging.
const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
// Perform all of the phases in the current step.
RTC_LOG(LS_INFO) << network_->ToString()
<< ": Allocation Phase=" << PHASE_NAMES[phase_];
switch (phase_) {
case PHASE_UDP:
CreateUDPPorts();
CreateStunPorts();
break;
case PHASE_RELAY:
CreateRelayPorts();
break;
case PHASE_TCP:
CreateTCPPorts();
// TCP is the last phase: mark the whole sequence as completed.
state_ = kCompleted;
break;
default:
RTC_NOTREACHED();
}
if (state() == kRunning) {
// More phases remain: move to the next one and schedule it after the
// allocator's configured step delay.
++phase_;
session_->network_thread()->PostDelayed(RTC_FROM_HERE,
session_->allocator()->step_delay(),
this, MSG_ALLOCATION_PHASE);
} else {
// If all phases in AllocationSequence are completed, no allocation
// steps needed further. Canceling pending signal.
session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
SignalPortAllocationComplete(this);
}
}
AllocationSequence 的 phase_ 成員在對象創建時初始化為 0,等於 PHASE_UDP,所以首先會進入 PHASE_UDP 的處理過程;處理完成後,會透過 session_->network_thread()->PostDelayed 再次投遞 MSG_ALLOCATION_PHASE,進入下一個階段,即 PHASE_RELAY。
UDP phase 會收集兩種類型的 candidate:host 和 srflx。會先收集 host,然後收集 srflx;但當設置了 PORTALLOCATOR_ENABLE_SHARED_SOCKET 時,兩者會共用同一個 socket。
// Creates the UDP port for this allocation sequence and hands it to the
// session.
//
// Does nothing when PORTALLOCATOR_DISABLE_UDP is set. With
// PORTALLOCATOR_ENABLE_SHARED_SOCKET and an existing udp_socket_, the port is
// built on that socket; otherwise UDPPort::Create is given the allocator's
// min_port()/max_port() range instead of a socket. In shared-socket mode the
// same UDPPort also receives the STUN server addresses (unless STUN is
// disabled), so it generates the STUN candidate itself.
void AllocationSequence::CreateUDPPorts() {
if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
RTC_LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
return;
}
// TODO(mallinath) - Remove UDPPort creating socket after shared socket
// is enabled completely.
std::unique_ptr<UDPPort> port;
// True unless PORTALLOCATOR_DISABLE_DEFAULT_LOCAL_CANDIDATE is set; passed
// through to UDPPort::Create.
bool emit_local_candidate_for_anyaddress =
!IsFlagSet(PORTALLOCATOR_DISABLE_DEFAULT_LOCAL_CANDIDATE);
if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
// Shared-socket mode: build the port on the already created udp_socket_.
port = UDPPort::Create(
session_->network_thread(), session_->socket_factory(), network_,
udp_socket_.get(), session_->username(), session_->password(),
session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
session_->allocator()->stun_candidate_keepalive_interval());
} else {
// Otherwise pass the allocator's port range rather than an existing socket.
port = UDPPort::Create(
session_->network_thread(), session_->socket_factory(), network_,
session_->allocator()->min_port(), session_->allocator()->max_port(),
session_->username(), session_->password(),
session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
session_->allocator()->stun_candidate_keepalive_interval());
}
if (port) {
// If shared socket is enabled, STUN candidate will be allocated by the
// UDPPort.
if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
// Remember the port (non-owning) and get notified when it is destroyed.
udp_port_ = port.get();
port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed);
// If STUN is not disabled, setting stun server address to port.
if (!IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
if (config_ && !config_->StunServers().empty()) {
RTC_LOG(LS_INFO)
<< "AllocationSequence: UDPPort will be handling the "
"STUN candidate generation.";
port->set_server_addresses(config_->StunServers());
}
}
}
// Release the unique_ptr and pass the raw pointer to the session; the third
// argument (prepare_address = true) makes the session call PrepareAddress().
session_->AddAllocatedPort(port.release(), this, true);
}
}
之后進入BasicPortAllocatorSession::AddAllocatedPort
,這個時候會關聯信號和槽並將此port存入隊列中:
// Registers a newly created port with this session.
//
// Copies the session's content name, component, generation, optional proxy
// settings and STUN-retransmit-attribute flag onto the port, records the port
// in ports_, connects the port's signals to this session's handlers, and
// optionally starts address preparation.
//
// port:            the port to register; a null port is ignored.
// seq:             the allocation sequence stored alongside the port in its
//                  PortData entry.
// prepare_address: when true, port->PrepareAddress() is called at the end,
//                  after all signals have been connected.
void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
AllocationSequence* seq,
bool prepare_address) {
RTC_DCHECK_RUN_ON(network_thread_);
if (!port)
return;
RTC_LOG(LS_INFO) << "Adding allocated port for " << content_name();
port->set_content_name(content_name());
port->set_component(component());
port->set_generation(generation());
// Only configure a proxy on the port when the allocator has one set.
if (allocator_->proxy().type != rtc::PROXY_NONE)
port->set_proxy(allocator_->user_agent(), allocator_->proxy());
port->set_send_retransmit_count_attribute(
(flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);
// Store the port (together with its sequence) in ports_.
PortData data(port, seq);
ports_.push_back(data);
// Connect the port's signals to this session's handlers (slots).
port->SignalCandidateReady.connect(
this, &BasicPortAllocatorSession::OnCandidateReady);
port->SignalCandidateError.connect(
this, &BasicPortAllocatorSession::OnCandidateError);
port->SignalPortComplete.connect(this,
&BasicPortAllocatorSession::OnPortComplete);
port->SignalDestroyed.connect(this,
&BasicPortAllocatorSession::OnPortDestroyed);
port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);
RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";
// Done last, after the signal connections above are in place.
if (prepare_address)
port->PrepareAddress();
}
之后進入UDPPort::PrepareAddress
,然后轉到UDPPort::OnLocalAddressReady
// Called once the local address is known: publishes it as a host
// (LOCAL_PORT_TYPE) UDP candidate and then gives the port a chance to prepare
// a STUN candidate.
//
// socket:  not used in this body.
// address: the local address; a copy of it may be adjusted by
//          MaybeSetDefaultLocalAddress before being added as a candidate.
void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,
const rtc::SocketAddress& address) {
// When adapter enumeration is disabled and binding to the any address, the
// default local address will be issued as a candidate instead if
// |emit_local_for_anyaddress| is true. This is to allow connectivity for
// applications which absolutely requires a HOST candidate.
rtc::SocketAddress addr = address;
// If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at
// least the port is listening.
MaybeSetDefaultLocalAddress(&addr);
// Add the host candidate with ICE_TYPE_PREFERENCE_HOST; the same addr is
// passed as the first two arguments and the third address is left empty.
AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",
LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);
MaybePrepareStunCandidate();
}
會調用Port::AddAddress
->Port::FinishAddingAddress
在這里會觸發SignalCandidateReady,轉到回調函數
// Final step of adding a candidate to this port: stores the candidate in
// candidates_, emits SignalCandidateReady for it, and then calls
// PostAddAddress.
//
// c:        the candidate to record and announce.
// is_final: forwarded unchanged to PostAddAddress.
void Port::FinishAddingAddress(const Candidate& c, bool is_final) {
candidates_.push_back(c);
SignalCandidateReady(this, c);
PostAddAddress(is_final);
}
// Slot for Port::SignalCandidateReady (connected in AddAllocatedPort).
// NOTE: this is an abridged excerpt; the "......" lines mark code omitted by
// the author, including the lookup of `data` and the opening of the block
// that the first lone closing brace below terminates.
//
// The visible part announces the port via SignalPortReady when it has not
// been pruned, and forwards the candidate via SignalCandidatesReady when the
// port data is ready and the candidate passes the candidate filter.
void BasicPortAllocatorSession::OnCandidateReady(Port* port,
const Candidate& c) {
......
// If the current port is not pruned yet, SignalPortReady.
if (!data->pruned()) {
RTC_LOG(LS_INFO) << port->ToString() << ": Port ready.";
SignalPortReady(this, port);
port->KeepAliveUntilPruned();
}
}
if (data->ready() && CheckCandidateFilter(c)) {
// Emit the sanitized candidate as a single-element list.
std::vector<Candidate> candidates;
candidates.push_back(allocator_->SanitizeCandidate(c));
SignalCandidatesReady(this, candidates);
} else {
RTC_LOG(LS_INFO) << "Discarding candidate because it doesn't match filter.";
}
........
}
信號和槽關聯流轉:
BasicPortAllocatorSession::OnCandidateReady
↓SignalCandidatesReady
P2PTransportChannel::OnCandidatesReady
↓SignalCandidateGathered
JsepTransportController::OnTransportCandidateGathered_n
↓SignalIceCandidatesGathered
PeerConnection::OnTransportControllerCandidatesGathered
↓
PeerConnection::OnIceCandidate
SignalPortReady
P2PTransportChannel::OnPortReady
↓
P2PTransportChannel::CreateConnection
↓
P2PTransportChannel::SortConnectionsAndUpdateState
↓
// Delivers a newly gathered local ICE candidate to the application observer.
//
// candidate: the gathered candidate, owned by this function; the observer only
//            receives a raw pointer to it (candidate.get()).
// Nothing is reported once the PeerConnection is closed.
void PeerConnection::OnIceCandidate(
std::unique_ptr<IceCandidateInterface> candidate) {
if (IsClosed()) {
return;
}
ReportIceCandidateCollected(candidate->candidate());
Observer()->OnIceCandidate(candidate.get());// The client application is then notified that an ICE candidate was gathered.
}
復用 socket 的情況下, AllocationSequence::CreateStunPorts
函數會直接返回,因為早在 AllocationSequence::CreateUDPPorts
函數的執行過程中,就已經執行了 STUN Binding request 的發送邏輯。
發送 STUN Binding request:
UDPPort::OnLocalAddressReady(CreateStunPorts)
↓
UDPPort::MaybePrepareStunCandidate
↓
UDPPort::SendStunBindingRequests
↓
UDPPort::SendStunBindingRequest
↓
StunRequestManager::Send
↓
StunRequestManager::SendDelayed
↓MSG_STUN_SEND
StunRequest::OnMessage
↓SignalSendPacket
UDPPort::OnSendPacket
↓
AsyncUDPSocket::SendTo
↓
PhysicalSocket::SendTo
↓
收到 STUN Binding response:
PhysicalSocketServer::Wait
↓
SocketDispatcher::OnEvent
↓
AsyncUDPSocket::OnReadEvent
↓SignalReadPacket
AllocationSequence::OnReadPacket
↓
UDPPort::HandleIncomingPacket
↓
UDPPort::OnReadPacket
↓
StunRequestManager::CheckResponse
↓
StunBindingRequest::OnResponse
↓
UDPPort::OnStunBindingRequestSucceeded
↓
Port::AddAddress
↓
Port::FinishAddingAddress
↓
BasicPortAllocatorSession::OnCandidateReady
↓SignalCandidatesReady
StunRequest 類是對 STUN request 的定義和封裝,基類里實現了 request 超時管理、重發的邏輯,各種特定類型的邏輯由子類實現,例如 StunBindingRequest 和 TurnAllocateRequest
StunRequestManager 則實現了 response 和 request 匹配的邏輯:manager 按 transaction id => request 的 hash 保存了所有的 request,收到 response 后,根據 transaction id 即可找到對應的 request,進而可以執行 request 對象的回調。
candidate 收集狀態:
- P2PTransportChannel 創建時,gathering_state_ 為 kIceGatheringNew;
- 在 P2PTransportChannel::MaybeStartGathering 裡開始收集 candidate 時,如果當前未處於 Gathering 狀態,則切換到 kIceGatheringGathering 狀態;
- 在收到 OnCandidatesAllocationDone 回調時,切換到 kIceGatheringComplete 狀態:
  - 當 BasicPortAllocatorSession::CandidatesAllocationDone 為 true 時,就會觸發這個回調;
  - 這意味着創建了 AllocationSequence,且所有的 AllocationSequence 都不處於 kRunning 狀態,且所有的 port 都不處於 STATE_INPROGRESS 狀態;
  - AllocationSequence 創建時處於 kInit 狀態,AllocationSequence::Start 裡切換為 kRunning 狀態,TCP phase 結束後切換為 kCompleted 狀態,如果調用了 AllocationSequence::Stop,則會切換到 kStopped 狀態;
  - port 創建時就處於 STATE_INPROGRESS 狀態,當被 prune、發生錯誤時,分別切換到 STATE_PRUNED、STATE_ERROR 狀態;TurnPort 和 TcpPort 收集到 candidate 後調用 Port::AddAddress 時,就會切換到 STATE_COMPLETE 狀態;RelayPort(GTURN)和 UdpPort 也會在收集到 candidate 後切換到 STATE_COMPLETE 狀態;StunPort 則會在收集完 candidate(即向所有 STUN server 完成了 binding request)之後切換到 STATE_COMPLETE 狀態。
- 此外,BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated 也會調用檢測邏輯,判斷收集是否已經完成:
AllocationSequence::OnMessage Port::FinishAddingAddress
↓SignalPortAllocationComplete ↓
OnPortAllocationComplete Port::PostAddAddress(bool is_final)
↓ ↓SignalPortComplete
↓ BasicPortAllocatorSession::OnPortComplete
↓ ↓
BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone
↓
BasicPortAllocatorSession::CandidatesAllocationDone
↓true
SignalCandidatesAllocationDone
↓
OnCandidatesAllocationDone
遠端Candidates設置
PeerConnection::SetRemoteDescription UpdateStats(kStatsOutputLevelStandard)
↓
PeerConnection::UseCandidatesInSessionDescription
↓
PeerConnection::UseCandidate
↓SetIceConnectionState(PeerConnectionInterface::kIceConnectionChecking)
JsepTransportController::AddRemoteCandidates
↓
JsepTransport::AddRemoteCandidates
↓
P2PTransportChannel::AddRemoteCandidate
↓
P2PTransportChannel::FinishAddingRemoteCandidate
↓
P2PTransportChannel::CreateConnections
↓
P2PTransportChannel::CreateConnection
↓
UDPPort::CreateConnection
↓
Port::AddOrReplaceConnection SignalConnectionCreated(這個信號未關聯,無用)
↓
P2PTransportChannel::AddConnection
↓
P2PTransportChannel::RememberRemoteCandidate (維護remote_candidates_列表)
P2PTransportChannel::FinishAddingRemoteCandidate
↓
P2PTransportChannel::SortConnectionsAndUpdateState
↓
P2PTransportChannel::MaybeStartPinging
↓
P2PTransportChannel::CheckAndPing FindNextPingableConnection
↓
P2PTransportChannel::PingConnection
↓
Connection::Ping
↓
StunRequestManager::Send IceCandidatePairState::IN_PROGRESS
↓
StunRequestManager::SendDelayed
↓
...這個和STUN Binding response類似
↓
StunRequestManager::CheckResponse
↓
ConnectionRequest::OnResponse
↓
Connection::OnConnectionRequestResponse
↓
Connection::ReceivedPingResponse
↓
Connection::set_write_state STATE_WRITABLE
↓SignalStateChange
P2PTransportChannel::OnConnectionStateChange
↓
RequestSortAndStateUpdate
↓
P2PTransportChannel::SortConnectionsAndUpdateState
P2PTransportChannel::SwitchSelectedConnection
↓ sig slot (SignalReadyToSend)
DtlsTransport::OnReadyToSend
↓ sig slot (SignalReadyToSend)
RtpTransport::OnReadyToSend
AddConnection會關聯端口的發送和接收
// Registers a new connection with this channel.
//
// Adds the connection to connections_ and unpinged_connections_, applies the
// channel's remote ICE mode and timeout settings from config_, connects the
// connection's signals (read packet, ready to send, state change, destroyed,
// nominated) to this channel's handlers, attaches the ICE event log, and logs
// the candidate pair as kAdded.
//
// connection: the connection to register; stored as a raw pointer.
void P2PTransportChannel::AddConnection(Connection* connection) {
RTC_DCHECK_RUN_ON(network_thread_);
connections_.push_back(connection);
// A new connection has not been pinged yet.
unpinged_connections_.insert(connection);
// Copy the channel-level ICE configuration onto the connection.
connection->set_remote_ice_mode(remote_ice_mode_);
connection->set_receiving_timeout(config_.receiving_timeout);
connection->set_unwritable_timeout(config_.ice_unwritable_timeout);
connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks);
connection->set_inactive_timeout(config_.ice_inactive_timeout);
// Route the connection's events (incoming packets, send readiness, state
// changes, destruction, nomination) to this channel.
connection->SignalReadPacket.connect(this,
&P2PTransportChannel::OnReadPacket);
connection->SignalReadyToSend.connect(this,
&P2PTransportChannel::OnReadyToSend);
connection->SignalStateChange.connect(
this, &P2PTransportChannel::OnConnectionStateChange);
connection->SignalDestroyed.connect(
this, &P2PTransportChannel::OnConnectionDestroyed);
connection->SignalNominated.connect(this, &P2PTransportChannel::OnNominated);
// Record that this channel has had at least one connection.
had_connection_ = true;
connection->set_ice_event_log(&ice_event_log_);
LogCandidatePairConfig(connection,
webrtc::IceCandidatePairConfigType::kAdded);
}