erizo使用Worker來管理Task,每個Task是一個函數片段,其執行完全由Worker來接管。這次主要學習Worker的結構定義和實現機制
1 class Worker : public std::enable_shared_from_this<Worker> { 2 public: 3 typedef std::unique_ptr<boost::asio::io_service::work> asio_worker; 4 typedef std::function<void()> Task; //返回值為空的函數為Task 5 typedef std::function<bool()> ScheduledTask; //返回值為bool的函數為scheduletask 6 7 explicit Worker(std::weak_ptr<Scheduler> scheduler, 8 std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>()); 9 ~Worker(); 10 11 virtual void task(Task f); //設置運行Task 12 13 virtual void start(); //開啟線程 14 virtual void start(std::shared_ptr<std::promise<void>> start_promise); //同步方式開啟線程,即確定線程啟動后,調用者才會返回 15 virtual void close(); //停止線程 16 17 virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta); //定時器,可以取消的定時器 18 virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id); //取消定時器 19 20 virtual void scheduleEvery(ScheduledTask f, duration period); //循環定時器,f返回false時停止執行 21 22 private: 23 void scheduleEvery(ScheduledTask f, duration period, duration next_delay); 24 std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f); 25 26 protected: 27 int next_scheduled_ = 0; 28 29 private: 30 std::weak_ptr<Scheduler> scheduler_; 31 std::shared_ptr<Clock> clock_; 32 boost::asio::io_service service_; 33 asio_worker service_worker_; 34 boost::thread_group group_; 35 std::atomic<bool> closed_; 36 };
先來研究一下構造函數
Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock) : scheduler_{scheduler}, //構造定時器變量 clock_{the_clock}, //構造自己的時鍾變量 service_{}, //構造io_service對象 service_worker_{new asio_worker::element_type(service_)}, //為io_service注入service_worker,避免直接退出 closed_{false} { //線程控制變量,為true時,退出 }
在構造函數中,使用boost io service,構建了基本的線程架構。
研究一下start
void Worker::start() { auto promise = std::make_shared<std::promise<void>>(); start(promise); } void Worker::start(std::shared_ptr<std::promise<void>> start_promise) { auto this_ptr = shared_from_this(); auto worker = [this_ptr, start_promise] { //創建一個代理worker,准備好執行過程 start_promise->set_value(); //通知promise,線程已經就緒 if (!this_ptr->closed_) { //如果不是close狀態 return this_ptr->service_.run(); //調用io service的run函數,開啟線程過程 } return size_t(0); }; group_.add_thread(new boost::thread(worker)); //實際創建線程,並將之添加到group里面 }
提供了兩個start函數,無參的直接創建一個promise,調用有參數的,並且並未使用get_future.wait進行流程控制。
這里就可以理解為:無參數start,不關心線程是否創建成功,如果在線程沒有創建成功時,調用了task函數,則可能出現異常錯誤。有參數的start為外面控制線程存在,優化處理流程提供了可能。
看一下close函數
void Worker::close() { closed_ = true; service_worker_.reset(); group_.join_all(); service_.stop(); }
在close函數中,將變量設為true,並調用各種析構。
從start和close的控制可以看到,Worker的start和close只能成功調用一次,如果close以后,再start,線程就會直接退出了。這應該也是一個小弊端了。
看task函數
void Worker::task(Task f) { service_.post(f); }
task調用io service的post,直接投遞任務。也就是說task實際上就是一個基礎的處理,讓任務進行投遞然后執行。
std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) { auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta); auto id = std::make_shared<ScheduledTaskReference>(); if (auto scheduler = scheduler_.lock()) { scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,給scheduler的scheduleFromNow做參數傳遞 this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一個task,生成的task功能是如果id->isCancelled為true,直接返回,否則執行f,並將這個task傳遞給自己的任務投遞方法 { if (id->isCancelled()) { return; } } f(); })); }), delta_ms); } return id; }
在scheduleFromNow里面,調用了scheduler的scheduleFromNow方法,在scheduler里面,進入定時線程,到達時間后,執行Worker的task方法,投遞一個Task,進而激活Worker,運行Task內容,完成定時執行操作。
停止定時器:
void Worker::unschedule(std::shared_ptr<ScheduledTaskReference> id) { id->cancel(); }
設置cancel,停止定時器
循環定時器:
void Worker::scheduleEvery(ScheduledTask f, duration period) { scheduleEvery(f, period, period); } void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay) { time_point start = clock_->now(); std::shared_ptr<Clock> clock = clock_; scheduleFromNow(safeTask([start, period, next_delay, f, clock](std::shared_ptr<Worker> this_ptr) { if (f()) { duration clock_skew = clock->now() - start - next_delay; duration delay = period - clock_skew; this_ptr->scheduleEvery(f, period, delay); //循環遞歸調用 } }), std::max(next_delay, duration{0})); }
循環定時器,使用遞歸調用,來實現循環定時器,其停止依托於ScheduledTask的返回值為false,停止循環。
總結:
Worker提供了基本的線程管理,提供了Task執行機制以及定時器控制機制,但是沒有提供資源重復使用的機制,即多次調用close,start的機制
使用例子:
#include <thread/Worker.h> void sample_worker_task() { std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2); std::shared_ptr<erizo::Worker> worker_no_promise = std::make_shared<erizo::Worker>(schedule); worker_no_promise->start(); //maybe, you should do a sleep. int index = 0; worker_no_promise->task([index] {//here may corruption because the thread maybe not ok when task push printf("index is %d\n", index); }); index++; worker_no_promise->task([index] { printf("second task index is %d\n", index); }); //worker_no_promise->close(); //worker_no_promise->reset(); std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule); std::promise pro = std::make_shared<std::promise<void>>(); worker_with_promise->start(pro); pro.get_future().wait();//wait util the thread is ok index++; worker_with_promise->task([index] {//here is safe printf("index is %d\n", index); }); index++; worker_with_promise->task([index] { printf("second task index is %d\n", index); }); //worker_with_promise->close(); //worker_with_promise->reset(); //schedule->reset(); } void sample_woker_schedule_task() { std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2); std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule); std::promise pro = std::make_shared<std::promise<void>>(); worker_with_promise->start(pro); pro.get_future().wait();//wait util the thread is ok int index = 0; std::shared_ptr<Clock> clk = std::make_shared<SteadyClock>(); printf("now is %u", clk->now().time_point); worker_with_promise->scheduleFromNow([clk] {//schedule once after 10 secondes printf("delay 10, now is %u", clk->now().time_point); }, 10000); worker_with_promise->scheduleEvery([index, clk] {//schedule multi per 3 secondes printf("schedule evevry index:%d, now:%u", index, clk->now().time_point); index++; if (index > 3) { return true; } return false; }, 3000); //worker_with_promise->close(); //worker_with_promise->reset(); //schedule->reset(); }