boost::asio的io_service處理過程


1.主線程定義回調對象

2.調用io object的操作

3.io object會另開線程,定義opertion op來執行操作,同時將回調對象加到op的do_complete上。進行操作

4.完成操作加入完成隊列

5.io_service線程循環從完成隊列取事件,調用其事件對應的回調函數

 

 

 

 

 

Operation

還記得前面我們在分析resolver的實現的時候,挖了一個關於operation的坑?為了不讓自己陷進去,現在來填吧;接下來我們就來看看asio中的各種operation

 

和前面提到過的service的類似,這里的operation也分為兩大系:IOCP EnableDisable系列。這里我們重點關注下圖中紅色部分表示的IOCP Enable系列operation

 


  

OVERLAPPED基類

從上圖可以看到,所有IOCP Enableoperation,其基類都是struct OVERLAPPED結構,該結構是Win32進行交疊IO一個非常重要的結構,用以異步執行過程中的參數傳遞。所有的operation直接從該結構繼承的結果,就是所有operation對象,可以直接作為OVERLAPPED結構在異步函數中進行傳遞。

 

例如在win_iocp_socket_service_base中,為了啟動一個receive的異步操作, start_receive_op函數就直接把傳遞進來的operation指針作為OVERLAPPED結構傳遞給::WSARecv函數,從而發起一個異步服務請求。

void win_iocp_socket_service_base::start_receive_op(

   win_iocp_socket_service_base::base_implementation_type& impl,

   WSABUF* buffers, std::size_t buffer_count,

   socket_base::message_flags flags, bool noop,operation* op)

{

 update_cancellation_thread_id(impl);

 iocp_service_.work_started();

 

 if (noop)

   iocp_service_.on_completion(op);

 else if (!is_open(impl))

   iocp_service_.on_completion(op, boost::asio::error::bad_descriptor);

 else

 {

   DWORD bytes_transferred = 0;

   DWORD recv_flags = flags;

   int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count),

       &bytes_transferred, &recv_flags, op, 0);

   DWORD last_error = ::WSAGetLastError();

   if (last_error == ERROR_NETNAME_DELETED)

     last_error = WSAECONNRESET;

   else if (last_error == ERROR_PORT_UNREACHABLE)

     last_error = WSAECONNREFUSED;

   if (result != 0 && last_error != WSA_IO_PENDING)

     iocp_service_.on_completion(op, last_error, bytes_transferred);

   else

     iocp_service_.on_pending(op);

 }

}

 

執行流程

關於operation對象的創建、傳遞,以及完成handler的執行序列等,使用下圖可以清晰的描述。

 


  

下表反映了Windows環境下,部分的異步請求所對應的服務、win函數、operation等信息:

 

異步請求

服務

start op

Win32函數

對應operation

ip::tcp::socket::async_connect

win_iocp_socket_service

start_connect_op()

::connect

reactive_socket_connect_op

ip::tcp::socket::async_read_some

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_receive

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_write_some

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::socket::async_send

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::acceptor::async_accept

start_accept_op()

::AcceptEx

win_iocp_socket_accept_op

 

 

 

 

 

ip::tcp::resolver::async_resolve

resolver_service

start_resolve_op()

::getaddrinfo

resolve_op

 

 

靜態的do_complete

不知你是否注意到,在operation的類圖中,所有從operation繼承的子類,都定義了一個do_complete()函數,然而該函數聲明為static,這又是為何呢?

 

我們以win_iocp_socket_recv_op為例來進行說明。該類中的do_complete是這樣聲明的:

     staticvoid do_complete(io_service_impl* owner,

         operation* base,

         const boost::system::error_code& result_ec,

         std::size_t bytes_transferred)

 

該類的構造函數,又把此函數地址傳遞給父類win_iocp_operation去初始化父類成員,這兩個類的構造函數分別如下,請注意加粗代碼:

win_iocp_socket_recv_op ::

