WebRTC Source Code Analysis (2): Thread and Task Management


1. Introduction

The WebRTC thread (rtc::Thread) originates from Chromium. It comes with a message queue, cross-thread communication, and other facilities, which makes it considerably more convenient than a raw std::thread or a POSIX pthread. This article walks through the commonly used features of the WebRTC thread and how they are implemented.
WebRTC version: M91

2. Main Content

2.1 The main threads in WebRTC

To keep the API responsive and to isolate blocking I/O from other work, a common approach is to classify tasks by type and hand them asynchronously to dedicated threads. WebRTC has three major threads:

  • signaling_thread_: handles PeerConnection-related API calls and observer callbacks

  • network_thread_: network I/O and related work

  • worker_thread_: other blocking, time-consuming tasks
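
To make this concrete, here is a minimal sketch of how an application typically creates these three threads and hands them to the PeerConnectionFactory (the factory call is abbreviated; codec factories and other arguments are omitted):

#include "rtc_base/thread.h"

// The network thread needs a socket server for I/O multiplexing.
std::unique_ptr<rtc::Thread> network_thread = rtc::Thread::CreateWithSocketServer();
network_thread->SetName("network_thread", nullptr);
network_thread->Start();

std::unique_ptr<rtc::Thread> worker_thread = rtc::Thread::Create();
worker_thread->SetName("worker_thread", nullptr);
worker_thread->Start();

std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
signaling_thread->SetName("signaling_thread", nullptr);
signaling_thread->Start();

// The factory stores the three threads and routes all later work onto them:
// webrtc::CreatePeerConnectionFactory(network_thread.get(), worker_thread.get(),
//                                     signaling_thread.get(), /* ... */);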

2.2 Using Invoke to run a task on another thread

To run a task on another thread, all it takes is a call to thread->Invoke<>(). Take JsepTransportController::SetLocalDescription() as an example: at the very top it checks whether network_thread_ is the current thread, and if not, it re-posts itself to network_thread_ via network_thread_->Invoke<>():

RTCError JsepTransportController::SetLocalDescription(
    SdpType type,
    const cricket::SessionDescription* description) {
  // Runs on the network thread
  if (!network_thread_->IsCurrent()) {
    return network_thread_->Invoke<RTCError>(
        RTC_FROM_HERE, [=] { return SetLocalDescription(type, description); });
  }

  RTC_DCHECK_RUN_ON(network_thread_);
  if (!initial_offerer_.has_value()) {
    initial_offerer_.emplace(type == SdpType::kOffer);
    if (*initial_offerer_) {
      SetIceRole_n(cricket::ICEROLE_CONTROLLING);
    } else {
      SetIceRole_n(cricket::ICEROLE_CONTROLLED);
    }
  }
  return ApplyDescription_n(/*local=*/true, type, description);
}
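
The same pattern is available to application code for any blocking call; a minimal usage sketch (the thread variable and the lambda are illustrative, not WebRTC source):

// Blocks the caller until the lambda has run on |some_thread|, then
// returns the lambda's result.
int answer = some_thread->Invoke<int>(RTC_FROM_HERE, [] { return 6 * 7; });
// answer == 42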

2.3 The implementation of Invoke and thread task management

2.3.1 Posting the task

Taking SetLocalDescription() above as the example, the network_thread_->Invoke() it calls is shown below.

The first template parameter, ReturnT, is the return type of the invoked function; the second is used for SFINAE to ensure the return type is not void (a separate overload handles the void case, shown after the snippet).

The task is converted into a FunctionView functor and passed into InvokeInternal():

  template <
      class ReturnT,
      typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result = functor(); });
    return result;
  }
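
For completeness, the void overload mentioned above skips the result variable and forwards the functor directly (essentially as in rtc_base/thread.h; reproduced from memory, so treat it as a sketch):

  template <
      class ReturnT,
      typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
  void Invoke(const Location& posted_from, FunctionView<void()> functor) {
    InvokeInternal(posted_from, functor);
  }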

InvokeInternal() converts the functor into a message handler and sends it to this thread's queue:

void Thread::InvokeInternal(const Location& posted_from,
                            rtc::FunctionView<void()> functor) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
               "src_func", posted_from.function_name());

  class FunctorMessageHandler : public MessageHandler {
   public:
    explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
        : functor_(functor) {}
    void OnMessage(Message* msg) override { functor_(); }

   private:
    rtc::FunctionView<void()> functor_;
  // Wrap the functor into a MessageHandler
  } handler(functor);
  // Send it to this thread's queue
  Send(posted_from, &handler);
}
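
Note that rtc::FunctionView is a non-owning view of a callable: it neither copies nor extends the lifetime of what it points at. That is safe here only because Send() blocks until the handler has run, so the functor is guaranteed to outlive the call. A toy illustration of the semantics (names are made up):

// FunctionView just points at the callable; the callable must outlive the call.
int CallNow(rtc::FunctionView<int()> f) {
  return f();  // fine: the call is synchronous, so the lambda still exists
}
// int v = CallNow([] { return 1; });  // v == 1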

Thread::Send() contains a lot of detail. First it checks whether the calling thread is this thread: if so, it executes the handler directly; otherwise it builds a QueuedTask and posts it to this thread's queue:

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  RTC_DCHECK(!IsQuitting());
  if (IsQuitting())
    return;

  // Sent messages are sent to the MessageHandler directly, in the context
  // of "thread", like Win32 SendMessage. If in the right context,
  // call the handler directly.
  // Build the message
  Message msg;
  msg.posted_from = posted_from;
  msg.phandler = phandler;
  msg.message_id = id;
  msg.pdata = pdata;

  // If the calling thread is this thread, just execute
  // the handler directly.
  if (IsCurrent()) {
    msg.phandler->OnMessage(&msg);
    return;
  }

  AssertBlockingIsAllowedOnCurrentThread();

  // Get the calling thread
  Thread* current_thread = Thread::Current();

#if RTC_DCHECK_IS_ON
  if (current_thread) {
    RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
    ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
                                                             this);
  }
#endif

  // Perhaps down the line we can get rid of this workaround and always require
  // current_thread to be valid when Send() is called.
  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());

  bool ready = false;
  // Wrap the msg into a QueuedTask and post it to this thread's queue
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done = done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));

  if (current_thread) {
    // The calling thread is an rtc::Thread
    bool waited = false;
    crit_.Enter();
    while (!ready) {
      // The task hasn't finished; block until it completes and we are woken up
      crit_.Leave(); 
      current_thread->socketserver()->Wait(kForever, false); // epoll wait
      waited = true;
      crit_.Enter();
    }
    crit_.Leave();

    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.

    if (waited) {
      // The socketserver has two usage scenarios:
      // 1. As here: after posting a blocking task to another thread, Wait()
      //    until the task has finished.
      // 2. In Thread::Get(): when no message is available, Wait() forever
      //    until a WakeUp() arrives.
      // Regarding scenario 2, the comment above describes a problem: thread A
      // posts a blocking task to B and waits for the result; meanwhile some
      // other thread posts a task to A's queue, which performs a WakeUp().
      // The ready-check loop above may consume that WakeUp(), so once the
      // Send completes, A could sit in Wait() and fail to process the posted
      // task in time. (That is what the comment claims; judging from the
      // code the problem doesn't actually seem to arise, because Get()
      // checks whether the queue is empty before waiting again.)
      current_thread->socketserver()->WakeUp();
    }
  } else {
    // The calling thread is not an rtc::Thread
    done_event->Wait(rtc::Event::kForever);
  }
}

Following the main flow into PostTask(): internally it just calls Post(), which wraps the msg once more into an rtc::Message, pushes it onto this thread's task queue messages_, and then calls WakeUpSocketServer() to wake this thread so it can consume the task. That completes the posting side.

void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
  // Though Post takes MessageData by raw pointer (last parameter), it still
  // takes it with ownership.
  Post(RTC_FROM_HERE, &queued_task_handler_,
       /*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}

void Thread::Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata,
                  bool time_sensitive) {
  RTC_DCHECK(!time_sensitive);
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    // Wrap the QueuedTask into an rtc::Message and put it on the message queue
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    messages_.push_back(msg);
  }
  // Wake this thread to consume the task
  WakeUpSocketServer();
}
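
PostTask() is also usable directly for fire-and-forget work. As a hedged sketch (the lambdas are illustrative): webrtc::ToQueuedTask accepts a main closure plus an optional second closure that runs when the task object is destroyed, which Send() above exploits as its completion signal:

thread->PostTask(webrtc::ToQueuedTask(
    [] { /* the actual work; runs on |thread| */ },
    [] { /* runs when the QueuedTask is destroyed, i.e. after the work */ }));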

2.3.2 Consuming the task

Now for the consuming side: once the thread starts, it runs Run(), which calls ProcessMessages():

void Thread::Run() {
  ProcessMessages(kForever);
}

bool Thread::ProcessMessages(int cmsLoop) {
  // Using ProcessMessages with a custom clock for testing and a time greater
  // than 0 doesn't work, since it's not guaranteed to advance the custom
  // clock's time, and may get stuck in an infinite loop.
  RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
             cmsLoop == kForever);
  int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
  int cmsNext = cmsLoop;

  while (true) {
#if defined(WEBRTC_MAC)
    ScopedAutoReleasePool pool;
#endif
    Message msg;
    // Fetch a message
    if (!Get(&msg, cmsNext))
      return !IsQuitting();
    // Dispatch and handle it
    Dispatch(&msg);

    if (cmsLoop != kForever) {
      cmsNext = static_cast<int>(TimeUntil(msEnd));
      if (cmsNext < 0)
        return true;
    }
  }
}

Thread::Get() fetches messages from the message queue messages_. It looks long, but the core is only a few steps:

  • traverse delayed_messages_ and move any expired messages into the messages_ queue

  • take a message out of messages_ and return it; if there is none, break out and block until woken up

bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
  // Return and clear peek if present
  // Always return the peek if it exists so there is Peek/Get symmetry

  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          // Scan delayed messages for ones that have expired
          while (!delayed_messages_.empty()) {
            if (msCurrent < delayed_messages_.top().run_time_ms_) {
              cmsDelayNext =
                  TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
              break;
            }
            // Move the expired message into the messages_ queue
            messages_.push_back(delayed_messages_.top().msg_);
            delayed_messages_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        // Take a task from messages_
        if (messages_.empty()) {
          break;
        } else {
          *pmsg = messages_.front();
          messages_.pop_front();
        }
      }  // crit_ is released here.

      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    if (IsQuitting())
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }

    {
      // Block until a message arrives
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

Once a message has been fetched, Dispatch() is called to execute it; at that point the task has run to completion:

void Thread::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
               pmsg->posted_from.file_name(), "src_func",
               pmsg->posted_from.function_name());
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);  // execute the task
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}

The handler invoked is QueuedTaskHandler::OnMessage(), which restores the QueuedTask from msg->pdata, runs it, and then disposes of it according to the ownership rule below:

void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
  RTC_DCHECK(msg);
  // Take out the data and restore the QueuedTask
  auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
  std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
  // Thread expects handler to own Message::pdata when OnMessage is called
  // Since MessageData is no longer needed, delete it.
  delete data;

  // Run the task, then release if it kept ownership
  // QueuedTask interface uses Run return value to communicate who owns the
  // task. false means QueuedTask took the ownership.
  if (!task->Run())
    task.release();
}
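
The return value of Run() is an ownership handshake: true lets the caller's unique_ptr delete the task, false means the task has kept (or transferred) ownership of itself. A hedged sketch of a task that uses this to re-post itself (illustrative code, not from the WebRTC tree; assumes Thread's TaskQueueBase-style PostDelayedTask()):

class RepeatingTask : public webrtc::QueuedTask {
 public:
  explicit RepeatingTask(rtc::Thread* thread) : thread_(thread) {}
  bool Run() override {
    // ... periodic work ...
    // Re-wrap the raw pointer and post ourselves again in one second.
    thread_->PostDelayedTask(absl::WrapUnique(this), /*milliseconds=*/1000);
    return false;  // false: ownership was transferred by the re-post above.
  }

 private:
  rtc::Thread* const thread_;
};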

2.3.3 Returning the result

The execution above involves two very important lines: task->Run() and task.release():


task->Run() runs the function that was originally posted and stores its return value in result. The flow is as follows.

First, the two lambdas used to construct the QueuedTask:

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  
  ......
  
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },      // <= Run()
      [this, &ready, current_thread, done = done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));
      
  ......
}

The phandler in msg.phandler->OnMessage(&msg) is the FunctorMessageHandler constructed in InvokeInternal(), so the overridden OnMessage() is what runs:

void Thread::InvokeInternal(const Location& posted_from,
                            rtc::FunctionView<void()> functor) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
               "src_func", posted_from.function_name());

  class FunctorMessageHandler : public MessageHandler {
   public:
    explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
        : functor_(functor) {}
    void OnMessage(Message* msg) override { functor_(); } // <= OnMessage()

   private:
    rtc::FunctionView<void()> functor_;
  } handler(functor);
  Send(posted_from, &handler);
}

functor_() is the lambda that Invoke() wrapped around result = functor();, so once it has run, the return value is stored in result:

  template <
      class ReturnT,
      typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result = functor(); });
    return result;
  }

In the Send() function that posted the task, the calling thread enters current_thread->socketserver()->Wait(kForever, false) right after PostTask() and blocks.

The wake-up happens when the finished task is destroyed: for tasks built with ToQueuedTask(), Run() returns true, so task.release() is not actually taken; instead the unique_ptr destroys the task, and its destructor runs the second (cleanup) lambda, which wakes the calling thread stuck in Wait() so it can return result:

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
......


  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());

  bool ready = false;
  // Wrap the msg into a QueuedTask and post it to this thread's queue
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done = done_event.get()] { // <= runs when the task is destroyed
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp(); // wake up the posting thread
        } else {
          done->Set();
        }
      }));

  if (current_thread) {
    // The calling thread is an rtc::Thread
    bool waited = false;
    crit_.Enter();
    while (!ready) {
      // The task hasn't finished; block until it completes and we are woken up
      crit_.Leave(); 
      current_thread->socketserver()->Wait(kForever, false); // epoll wait
      waited = true;
      crit_.Enter();
    }
    crit_.Leave();

    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.

    if (waited) {
      // The socketserver has two usage scenarios:
      // 1. As here: after posting a blocking task to another thread, Wait()
      //    until the task has finished.
      // 2. In Thread::Get(): when no message is available, Wait() forever
      //    until a WakeUp() arrives.
      // Regarding scenario 2, the comment above describes a problem: thread A
      // posts a blocking task to B and waits for the result; meanwhile some
      // other thread posts a task to A's queue, which performs a WakeUp().
      // The ready-check loop above may consume that WakeUp(), so once the
      // Send completes, A could sit in Wait() and fail to process the posted
      // task in time. (That is what the comment claims; judging from the
      // code the problem doesn't actually seem to arise, because Get()
      // checks whether the queue is empty before waiting again.)
      current_thread->socketserver()->WakeUp();
    }
  } else {
    // The calling thread is not an rtc::Thread
    done_event->Wait(rtc::Event::kForever);
  }
}

2.4 Asynchronous proxy classes for the API classes

signaling_thread_ is the thread that handles API-level tasks. The public interface of an API class goes through a proxy class, which posts each internal call to signaling_thread_.

For example, the proxy class of PeerConnection is defined in api/peer_connection_proxy.h as follows:

BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
              AddTrack,
              rtc::scoped_refptr<MediaStreamTrackInterface>,
              const std::vector<std::string>&)
.....
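
The application never touches the concrete PeerConnection directly; the factory wraps it in this proxy before returning it, roughly like this (simplified from PeerConnectionFactory; pc stands for the concrete PeerConnection instance, and here primary is the signaling thread while secondary is the network thread):

rtc::scoped_refptr<PeerConnectionInterface> proxied =
    PeerConnectionProxy::Create(signaling_thread(), network_thread(), pc);
// Every call on |proxied| is now marshalled to the right thread automatically.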

Such a proxy class is generated by a set of macros in api/proxy.h, and their usage is documented in detail in that file:

//
// Example usage:
// 1. Define the interface class
// class TestInterface : public rtc::RefCountInterface {
//  public:
//   std::string FooA() = 0;
//   std::string FooB(bool arg1) const = 0;
//   std::string FooC(bool arg1) = 0;
//  };
//
// Note that return types can not be a const reference.
// 2. Derive from the interface and implement it
// class Test : public TestInterface {
// ... implementation of the interface.
// };
//
// 3. Declare the proxy class via the macros
// BEGIN_PROXY_MAP(Test)
//   PROXY_PRIMARY_THREAD_DESTRUCTOR()
//   PROXY_METHOD0(std::string, FooA)
//   PROXY_CONSTMETHOD1(std::string, FooB, arg1)
//   PROXY_SECONDARY_METHOD1(std::string, FooC, arg1)
// END_PROXY_MAP()
//
// Where the destructor and first two methods are invoked on the primary
// thread, and the third is invoked on the secondary thread.
//
// The proxy can be created using
// 4. Create the proxy object
//   TestProxy::Create(Thread* signaling_thread, Thread* worker_thread,
//                     TestInterface*).
//

Expanding PeerConnection's proxy macros yields:

template <class INTERNAL_CLASS>
class PeerConnectionProxyWithInternal;
typedef PeerConnectionProxyWithInternal<PeerConnectionInterface>
    PeerConnectionProxy;

// The proxy class inherits the PeerConnectionInterface interface
template <class INTERNAL_CLASS>
class PeerConnectionProxyWithInternal : public PeerConnectionInterface {
 protected:
  typedef PeerConnectionInterface C;

 public:
  const INTERNAL_CLASS* internal() const { return c_; }
  INTERNAL_CLASS* internal() { return c_; }

 protected:
  PeerConnectionProxyWithInternal(rtc::Thread* primary_thread,
                                  rtc::Thread* secondary_thread,
                                  INTERNAL_CLASS* PeerConnection)
      : primary_thread_(primary_thread),
        secondary_thread_(secondary_thread),
        c_(PeerConnection) {}

 private:
  // The two target threads passed in at creation
  mutable rtc::Thread* primary_thread_;
  mutable rtc::Thread* secondary_thread_;

 protected:
  ~PeerConnectionProxyWithInternal() {
    MethodCall<PeerConnectionProxyWithInternal, void> call(
        this, &PeerConnectionProxyWithInternal::DestroyInternal);
    call.Marshal(
        ::rtc::Location(__FUNCTION__,
                        "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h",
                        28),
        destructor_thread());
  }

 private:
  void DestroyInternal() { c_ = nullptr; }

  rtc::scoped_refptr<INTERNAL_CLASS> c_;

 public:
  // Static factory method that creates the proxy object
  static rtc::scoped_refptr<PeerConnectionProxyWithInternal> Create(
      rtc::Thread* primary_thread,
      rtc::Thread* secondary_thread,
      INTERNAL_CLASS* PeerConnection) {
    return new rtc::RefCountedObject<PeerConnectionProxyWithInternal>(
        primary_thread, secondary_thread, PeerConnection);
  }

 private:
  rtc::Thread* destructor_thread() const { return primary_thread_; }

 public:
  // The proxy method generated for local_streams
  rtc::scoped_refptr<StreamCollectionInterface> local_streams() override {
    MethodCall<C, rtc::scoped_refptr<StreamCollectionInterface>> call(
        c_, &C::local_streams);
    return call.Marshal(
        ::rtc::Location(__FUNCTION__,
                        "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h",
                        30),
        primary_thread_);
  }

When the proxy is created, two target threads, primary_thread_ and secondary_thread_, are passed in for the proxy methods to use.

The proxy method for local_streams() is generated by the macro PROXY_METHOD0(): it builds a MethodCall<> call that wraps the target function local_streams(), then marshals the call onto primary_thread_ via call.Marshal().

Marshal() looks like this:

  R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
    if (t->IsCurrent()) {
      // Already on the target thread: invoke directly
      Invoke(std::index_sequence_for<Args...>());
    } else {
      // Otherwise PostTask and block until done
      t->PostTask(std::unique_ptr<QueuedTask>(this));
      event_.Wait(rtc::Event::kForever);
    }
    return r_.moved_result();
  }
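
MethodCall is itself a QueuedTask, which is why it can be handed to PostTask() above. Its Run() is the counterpart of the event_.Wait() in Marshal() (reconstructed from api/proxy.h; simplified):

  bool Run() override {
    Invoke(std::index_sequence_for<Args...>());
    event_.Set();   // wakes the caller blocked in Marshal()
    return false;   // the MethodCall lives on the caller's stack; don't delete
  }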

