4. 多線程
一般情況下,服務端開啟一條線程做io_service::run()工作就足夠了。但是,有些情況下可能會變得很糟糕。
從之前的分析,我們知道異步操作的一個關鍵步驟就是io_service回調我們注冊的handler。現在假設客戶端與服務端建立了四個socket連接,相應的I/O對象分別為socket1, socket2, socket3, socket4,並且假設io_service現在正在處理socket1注冊的handler。如果這個handler處理的過程很長,那么在這期間socket2,socket3,socket4注冊的handler會一直得不到執行,造成不良的使用體驗。
針對這個問題,解決之道只有采用多線程的方法。多線程的用法很簡單,我們只要把線程函數boost::asio::io_service::run和io_service指針綁定好傳給boost::thread類就好了。如下所示:
boost::thread t(boost::bind(&boost::asio::io_service::run, &io)); t.join();
但是,引入多線程又會引入多線程同步的問題,如果這個問題沒解決好,死機就是家常便飯了。幸好,asio給我們提供了strand這個類(當然,也可以使用mutex,但是使用strand會使代碼更加優雅)。下面,簡單介紹下strand這個類。
1) 用法
用法很簡單,首先定義下變量。
boost::asio::io_service::strand strand_(&io); //注意io_service對象地址作為他的參數。
然后在注冊回調函數時,在外面套上一層strand_.wrap()就好了,如下所示:
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this))); timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
這樣的話,這兩個異步操作的回調函數肯定是被順序執行的。
2) 源碼分析
在分析源碼之前,我們看下一個完整的調用堆棧:
我們把不采用strand時的調用堆棧圖拿來比對下
不知道有沒有被嚇一跳,采用strand方式竟然會多出這么多層調用,讓回調的路途看上去如此漫長。
好了,廢話不多說,我們strand的那張調用堆棧圖中尋找strand的蛛絲馬跡。回調的路上這個函數boost::asio::io_service::strand::dispatch,頓時眼前一亮,讓我想起strand類中的dispatch函數。眼尖的朋友可能發現調用堆棧上出現了兩次boost::asio::io_service::strand::dispatch,不要奇怪,這兩次的handler是不一樣的,如下圖。
這是先被調用的
這是后被調用的
下面貼出io_service::strand類:
class io_service::strand { public: explicit strand(boost::asio::io_service& io_service) : service_(boost::asio::use_service< boost::asio::detail::strand_service>(io_service)) { service_.construct(impl_); } ~strand() { } boost::asio::io_service& get_io_service() { return service_.get_io_service(); } /* 這就是第一個出現在回調路上的函數。 這個函數的作用是讓strand執行給定的handler。 還有一點要說的就是,如果當前線程調用了service::run,那么該線程可以直接調用handler。這也是和post的區別之一。我們可以假想下如果回調的路上不是strand::dispatch,而是strand::post,那么我們的線程棧上也不一定會這么長了。*/ template <typename CompletionHandler> BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ()) dispatch(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { // If you get an error on the following line it means that your handler does // not meet the documented type requirements for a CompletionHandler. BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check; detail::async_result_init< CompletionHandler, void ()> init( BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler)); // 注意此處是service_是boost::asio::detail::strand_service類型的哦; // strand_service里面才是真正控制多線程安全的地方。 service_.dispatch(impl_, init.handler); return init.result.get(); } // 和dispatch都有投遞任務的作用。只是post會馬上返回,handler會被某個調用service::run的線程執行。 template <typename CompletionHandler> BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ()) post(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler) { // If you get an error on the following line it means that your handler does // not meet the documented type requirements for a CompletionHandler. BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check; detail::async_result_init< CompletionHandler, void ()> init( BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler)); service_.post(impl_, init.handler); return init.result.get(); } // 這個wrap函數就是上面說起的那個打包函數。有了它,io_service會先調用strand然后再調用handler,相當於在回調的路上設置了一道關卡,通過strand保證線程安全性。 template <typename Handler> #if defined(GENERATING_DOCUMENTATION) unspecified #else detail::wrapped_handler<strand, Handler, detail::is_continuation_if_running> #endif wrap(Handler handler) { return detail::wrapped_handler<io_service::strand, Handler, detail::is_continuation_if_running>(*this, handler); } bool running_in_this_thread() const { return service_.running_in_this_thread(impl_); } private: boost::asio::detail::strand_service& service_; boost::asio::detail::strand_service::implementation_type impl_; };
光這個類是看不出具體實現細節的,相要了解更多實現細節需要分析strand_service這個類。
具體的多線程控制方面,我們可以看下strand_service::strand_impl這個嵌套類
// The underlying implementation of a strand. class strand_impl : public operation { public: strand_impl(); private: // Only this service will have access to the internal values. friend class strand_service; friend struct on_do_complete_exit; friend struct on_dispatch_exit; // 這就是那個用來多線程控制的互斥鎖 // Mutex to protect access to internal data. boost::asio::detail::mutex mutex_; // 用來表示strand是否被某個handler“鎖住”的變量 // Indicates whether the strand is currently "locked" by a handler. This // means that there is a handler upcall in progress, or that the strand // itself has been scheduled in order to invoke some pending handlers. bool locked_; // 哦,等待處理的排隊隊列 // The handlers that are waiting on the strand but should not be run until // after the next time the strand is scheduled. This queue must only be // modified while the mutex is locked. op_queue<operation> waiting_queue_; // 已經獲取鎖並准備運行的handler // The handlers that are ready to be run. Logically speaking, these are the // handlers that hold the strand's lock. The ready queue is only modified // from within the strand and so may be accessed without locking the mutex. op_queue<operation> ready_queue_; };
可以看出mutex_、locked_、waiting_queue_、ready_queue_這四個變量保證了線程安全性。具體實現方法,可以自己去調試下,這里就不細細分析了(其實是肚子餓了,要去吃飯了^^)。
PS: 異步調用+多線程肯定會大大增加你的調試復雜度,加上日志記錄是勢在必行的事情。這里向大家推薦下簡單易用的glog:https://github.com/google/glog