1.前言
webrtc線程源於chromium,其中有消息隊列,通信等功能,相對於原始的std::thread或者posix pthread而言,好用不少,本文介紹了webrtc 線程的常用功能以及實現;
webrtc版本:M91
2.正文
2.1 webrtc中的主要線程
出於管理接口即時性,平衡IO任務或其它任務阻塞性等,通過異步的方式,將不同類型的任務歸類到不同的異步線程去處理是常見的處理方式,webrtc中一共有三大線程
-
signaling_thread_: 處理PeerConnection有關的接口任務和observer回調
-
network_thread_:網絡io等
-
worker_thread_:其它阻塞耗時的任務
2.2 使用Invoke在異步線程執行任務
當需要將一個任務放到異步線程的時候,只需要使用thread->Invoke<>()
函數即可,取JsepTransportController::SetLocalDescription()
作為例子, 在函數最開頭就檢查了,network_thread線程是否是當前線程,如果不是則通過network_thread_->Invoke<>()
將當前函數投遞到network_thread_中去執行:
RTCError JsepTransportController::SetLocalDescription(
SdpType type,
const cricket::SessionDescription* description) {
// network線程運行
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<RTCError>(
RTC_FROM_HERE, [=] { return SetLocalDescription(type, description); });
}
RTC_DCHECK_RUN_ON(network_thread_);
if (!initial_offerer_.has_value()) {
initial_offerer_.emplace(type == SdpType::kOffer);
if (*initial_offerer_) {
SetIceRole_n(cricket::ICEROLE_CONTROLLING);
} else {
SetIceRole_n(cricket::ICEROLE_CONTROLLED);
}
}
return ApplyDescription_n(/*local=*/true, type, description);
}
2.3 Invoke的實現和線程任務管理
2.3.1 任務的投遞
以上述的SetLocalDescription()
為例,其調用的network_thread_->Invoke()
如下:
template中的第一個參數ReturnT
為執行函數的返回值類型,第二個參數用來SFINAE確保返回值不是void
(還有一個適配void類型的版本)
作為task的任務被轉化成了FunctionView
類型的functor,傳入到函數InvokeInternal()
中
template <
class ReturnT,
typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result = functor(); });
return result;
}
InvokeInternal()
會將functor轉化成Msg handler然后放到this線程隊列中去
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); }
private:
rtc::FunctionView<void()> functor_;
// 將funtor轉化成Msg handler
} handler(functor);
// 發送到this線程隊列中
Send(posted_from, &handler);
}
在Thread::Send()
函數中有非常多的細節,首先會判斷當前線程和this線程是否相同,是就直接執行,否則生成一個QueueTask 投遞到this線程的隊列中去
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
RTC_DCHECK(!IsQuitting());
if (IsQuitting())
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
// 構造成msg
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
// 如果當前線程就是this線程的話,直接將
// 執行任務即可
if (IsCurrent()) {
msg.phandler->OnMessage(&msg);
return;
}
AssertBlockingIsAllowedOnCurrentThread();
// 獲取當前線程
Thread* current_thread = Thread::Current();
#if RTC_DCHECK_IS_ON
if (current_thread) {
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
}
#endif
// Perhaps down the line we can get rid of this workaround and always require
// current_thread to be valid when Send() is called.
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready = false;
// 將msg封裝達成QueueTask,放到線程隊列中
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done = done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
if (current_thread) {
// 當前的thread是google thread
bool waited = false;
crit_.Enter();
while (!ready) {
// 任務未執行完,阻塞等待到任務完成被喚醒
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false); // epoll wait
waited = true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
// socketserver有兩個使用場景
// 1.像這種給別的線程投遞了阻塞任務后,進行wait等到執行完畢
// 2.Thread::Get()函數中獲取消息的時候,如果獲取不到就會陷入永久的wait直到被wakup()
// 對於第二點,此處提到了一個問題,A向B投遞了一個阻塞任務task1后wait等待結果,此時別的線程
// 向A的隊列投遞了一個任務task1,投遞的時候會有wakeup()的操作,那么上面檢測ready的loop會把
// 這個wakeup()給吃掉,當任務完成時,由於wakeup被吃掉了,導致線程獲取task得時候會陷入wait
// 無法及時處理task,(表述上確實如此,但代碼上看似乎沒有這樣得問題,因為是先檢測隊列是否為空
// 再繼續wait的)
current_thread->socketserver()->WakeUp();
}
} else {
// 非常google thread
done_event->Wait(rtc::Event::kForever);
}
}
先跟着主流程走看看這個PostTask()
函數, PostTask()
內部直接調用了POST()
, 在POST()
中把msg再次封裝成一個rtc::message
然后投遞到this線程的任務隊列messages_
中,然后執行WakeUpSocketServer()
喚醒this線程消費任務,至此,任務的投遞過程就完成了;
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
Post(RTC_FROM_HERE, &queued_task_handler_,
/*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
}
// Keep thread safe
// Add the message to the end of the queue
// Signal for the multiplexer to return
{
CritScope cs(&crit_);
// 將QueueTask 封裝成 rtc::message 放到message隊列中
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
messages_.push_back(msg);
}
// 喚醒this線程消費任務
WakeUpSocketServer();
}
2.3.2 任務的消費
接下來看看task的消費流程,thread啟動之后會運行Run()
然后運行ProcessMessages()
void Thread::Run() {
ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {
// Using ProcessMessages with a custom clock for testing and a time greater
// than 0 doesn't work, since it's not guaranteed to advance the custom
// clock's time, and may get stuck in an infinite loop.
RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
cmsLoop == kForever);
int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
int cmsNext = cmsLoop;
while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
Message msg;
// 獲取消息
if (!Get(&msg, cmsNext))
return !IsQuitting();
// 分發處理
Dispatch(&msg);
if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntil(msEnd));
if (cmsNext < 0)
return true;
}
}
}
在Thread::Get()
中會從消息隊列messages_
獲取消息,看起來很長,核心的只有幾句:
遍歷delay_messages_,獲取到期消息並放入到messages_隊列中
將messages_存在的消息取出,返回出去,如果沒有,則break陷入阻塞直到被喚醒
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// Return and clear peek if present
// Always return the peek if it exists so there is Peek/Get symmetry
if (fPeekKeep_) {
*pmsg = msgPeek_;
fPeekKeep_ = false;
return true;
}
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
// Check for posted events
int64_t cmsDelayNext = kForever;
bool first_pass = true;
while (true) {
// All queue operations need to be locked, but nothing else in this loop
// (specifically handling disposed message) can happen inside the crit.
// Otherwise, disposed MessageHandlers will cause deadlocks.
{
CritScope cs(&crit_);
// On the first pass, check for delayed messages that have been
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass = false;
// 遍歷delay message到期消息
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext =
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
// 將到期消息移動到messages_隊列中
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
// 獲取messages_任務
if (messages_.empty()) {
break;
} else {
*pmsg = messages_.front();
messages_.pop_front();
}
} // crit_ is released here.
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
delete pmsg->pdata;
*pmsg = Message();
continue;
}
return true;
}
if (IsQuitting())
break;
// Which is shorter, the delay wait or the asked wait?
int64_t cmsNext;
if (cmsWait == kForever) {
cmsNext = cmsDelayNext;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
cmsNext = cmsDelayNext;
}
{
// 阻塞直到消息來
// Wait and multiplex in the meantime
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
return false;
}
// If the specified timeout expired, return
msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
if (cmsElapsed >= cmsWait)
return false;
}
}
return false;
}
當消息被獲取出,就調用dispatch()
然后執行,至此,任務就被執行完成了
void Thread::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
pmsg->posted_from.file_name(), "src_func",
pmsg->posted_from.function_name());
RTC_DCHECK_RUN_ON(this);
int64_t start_time = TimeMillis();
pmsg->phandler->OnMessage(pmsg);// 執行
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= dispatch_warning_ms_) {
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
// To avoid log spew, move the warning limit to only give warning
// for delays that are larger than the one observed.
dispatch_warning_ms_ = diff + 1;
}
}
調用QueuedTaskHandler::OnMessage()
,從msg->pdata中還原成QueueTask運行,然后release釋放掉
void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
RTC_DCHECK(msg);
//取出data 還原成queue task
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
// Thread expects handler to own Message::pdata when OnMessage is called
// Since MessageData is no longer needed, delete it.
delete data;
// 運行之后釋放
// QueuedTask interface uses Run return value to communicate who owns the
// task. false means QueuedTask took the ownership.
if (!task->Run())
task.release();
}
2.3.3 執行結果的返回
上述task執行的時候涉及到兩個非常重要的函數task->Run()
和 task.release()
:
task->Run()
會將最初的投遞進來的函數運行然后存到result中,流程如下
首先構造QueueTask時的lambda
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
......
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); }, // <= Run()
[this, &ready, current_thread, done = done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
......
}
msg.phandler->OnMessage(&msg)
中的phanlder是InvokeInternal
構造的FunctorMessageHandler,調用的override的OnMessage()函數
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); } // <= OnMessage()
private:
rtc::FunctionView<void()> functor_;
} handler(functor);
Send(posted_from, &handler);
}
functor_()
是Invoke中是將result = functor();
封裝成的一個lambda,所以執行完成后的result會存在該result中;
template <
class ReturnT,
typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result = functor(); });
return result;
}
在之前投遞任務的send函數中,當前線程PostTask()
后就開始current_thread->socketserver()->Wait(kForever, false)
, 陷入阻塞;
task.release()
則會喚醒投遞任務后陷入wait的當前線程,讓其將result返回
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
......
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready = false;
// 將msg封裝達成QueueTask,放到線程隊列中
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done = done_event.get()] { // <= task.release()
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp(); // 喚醒
} else {
done->Set();
}
}));
if (current_thread) {
// 當前的thread是google thread
bool waited = false;
crit_.Enter();
while (!ready) {
// 任務未執行完,阻塞等待到任務完成被喚醒
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false); // epoll wait
waited = true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
// socketserver有兩個使用場景
// 1.像這種給別的線程投遞了阻塞任務后,進行wait等到執行完畢
// 2.Thread::Get()函數中獲取消息的時候,如果獲取不到就會陷入永久的wait直到被wakup()
// 對於第二點,此處提到了一個問題,A向B投遞了一個阻塞任務task1后wait等待結果,此時別的線程
// 向A的隊列投遞了一個任務task1,投遞的時候會有wakeup()的操作,那么上面檢測ready的loop會把
// 這個wakeup()給吃掉,當任務完成時,由於wakeup被吃掉了,導致線程獲取task得時候會陷入wait
// 無法及時處理task,(表述上確實如此,但代碼上看似乎沒有這樣得問題,因為是先檢測隊列是否為空
// 再繼續wait的)
current_thread->socketserver()->WakeUp();
}
} else {
// 非常google thread
done_event->Wait(rtc::Event::kForever);
}
}
2.4 API類的異步代理類
signaling_thread_ 是用來處理Api層次任務的線程,類對外的接口會通過代理類,將內部的接口任務投遞到signaling_thread_ 中;
比如類PeerConnection,在api\peer_connection_proxy.h
定義了其代理類如下
BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
AddTrack,
rtc::scoped_refptr<MediaStreamTrackInterface>,
const std::vector<std::string>&)
.....
這樣的代理類是通過api/proxy.h
的一組宏完成的的,這組宏的用法在文件有很詳細的說明:
//
// Example usage:
// 1. 創建interface類
// class TestInterface : public rtc::RefCountInterface {
// public:
// std::string FooA() = 0;
// std::string FooB(bool arg1) const = 0;
// std::string FooC(bool arg1) = 0;
// };
//
// Note that return types can not be a const reference.
// 2.繼承接口類,實現
// class Test : public TestInterface {
// ... implementation of the interface.
// };
//
// 3. 通過宏生命代理類
// BEGIN_PROXY_MAP(Test)
// PROXY_PRIMARY_THREAD_DESTRUCTOR()
// PROXY_METHOD0(std::string, FooA)
// PROXY_CONSTMETHOD1(std::string, FooB, arg1)
// PROXY_SECONDARY_METHOD1(std::string, FooC, arg1)
// END_PROXY_MAP()
//
// Where the destructor and first two methods are invoked on the primary
// thread, and the third is invoked on the secondary thread.
//
// The proxy can be created using
// 4.創建代理對象
// TestProxy::Create(Thread* signaling_thread, Thread* worker_thread,
// TestInterface*).
//
將PeerConnection的代理宏展開
template <class INTERNAL_CLASS>
class PeerConnectionProxyWithInternal;
typedef PeerConnectionProxyWithInternal<PeerConnectionInterface> PeerConnectionProxy;
// 代理類繼承PeerConnectionInterface接口
template <class INTERNAL_CLASS> class PeerConnectionProxyWithInternal : public PeerConnectionInterface {
protected:
typedef PeerConnectionInterface C;
public:
const INTERNAL_CLASS* internal() const { return c_; }
INTERNAL_CLASS* internal() { return c_; }
protected:
PeerConnectionProxyWithInternal(rtc::Thread* primary_thread,
rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) :
primary_thread_(primary_thread), secondary_thread_(secondary_thread),
c_(PeerConnection) {}
private:
// 放入的兩個線程
mutable rtc::Thread* primary_thread_;
mutable rtc::Thread* secondary_thread_;
protected:
~PeerConnectionProxyWithInternal() {
MethodCall<PeerConnectionProxyWithInternal, void> call( this, &PeerConnectionProxyWithInternal::DestroyInternal);
call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 28), destructor_thread());
}
private:
void DestroyInternal() { c_ = nullptr; }
rtc::scoped_refptr<INTERNAL_CLASS> c_;
public:
// 創建代理對象靜態方法
static rtc::scoped_refptr<PeerConnectionProxyWithInternal>
Create( rtc::Thread* primary_thread, rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) {
return new rtc::RefCountedObject<PeerConnectionProxyWithInternal>( primary_thread, secondary_thread, PeerConnection);
}
private:
rtc::Thread* destructor_thread() const { return primary_thread_; }
public:
// local_streams的代理方法
rtc::scoped_refptr<StreamCollectionInterface> local_streams() override {
MethodCall<C, rtc::scoped_refptr<StreamCollectionInterface>> call(c_, &C::local_streams);
return call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 30), primary_thread_);
}
代理類創建的時候可以放入兩個目的線程primary_thread_
和secondary_thread_
,用來提供給代理方法使用。
local_streams()
的代理方法是通過宏PROXY_METHOD0()
創建的,該宏會創建一個MethodCall<> call
封裝要執行的函數local_streams()
,然后通過call.Marshal()
將任務投遞到primary_thread_
Marshal()
方法如下所示:
R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
if (t->IsCurrent()) {
// 是當前線程,Invoke
Invoke(std::index_sequence_for<Args...>());
} else {
// 不是則PostTask 然后阻塞等
t->PostTask(std::unique_ptr<QueuedTask>(this));
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}