win_iocp_socket_recv_op(socket_ops::state_type state,

     socket_ops::weak_cancel_token_type cancel_token,

     const MutableBufferSequence& buffers, Handler& handler)

   :operation(&win_iocp_socket_recv_op::do_complete),

         state_(state),

         cancel_token_(cancel_token),

         buffers_(buffers),

         handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))

   {

   }

 

win_iocp_operation ::win_iocp_operation(func_type func)

         : next_(0),

         func_(func)

   {

         reset();

   }

 

至此,我們明白,將do_complete聲明為static,可以方便獲取函數指針,並在父類中進行回調。那么,不僅要問,既然兩個類存在繼承關系,那么為何不將do_complete聲明為虛函數呢?

 

再回頭看看這些類的最頂層基類,就會明白的。最頂層的OVERLAPPED基類,使得將operation對象作為OVERLAPPED對象在異步函數中進行傳遞成為可能;如果將do_complete聲明為虛函數,則多數編譯器會在對象起始位置放置vptr,這樣就改變了內存布局,從而不能再把operation對象直接作為OVERLAPPED對象進行傳遞了。

 

當然,一定要用虛函數的話,也不是不可能,只是在傳遞對象的時候,就需要考慮到vptr的存在,這會有兩個方面的問題:一是進行多態類型轉換時,效率上的損失;二是各家編譯器對vtpr的實現各不相同,跨平台的asio就需要進行多種適配,這無疑又過於煩躁了。於是作者就采取了最為簡單有效的方式——用static函數來進行回調——簡單,就美。

 

win_iocp_io_service的實現

Windows NT環境下(IOCP Enabled),win_iocp_io_service代表着io_service,是整個asio的運轉核心。本節開始來分析該類的實現。

 

從類的命名也可以看出,IOCP是該實現的核心。IOCPIO Completion Port IOCP)在windows上,可以說是效率最高的異步IO模型了,他使用有限的線程,處理盡可能多的並發IO請求。該模型雖說可以應用於各種IO處理,但目前應用較多的還是網絡IO方面。

 

我們都知道,在Window是環境下使用IOCP,基本上需要這樣幾個步驟:

  1. 使用Win函數CreateIoCompletionPort()創建一個完成端口對象;
  2. 創建一個IO對象,如用於listensocket對象;
  3. 再次調用CreateIoCompletionPort()函數,分別在參數中指定第二步創建的IO對象和第一步創建的完成端口對象。由於指定了這兩個參數,這一次的調用,只是告訴系統,后續該IO對象上所有的完成事件,都投遞到綁定的完成端口上。
  4. 創建一個線程或者線程池,用以服務完成端口事件;

所有這些線程調用GetQueuedCompletionStatus()函數等待一個完成端口事件的到來;

  1. 進行異步調用,例如WSASend()等操作。
  2. 在系統執行完異步操作並把事件投遞到端口上,或者客戶自己調用了PostQueuedCompletionStatus()函數,使得在完成端口上等待的一個線程蘇醒,執行后續的服務操作。

 

那么,這些步驟,是如何分散到asio中的呢?來吧,先從完成端口創建開始。

完成端口的創建

如上所述,完成端口的創建,需要調用CreateIoCompletionPort()函數,在win_iocp_io_service的構造函數中,就有這樣的操作:

win_iocp_io_service::win_iocp_io_service(

   boost::asio::io_service& io_service, size_tconcurrency_hint)

 : boost::asio::detail::service_base<win_iocp_io_service>(io_service),

   iocp_(),

   outstanding_work_(0),

   stopped_(0),

   stop_event_posted_(0),

   shutdown_(0),

   dispatch_required_(0)

{

 BOOST_ASIO_HANDLER_TRACKING_INIT;

 

 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,

     static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));

 if (!iocp_.handle)

 {

   DWORD last_error = ::GetLastError();

   boost::system::error_code ec(last_error,

       boost::asio::error::get_system_category());

   boost::asio::detail::throw_error(ec, "iocp");

 }

}

 

win_iocp_io_service的構造函數,負責創建一個完成端口,並把此完成端口對象的句柄交給一個auto_handle進行管理——auto_handle的唯一用途,就是在對象析構時,調用::CloseHandle()windows句柄資源關閉,從而保證不會資源泄露。

 

我們在windows環境下,聲明一個boost::asio::io_service對象,其內部就創建了一個win_iocp_io_service的實例;因此,一個io_service對象就對應着一個完成端口對象——這也就可以解釋,為什么所有的IO Object都需要一個io_service參數了——這樣,大家就好公用外面定義好的完成端口對象。

 

除了io_service對象會創建一個完成端口對象,事實上,在asio中,另外一個service也會創建一個,這就是boost::asio::ip::resolver_service。該類對應的detail實現boost::asio::detail::resolver_service中,有一個數據成員是: io_service,這樣就同樣創建了一個完成端口對象:

    namespace boost {

namespace asio {

namespace detail {

 

class resolver_service_base

{

...

protected:

// Private io_service used for performing asynchronous host resolution.

            scoped_ptr<boost::asio::io_service> work_io_service_;

...

 

至於該完成端口的用途如何,我們在后續部分再來說明——搽,又開始挖坑了。

完成端口的綁定

在創建了io對象后,例如socket,就需要將此對象和完成端口對象綁定起來,以指示操作系統將該io對象上后續所有的完成事件發送到某個完成端口上,該操作同樣是由CreateIoCompletionPort()函數完成,只是所使用的參數不同。

 

win_iocp_io_service中,這個操作由下面的代碼完成——請注意參數的差別:

boost::system::error_code win_iocp_io_service::register_handle(

   HANDLE handle, boost::system::error_code& ec)

{

 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0)

 {

   DWORD last_error = ::GetLastError();

   ec = boost::system::error_code(last_error,

       boost::asio::error::get_system_category());

 }

 else

 {

   ec = boost::system::error_code();

 }

 return ec;

}

 

通過代碼搜索,我們發現函數win_iocp_socket_service_base::do_open()內部調用了register_handle();該函數的作用是打開一個socket(其中調用了socket函數socket()去創建一個socket),也就是說,在打開一個socket后,就把該socket綁定到指定的完成端口上,這樣,后續的事件就會發送到完成端口了。

 

此外還有另外的和assign相關的兩個函數也調用了register_handle(),不再貼出其代碼了。

 

線程函數

IOCP要求至少有一個線程進行服務,也可以有一堆線程;io_service早就為這些線程准備好了服務例程,即io_service::run()函數。

  • 如果應用只打算使用一個線程進行服務,那么在主線程中准備好了異步請求后,調用io_service::run()即可。注意,必須先發起一個異步請求,然后才能調用run()。參考一下run()的實現就會明白。
  • 如果打算用多個線程進行服務,可以創建多個線程,指定io_service::run()作為線程函數即可。一個最簡單的示例是:

void server::run()

{

 // Create a pool of threads to run all of the io_services.

 std::vector<boost::shared_ptr<boost::thread> > threads;

 for (std::size_t i = 0; i < thread_pool_size_; ++i)

 {

boost::shared_ptr<boost::thread>

   thread(

       new boost::thread(

               boost::bind(&boost::asio::io_service::run, &io_service_)

           )

       );

   threads.push_back(thread);

 }

 

 // Wait for all threads in the pool to exit.

 for (std::size_t i = 0; i < threads.size(); ++i)

   threads[i]->join();

}

 

由於io_service::run()又是委托win_iocp_io_service::run()來實現的,我們來看看后者的實現:

size_t win_iocp_io_service::run(boost::system::error_code& ec)

{

 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)

 {

   stop();

   ec = boost::system::error_code();

   return 0;

 }

 

 win_iocp_thread_info this_thread;

 thread_call_stack::context ctx(this, this_thread);

 

 size_t n = 0;

  while (do_one(true, ec))

   if (n != (std::numeric_limits<size_t>::max)())

     ++n;

 return n;

}

 

run()首先檢查是否有需要處理的操作,如果沒有,函數退出;win_iocp_io_service使用outstanding_work_來記錄當前需要處理的任務數。如果該數值不為0,則委托do_one函數繼續處理——asio中,所有的臟活累活都在這里處理了。

 

win_iocp_io_service::do_one函數較長,我們只貼出核心代碼

size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)

{

 for (;;)

 {

   // Try to acquire responsibility for dispatching timers and completed ops.

   if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1

   {

     mutex::scoped_lock lock(dispatch_mutex_);

 

     // Dispatch pending timers and operations.

     op_queue<win_iocp_operation> ops;

     ops.push(completed_ops_);

     timer_queues_.get_ready_timers(ops);

     post_deferred_completions(ops);?#2

     update_timeout();

   }

 

   // Get the next operation from the queue.

   DWORD bytes_transferred = 0;

   dword_ptr_t completion_key = 0;

   LPOVERLAPPED overlapped = 0;

::SetLastError(0);

   BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,

       &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3

   DWORD last_error = ::GetLastError();

 

   if (overlapped)

   {

     win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4

     boost::system::error_code result_ec(last_error,

         boost::asio::error::get_system_category());

 

     // We may have been passed the last_error and bytes_transferred in the

     // OVERLAPPED structure itself.

     if (completion_key == overlapped_contains_result)

     {

       result_ec = boost::system::error_code(static_cast<int>(op->Offset),

           *reinterpret_cast<boost::system::error_category*>(op->Internal));

       bytes_transferred = op->OffsetHigh;

     }

 

     // Otherwise ensure any result has been saved into the OVERLAPPED

     // structure.

     else

     {

       op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());

       op->Offset = result_ec.value();

       op->OffsetHigh = bytes_transferred;

     }

 

     // Dispatch the operation only if ready. The operation may not be ready

     // if the initiating function (e.g. a call to WSARecv) has not yet

     // returned. This is because the initiating function still wants access

     // to the operation's OVERLAPPED structure.

     if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)

     {

       // Ensure the count of outstanding work is decremented on block exit.

       work_finished_on_block_exit on_exit = { this };?#5

       (void)on_exit;?#6

 

       op->complete(*this, result_ec, bytes_transferred);?#7

       ec = boost::system::error_code();

       return 1;

     }

   }

   else if (!ok)

{

...

 

做一下簡要說明:

-  #1變量dispatch_required_記錄了由於資源忙,而沒有成功投遞的操作數;所有這些操作都記錄在隊列completed_ops_中;

-  #2將所有需要投遞的操作,投遞出去;至於什么樣的操作需要投遞,何時投遞,以及為先前會投遞失敗,失敗后如何處理等,我們后續說明——再次挖坑了。

-  #3IOCP的核心操作函數GetQueuedCompletionStatus()出現了。該函數導致線程在完成端口上進行等待,直到超時或者某個完成端口數據包到來。

-  #4注意這里將 OVERLAPPED結構直接轉換為 operation對象。相關內容在前面的operation:OVERLAPPED基類部分已經有說明。

-  #5該變量保證在操作完成,return之后,win_iocp_io_service對象所記錄的任務數outstanding_work_會自動減1——是啊,辛辛苦苦做的事兒,能不記錄下來嘛!

-  #6這一行從功能上講沒有什么特別的用途;不過有了這一行,可以抑制有些編譯器針對 #5所聲明的變量沒有被使用的編譯器警告;

-  #7調用operation對象的complete()函數,從而調用到異步操作所設定的回調函數。具體流程參考operation:執行流程

 

任務投遞

上述的線程函數,會在GetQueuedCompletionStatus()函數上進行等待,直到超時或者有完成端口數據包到來;

 

完成端口數據包,有兩個來源:一個是用戶所請求的異步操作完成,異步服務執行者(這里是操作系統)向該完成端口投遞完成端口數據包;另外一種情況是,用戶自己使用IOCP的另外一個核心函數PostQueuedCompletionStatus()向完成端口投遞數據包;

 

一般的異步操作請求,是不需要用戶自己主動向完成端口投遞數據的,例如async_read, asyn_write等操作;

 

有另外一些操作,由於沒有對應或者作者並沒有采用支持OVERLAPPED IO操作的Win32函數,就需要實現者自己管理完成事件,並進行完成端口數據包的投遞,比如:

  • async_resolve:由於系統沒有提供對應的OVERLAPPED IO操作,需要實現者自己管理,所以其自己進行投遞
  • async_connect:由於作者並沒有采用支持OVERLAPPED IOConnectEx()版本的連接函數,而是采用了標准的socket函數connect()進行連接,所以也需要自己進行投遞

 

另外還有一些io_service提供的操作,例如請求io_service執行代為執行指定handler的操作:

  • dispatch(handler)
  • post(handler)

 

所有這些需要自己投遞完成端口數據包的操作,基本上都是這樣一個投遞流程:

  • 調用win_iocp_io_service::post_immediate_completion(op)
    • 調用work_started()outstanding_work_ 1
    • 調用post_deferred_completion(op)
      • 由於自行管理,主動將op->ready_置為 1,表明op就緒
      • 調用PostQueuedCompletionStatus(op)進行投遞
      • 如果投遞失敗,則把該op放置到等待dispatch的隊列completed_ops_ 中,待下一次do_one()執行時,再次投遞

 

OK,至此,基本分析完了operation的投遞,總數填了一個前面挖下的坑。

Resolver自己的IOCP

前面說過,Resolver自己會創建一個IOCP,為什么會這樣呢?由於Win32下面沒有提供對應於地址解析的overlapped版本的函數,為了實現async_resolve操作,作者自己實現了這樣一個異步服務。在resolver_service內部,有一個io_service數據成員,該數據成員創建了一個IOCP;除此之外,該service內部還啟動一個工作線程來執行io_service::run(),使用此線程來模擬異步服務。

 

使用resolver進行async_resolve的詳細過程如下:

Main Thread (IOCP#1)

 

Resolver Thread (IOCP #2)

 

 

 

1.構建io_service對象, IOCP#1被創建

 

 

 

 

 

2.構建 resolver對象, IOCP#2被創建,同時該resolver持有io_service的引用

 

 

 

 

 

3.發起異步調用:resolver.async_resolve()

 

 

 

 

 

4. resolve_op被創建

 

 

 

 

 

5. Resolver線程啟動,主線程開始等待

 

 

 

 

 

 

 

6.開始運行,激活等待事件,並在 IOCP#2上開始等待

 

 

 

7.線程恢復執行;將op投遞到 IOCP#2

 

 

 

 

 

 

 

8.執行op->do_complete()操作,地址解析完成后,將op再回投給IOCP#1

 

 

 

9. do_one()得到Resolver線程投遞回來的op開始執行op->do_complete()操作,此時回調async_resolve所設置的handler

 

 

 

 

 

10.結束

 

 

 

請注意step8 step9執行同樣一個op->do_complete()函數,為什么操作不一樣呢?看其實現就知道,該函數內部,會判斷執行此函數時的owner,如果owner是主io_service對象,則說明是在主線程中執行,此時進行handler的回調;否則就說明在工作線程中,就去執行真正的地址解析工作;

任務的取消

針對socket上提交的異步請求,可以使用cancel()函數來取消掉該socket上所有沒執行的異步請求。

 

使用該函數,在Windows Vista(不含)以前的版本,有幾點需要注意:

  • 需要定義BOOST_ASIO_ENABLE_CANCELIO來啟用該功能
  • cancel()函數在內部調用Win32函數 CancelIo()
  • 該函數只能取消來自當前線程的異步請求
  • 對於正在執行的異步操作,則要看異步服務提供者是如何實現的了,可能會被取消,也可能不會;

針對這些問題,另外的替代方案是:

  • Window是上定義BOOST_ASIO_DISABLE_IOCP來禁用IOCP,使用老式的reactor模式(及select IO)。
  • 或者使用close()來關閉socket,如此一來所有未被執行的請求則都會被取消掉。

 

windows vista及后續版本中,cancel()函數在內部調用Win32函數 CancelIoEx(),該函數可以取消來自任何線程的異步操作請求,不存在上述問題。

 

需要注意的是,即使異步請求被取消了,所指定的handler也會被執行,只是傳遞的error code 為:boost::asio::error::operation_aborted

win_iocp_socket_service實現

service提供了windows下所有socket相關的功能,是asiowindows環境中一個非常重要的角色,他所提供的函數主要分下面兩類:

  • XXXXX(), async_XXXXX()對某個操作的同步、異步函數接口;主要被上層服務調用;例如connect(), async_connect()等;
  • start_XXXXX_op() :windows發出對應的異步操作請求,例如WSARecv

 

不過關於該類的實現前面已經做了較多的涉及,不再單獨詳述了。

前攝器模式

現在我們已經把Windows環境下所涉及到的關鍵部件都涉及到了,此刻我們再回過頭來,從高層俯瞰一下asio的架構,看看是否會有不一樣的感受呢?事實上,asio的文檔用下面的圖來說明asio的高層架構——前攝器模式,我們也從這個圖開始:

boost.asio 學習筆記05——asio的windows實現 - 地線 - 別再讓虛假消息充斥這本已混亂的世界

 

 

呵呵,其實這張圖,從一開始就是為了表達Proactor(前攝器)模式的,基本上它和asio沒半毛錢關系,只不過asio既支持同步IO,又支持異步IO,在異步IO部分,是參照Proactor模式來實現的。下面我們來分別說說asio的前攝器模式中的各個組件:

  • Initiator,(初始化器?)中文名還真不清楚,不過其實就是客戶代碼,甚至可以簡單理解到main函數,所有的是是非非,都是從這兒開始的。
  • Asynchronous Operation,定義的一系列異步操作,對應到Windows平台,諸如AcceptExWSASendWSARecv等函數。在asio中,這些函數封裝在win_iocp_socket_service resolver_service類中。
  • Asynchronous Operation Processor,異步操作處理器,他負責執行異步操作,並在操作完成后,把完成事件投放到完成事件隊列上。

僅僅從asio使用者的角度看,高層的stream_socket_service類就是一個這樣的處理器,因為從tcp::socket發送的異步操作都是由其完成處理的。但是從真正實現的角度看,這樣的異步操作在Windwos上,大部分由操作系統負責完成的,另外一部分由asio自己負責處理,如resolver_service,因此Windows操作系統和asio一起組成了異步操作處理器。

  • Completion Handler,完成事件處理器;這是由用戶自己定義的一個函數(函數對象),在異步操作完成后,由前攝器負責把該函數調用起來。

Windows平台上,io_service類通過win_iocp_io_service類的do_one()函數把每個異步操作所設定的completion handler調用起來。

  • Completion Event Queue完成事件隊列,存儲由異步操作處理器發送過來的完成事件,當異步事件多路分離器將其中一個事件取走之后,該事件從隊列中刪除;

Windows上,asio的完成事件隊列由操作系統負責管理;只不過該隊列中的數據有兩個來源,一個是Windows內部,另外一個是asio中自己PostQueuedCompletionStatus()所提交的事件。

  • Asynchronous Event Demultiplexer,異步事件多路分離器,他的作用就是在完成事件隊列上等待,一旦有事件到來,他就把該事件返回給調用者。

Windows上,這一功能也是由操作系統完成的,具體來說,我認為是由GetQueuedCompletionStatus完成的,而該函數時由do_one()調用的,因此,從高層的角度來看,這個分離器,也是由io_service負責的。

  • Proactor,前攝器,負責調度異步事件多路分離器去干活,並在異步操作完成時,調度所對應的Completion Handler。在asio中,這部分由io_service來做,具體Windows就是win_iocp_io_service

 

基於上述信息,我們重繪practor模式架構圖如下:

 

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM