licode學習之erizo篇--Worker


erizo使用Worker來管理Task,每個Task是一個函數片段,其執行完全由Worker來接管。這次主要學習Worker的結構定義和實現機制

 1 class Worker : public std::enable_shared_from_this<Worker> {
 2  public:
 3   typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
 4   typedef std::function<void()> Task; //返回值為空的函數為Task  5   typedef std::function<bool()> ScheduledTask;  //返回值為bool的函數為scheduletask  6 
 7   explicit Worker(std::weak_ptr<Scheduler> scheduler,
 8                   std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());
 9   ~Worker();
10 
11   virtual void task(Task f); //設置運行Task 12 
13   virtual void start(); //開啟線程 14   virtual void start(std::shared_ptr<std::promise<void>> start_promise);  //同步方式開啟線程,即確定線程啟動后,調用者才會返回 15   virtual void close(); //停止線程 16 
17   virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta); //定時器,可以取消的定時器 18   virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id); //取消定時器 19 
20   virtual void scheduleEvery(ScheduledTask f, duration period);  //循環定時器,f返回false時停止執行 21 
22  private:
23   void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
24   std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);
25 
26  protected:
27   int next_scheduled_ = 0;
28 
29  private:
30   std::weak_ptr<Scheduler> scheduler_;
31   std::shared_ptr<Clock> clock_;
32   boost::asio::io_service service_;
33   asio_worker service_worker_;
34   boost::thread_group group_;
35   std::atomic<bool> closed_;
36 };

先來研究一下構造函數

Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
    : scheduler_{scheduler}, //構造定時器變量
      clock_{the_clock}, //構造自己的時鍾變量
      service_{}, //構造io_service對象
      service_worker_{new asio_worker::element_type(service_)}, //為io_service注入service_worker,避免直接退出
      closed_{false} { //線程控制變量,為true時,退出
}

在構造函數中,使用boost io service,構建了基本的線程架構。

研究一下start

void Worker::start() {
  auto promise = std::make_shared<std::promise<void>>();
  start(promise);
}

void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
  auto this_ptr = shared_from_this();
  auto worker = [this_ptr, start_promise] { //創建一個代理worker,准備好執行過程
    start_promise->set_value(); //通知promise,線程已經就緒 if (!this_ptr->closed_) { //如果不是close狀態 return this_ptr->service_.run(); //調用io service的run函數,開啟線程過程
    }
    return size_t(0);
  };
  group_.add_thread(new boost::thread(worker)); //實際創建線程,並將之添加到group里面
}

提供了兩個start函數,無參的直接創建一個promise,調用有參數的,並且並未使用get_future.wait進行流程控制。

這里就可以理解為:無參數start,不關心線程是否創建成功,如果在線程沒有創建成功時,調用了task函數,則可能出現異常錯誤。有參數的start為外面控制線程存在,優化處理流程提供了可能。

看一下close函數

void Worker::close() {
  closed_ = true;
  service_worker_.reset();
  group_.join_all();
  service_.stop();
}

在close函數中,將變量設為true,並調用各種析構。

從start和close的控制可以看到,Worker的start和close只能成功調用一次,如果close以后,再start,線程就會直接退出了。這應該也是一個小弊端了。

看task函數

void Worker::task(Task f) {
  service_.post(f);
}

task調用io service的post,直接投遞任務。也就是說task實際上就是一個基礎的處理,讓任務進行投遞然后執行。

std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) {
  auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
  auto id = std::make_shared<ScheduledTaskReference>();
  if (auto scheduler = scheduler_.lock()) {
    scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,給scheduler的scheduleFromNow做參數傳遞
      this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,生成的task功能是如果id->isCancelled為true,直接返回,否則執行f,並將這個task傳遞給自己的任務投遞方法
        {
          if (id->isCancelled()) {
            return;
          }
        }
        f();
      }));
    }), delta_ms);
  }
  return id;
}

 在scheduleFromNow里面,調用了scheduler的scheduleFromNow方法,在scheduler里面,進入定時線程,到達時間后,執行Worker的task方法,投遞一個Task,進而激活Worker,運行Task內容,完成定時執行操作。

停止定時器:

void Worker::unschedule(std::shared_ptr<ScheduledTaskReference> id) {
  id->cancel();
}

設置cancel,停止定時器

循環定時器:

void Worker::scheduleEvery(ScheduledTask f, duration period) {
  scheduleEvery(f, period, period);
}

void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay) {
  time_point start = clock_->now();
  std::shared_ptr<Clock> clock = clock_;

  scheduleFromNow(safeTask([start, period, next_delay, f, clock](std::shared_ptr<Worker> this_ptr) {
    if (f()) {
      duration clock_skew = clock->now() - start - next_delay;
      duration delay = period - clock_skew;
      this_ptr->scheduleEvery(f, period, delay); //循環遞歸調用
    }
  }), std::max(next_delay, duration{0}));
}

循環定時器,使用遞歸調用,來實現循環定時器,其停止依托於ScheduledTask的返回值為false,停止循環。

總結:

Worker提供了基本的線程管理,提供了Task執行機制以及定時器控制機制,但是沒有提供資源重復使用的機制,即多次調用close,start的機制

使用例子:

#include <thread/Worker.h>

void sample_worker_task()
{
    std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2);
    std::shared_ptr<erizo::Worker> worker_no_promise = std::make_shared<erizo::Worker>(schedule);
    worker_no_promise->start();
    //maybe, you should do a sleep.
    int index = 0;
    worker_no_promise->task([index] {//here may corruption because the thread maybe not ok when task push
        printf("index is %d\n", index);
    });
    index++;
    worker_no_promise->task([index] {
        printf("second task index is %d\n", index);
    });
    //worker_no_promise->close();
    //worker_no_promise->reset();

    std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule);
    std::promise pro = std::make_shared<std::promise<void>>();
    worker_with_promise->start(pro);
    pro.get_future().wait();//wait util the thread is ok
    index++;
    worker_with_promise->task([index] {//here is safe
        printf("index is %d\n", index);
    });
    index++;
    worker_with_promise->task([index] {
        printf("second task index is %d\n", index);
    });

    //worker_with_promise->close();
    //worker_with_promise->reset();
    //schedule->reset();
}

void sample_woker_schedule_task()
{
    std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2);
    std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule);
    std::promise pro = std::make_shared<std::promise<void>>();
    worker_with_promise->start(pro);
    pro.get_future().wait();//wait util the thread is ok
    
    int index = 0;
    std::shared_ptr<Clock> clk = std::make_shared<SteadyClock>();
    printf("now is %u", clk->now().time_point);
    worker_with_promise->scheduleFromNow([clk] {//schedule once after 10 secondes
        printf("delay 10, now is %u", clk->now().time_point);
    }, 10000);

    worker_with_promise->scheduleEvery([index, clk] {//schedule multi per 3 secondes
        printf("schedule evevry index:%d, now:%u", index, clk->now().time_point);
        index++;
        if (index > 3)
        {
            return true;
        }
        return false;
    }, 3000);
    //worker_with_promise->close();
    //worker_with_promise->reset();
    //schedule->reset();
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM