Boost.ASIO簡要分析-4 多線程


4. 多線程

一般情況下,服務端開啟一條線程做io_service::run()工作就足夠了。但是,有些情況下可能會變得很糟糕。

從之前的分析,我們知道異步操作的一個關鍵步驟就是io_service回調我們注冊的handler。現在假設客戶端與服務端建立了四個socket連接,相應的I/O對象分別為socket1, socket2, socket3, socket4,並且假設io_service現在正在處理socket1注冊的handler。如果這個handler處理的過程很長,那么在這期間socket2,socket3,socket4注冊的handler會一直得不到執行,造成不良的使用體驗。

針對這個問題,解決之道只有采用多線程的方法。多線程的用法很簡單,我們只要把線程函數boost::asio::io_service::run和io_service指針綁定好傳給boost::thread類就好了。如下所示:

boost::thread t(boost::bind(&boost::asio::io_service::run, &io));

t.join();

但是,引入多線程又會引入多線程同步的問題,如果這個問題沒解決好,死機就是家常便飯了。幸好,asio給我們提供了strand這個類(當然,也可以使用mutex,但是使用strand會使代碼更加優雅)。下面,簡單介紹下strand這個類。

1) 用法

用法很簡單,首先定義下變量。

boost::asio::io_service::strand strand_(&io); //注意io_service對象地址作為他的參數。

然后在注冊回調函數時,在外面套上一層strand_.wrap()就好了,如下所示:

timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
這樣的話,這兩個異步操作的回調函數肯定是被順序執行的。

2) 源碼分析

在分析源碼之前,我們看下一個完整的調用堆棧:

圖片1

我們把不采用strand時的調用堆棧圖拿來比對下

圖片2

不知道有沒有被嚇一跳,采用strand方式竟然會多出這么多層調用,讓回調的路途看上去如此漫長。

好了,廢話不多說,我們strand的那張調用堆棧圖中尋找strand的蛛絲馬跡。回調的路上這個函數boost::asio::io_service::strand::dispatch,頓時眼前一亮,讓我想起strand類中的dispatch函數。眼尖的朋友可能發現調用堆棧上出現了兩次boost::asio::io_service::strand::dispatch,不要奇怪,這兩次的handler是不一樣的,如下圖。

這是先被調用的

wps47EF.tmp

這是后被調用的

wps47FF.tmp

下面貼出io_service::strand類:

class io_service::strand
{
public:
  explicit strand(boost::asio::io_service& io_service)
    : service_(boost::asio::use_service<
        boost::asio::detail::strand_service>(io_service))
  {
    service_.construct(impl_);
  }

  ~strand()
  {
  }

  boost::asio::io_service& get_io_service()
  {
    return service_.get_io_service();
  }

/* 這就是第一個出現在回調路上的函數。
   這個函數的作用是讓strand執行給定的handler。
   還有一點要說的就是,如果當前線程調用了service::run,那么該線程可以直接調用handler。這也是和post的區別之一。我們可以假想下如果回調的路上不是strand::dispatch,而是strand::post,那么我們的線程棧上也不一定會這么長了。*/
  template <typename CompletionHandler>
  BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ())
  dispatch(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler)
  {
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a CompletionHandler.
    BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;

    detail::async_result_init<
      CompletionHandler, void ()> init(
        BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler));

// 注意此處是service_是boost::asio::detail::strand_service類型的哦;
// strand_service里面才是真正控制多線程安全的地方。
    service_.dispatch(impl_, init.handler);

    return init.result.get();
  }

// 和dispatch都有投遞任務的作用。只是post會馬上返回,handler會被某個調用service::run的線程執行。
  template <typename CompletionHandler>
  BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ())
  post(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler)
  {
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a CompletionHandler.
    BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;

    detail::async_result_init<
      CompletionHandler, void ()> init(
        BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler));

    service_.post(impl_, init.handler);

    return init.result.get();
  }

// 這個wrap函數就是上面說起的那個打包函數。有了它,io_service會先調用strand然后再調用handler,相當於在回調的路上設置了一道關卡,通過strand保證線程安全性。
  template <typename Handler>
#if defined(GENERATING_DOCUMENTATION)
  unspecified
#else
  detail::wrapped_handler<strand, Handler, detail::is_continuation_if_running>
#endif
  wrap(Handler handler)
  {
    return detail::wrapped_handler<io_service::strand, Handler,
        detail::is_continuation_if_running>(*this, handler);
  }

  bool running_in_this_thread() const
  {
    return service_.running_in_this_thread(impl_);
  }

private:
  boost::asio::detail::strand_service& service_;
  boost::asio::detail::strand_service::implementation_type impl_;
};

光這個類是看不出具體實現細節的,相要了解更多實現細節需要分析strand_service這個類。

具體的多線程控制方面,我們可以看下strand_service::strand_impl這個嵌套類

// The underlying implementation of a strand.

class strand_impl

    : public operation

  {

public:

strand_impl();

private:

// Only this service will have access to the internal values.

friend class strand_service;

friend struct on_do_complete_exit;

friend struct on_dispatch_exit;

// 這就是那個用來多線程控制的互斥鎖

// Mutex to protect access to internal data.

boost::asio::detail::mutex mutex_;

// 用來表示strand是否被某個handler“鎖住”的變量

// Indicates whether the strand is currently "locked" by a handler. This

// means that there is a handler upcall in progress, or that the strand

// itself has been scheduled in order to invoke some pending handlers.

bool locked_;

// 哦,等待處理的排隊隊列

// The handlers that are waiting on the strand but should not be run until

// after the next time the strand is scheduled. This queue must only be

// modified while the mutex is locked.

op_queue<operation> waiting_queue_;

// 已經獲取鎖並准備運行的handler

// The handlers that are ready to be run. Logically speaking, these are the

// handlers that hold the strand's lock. The ready queue is only modified

// from within the strand and so may be accessed without locking the mutex.

op_queue<operation> ready_queue_;

  };

可以看出mutex_、locked_、waiting_queue_、ready_queue_這四個變量保證了線程安全性。具體實現方法,可以自己去調試下,這里就不細細分析了(其實是肚子餓了,要去吃飯了^^)。

PS: 異步調用+多線程肯定會大大增加你的調試復雜度,加上日志記錄是勢在必行的事情。這里向大家推薦下簡單易用的glog:https://github.com/google/glog


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM