1.主線程定義回調對象
2.調用io object的操作
3.io object會另開線程,定義opertion op來執行操作,同時將回調對象加到op的do_complete上。進行操作
4.完成操作加入完成隊列
5.io_service線程循環從完成隊列取事件,調用其事件對應的回調函數
Operation
還記得前面我們在分析resolver的實現的時候,挖了一個關於operation的坑?為了不讓自己陷進去,現在來填吧;接下來我們就來看看asio中的各種operation。
和前面提到過的service的類似,這里的operation也分為兩大系:IOCP Enable和Disable系列。這里我們重點關注下圖中紅色部分表示的IOCP Enable系列operation。

OVERLAPPED基類
從上圖可以看到,所有IOCP Enable的operation,其基類都是struct OVERLAPPED結構,該結構是Win32進行交疊IO一個非常重要的結構,用以異步執行過程中的參數傳遞。所有的operation直接從該結構繼承的結果,就是所有operation對象,可以直接作為OVERLAPPED結構在異步函數中進行傳遞。
例如在win_iocp_socket_service_base中,為了啟動一個receive的異步操作, start_receive_op函數就直接把傳遞進來的operation指針作為OVERLAPPED結構傳遞給::WSARecv函數,從而發起一個異步服務請求。
void win_iocp_socket_service_base::start_receive_op( win_iocp_socket_service_base::base_implementation_type& impl, WSABUF* buffers, std::size_t buffer_count, socket_base::message_flags flags, bool noop,operation* op) { update_cancellation_thread_id(impl); iocp_service_.work_started();
if (noop) iocp_service_.on_completion(op); else if (!is_open(impl)) iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); else { DWORD bytes_transferred = 0; DWORD recv_flags = flags; int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count), &bytes_transferred, &recv_flags, op, 0); DWORD last_error = ::WSAGetLastError(); if (last_error == ERROR_NETNAME_DELETED) last_error = WSAECONNRESET; else if (last_error == ERROR_PORT_UNREACHABLE) last_error = WSAECONNREFUSED; if (result != 0 && last_error != WSA_IO_PENDING) iocp_service_.on_completion(op, last_error, bytes_transferred); else iocp_service_.on_pending(op); } } |
執行流程
關於operation對象的創建、傳遞,以及完成handler的執行序列等,使用下圖可以清晰的描述。

下表反映了Windows環境下,部分的異步請求所對應的服務、win函數、operation等信息:
異步請求 |
服務 |
start op |
Win32函數 |
對應operation |
ip::tcp::socket::async_connect |
win_iocp_socket_service |
start_connect_op() |
::connect |
reactive_socket_connect_op |
ip::tcp::socket::async_read_some |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_receive |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_write_some |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::socket::async_send |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::acceptor::async_accept |
start_accept_op() |
::AcceptEx |
win_iocp_socket_accept_op |
|
|
|
|
|
|
ip::tcp::resolver::async_resolve |
resolver_service |
start_resolve_op() |
::getaddrinfo |
resolve_op |
靜態的do_complete
不知你是否注意到,在operation的類圖中,所有從operation繼承的子類,都定義了一個do_complete()函數,然而該函數聲明為static,這又是為何呢?
我們以win_iocp_socket_recv_op為例來進行說明。該類中的do_complete是這樣聲明的:
staticvoid do_complete(io_service_impl* owner,
operation* base,
const boost::system::error_code& result_ec,
std::size_t bytes_transferred)
該類的構造函數,又把此函數地址傳遞給父類win_iocp_operation去初始化父類成員,這兩個類的構造函數分別如下,請注意加粗代碼:
win_iocp_socket_recv_op ::
win_iocp_socket_recv_op(socket_ops::state_type state,
socket_ops::weak_cancel_token_type cancel_token,
const MutableBufferSequence& buffers, Handler& handler)
:operation(&win_iocp_socket_recv_op::do_complete),
state_(state),
cancel_token_(cancel_token),
buffers_(buffers),
handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))
{
}
win_iocp_operation ::win_iocp_operation(func_type func)
: next_(0),
func_(func)
{
reset();
}
至此,我們明白,將do_complete聲明為static,可以方便獲取函數指針,並在父類中進行回調。那么,不僅要問,既然兩個類存在繼承關系,那么為何不將do_complete聲明為虛函數呢?
再回頭看看這些類的最頂層基類,就會明白的。最頂層的OVERLAPPED基類,使得將operation對象作為OVERLAPPED對象在異步函數中進行傳遞成為可能;如果將do_complete聲明為虛函數,則多數編譯器會在對象起始位置放置vptr,這樣就改變了內存布局,從而不能再把operation對象直接作為OVERLAPPED對象進行傳遞了。
當然,一定要用虛函數的話,也不是不可能,只是在傳遞對象的時候,就需要考慮到vptr的存在,這會有兩個方面的問題:一是進行多態類型轉換時,效率上的損失;二是各家編譯器對vtpr的實現各不相同,跨平台的asio就需要進行多種適配,這無疑又過於煩躁了。於是作者就采取了最為簡單有效的方式——用static函數來進行回調——簡單,就美。
win_iocp_io_service的實現
在Windows NT環境下(IOCP Enabled),win_iocp_io_service代表着io_service,是整個asio的運轉核心。本節開始來分析該類的實現。
從類的命名也可以看出,IOCP是該實現的核心。IOCP(IO Completion Port, IOCP)在windows上,可以說是效率最高的異步IO模型了,他使用有限的線程,處理盡可能多的並發IO請求。該模型雖說可以應用於各種IO處理,但目前應用較多的還是網絡IO方面。
我們都知道,在Window是環境下使用IOCP,基本上需要這樣幾個步驟:
- 使用Win函數CreateIoCompletionPort()創建一個完成端口對象;
- 創建一個IO對象,如用於listen的socket對象;
- 再次調用CreateIoCompletionPort()函數,分別在參數中指定第二步創建的IO對象和第一步創建的完成端口對象。由於指定了這兩個參數,這一次的調用,只是告訴系統,后續該IO對象上所有的完成事件,都投遞到綁定的完成端口上。
- 創建一個線程或者線程池,用以服務完成端口事件;
所有這些線程調用GetQueuedCompletionStatus()函數等待一個完成端口事件的到來;
- 進行異步調用,例如WSASend()等操作。
- 在系統執行完異步操作並把事件投遞到端口上,或者客戶自己調用了PostQueuedCompletionStatus()函數,使得在完成端口上等待的一個線程蘇醒,執行后續的服務操作。
那么,這些步驟,是如何分散到asio中的呢?來吧,先從完成端口創建開始。
完成端口的創建
如上所述,完成端口的創建,需要調用CreateIoCompletionPort()函數,在win_iocp_io_service的構造函數中,就有這樣的操作:
win_iocp_io_service::win_iocp_io_service( boost::asio::io_service& io_service, size_tconcurrency_hint) : boost::asio::detail::service_base<win_iocp_io_service>(io_service), iocp_(), outstanding_work_(0), stopped_(0), stop_event_posted_(0), shutdown_(0), dispatch_required_(0) { BOOST_ASIO_HANDLER_TRACKING_INIT;
iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0)))); if (!iocp_.handle) { DWORD last_error = ::GetLastError(); boost::system::error_code ec(last_error, boost::asio::error::get_system_category()); boost::asio::detail::throw_error(ec, "iocp"); } } |
win_iocp_io_service的構造函數,負責創建一個完成端口,並把此完成端口對象的句柄交給一個auto_handle進行管理——auto_handle的唯一用途,就是在對象析構時,調用::CloseHandle()把windows句柄資源關閉,從而保證不會資源泄露。
我們在windows環境下,聲明一個boost::asio::io_service對象,其內部就創建了一個win_iocp_io_service的實例;因此,一個io_service對象就對應着一個完成端口對象——這也就可以解釋,為什么所有的IO Object都需要一個io_service參數了——這樣,大家就好公用外面定義好的完成端口對象。
除了io_service對象會創建一個完成端口對象,事實上,在asio中,另外一個service也會創建一個,這就是boost::asio::ip::resolver_service。該類對應的detail實現boost::asio::detail::resolver_service中,有一個數據成員是: io_service,這樣就同樣創建了一個完成端口對象:
namespace boost {
namespace asio {
namespace detail {
class resolver_service_base
{
...
protected:
// Private io_service used for performing asynchronous host resolution.
scoped_ptr<boost::asio::io_service> work_io_service_;
...
至於該完成端口的用途如何,我們在后續部分再來說明——搽,又開始挖坑了。
完成端口的綁定
在創建了io對象后,例如socket,就需要將此對象和完成端口對象綁定起來,以指示操作系統將該io對象上后續所有的完成事件發送到某個完成端口上,該操作同樣是由CreateIoCompletionPort()函數完成,只是所使用的參數不同。
在win_iocp_io_service中,這個操作由下面的代碼完成——請注意參數的差別:
boost::system::error_code win_iocp_io_service::register_handle( HANDLE handle, boost::system::error_code& ec) { if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0) { DWORD last_error = ::GetLastError(); ec = boost::system::error_code(last_error, boost::asio::error::get_system_category()); } else { ec = boost::system::error_code(); } return ec; } |
通過代碼搜索,我們發現函數win_iocp_socket_service_base::do_open()內部調用了register_handle();該函數的作用是打開一個socket(其中調用了socket函數socket()去創建一個socket),也就是說,在打開一個socket后,就把該socket綁定到指定的完成端口上,這樣,后續的事件就會發送到完成端口了。
此外還有另外的和assign相關的兩個函數也調用了register_handle(),不再貼出其代碼了。
線程函數
IOCP要求至少有一個線程進行服務,也可以有一堆線程;io_service早就為這些線程准備好了服務例程,即io_service::run()函數。
- 如果應用只打算使用一個線程進行服務,那么在主線程中准備好了異步請求后,調用io_service::run()即可。注意,必須先發起一個異步請求,然后才能調用run()。參考一下run()的實現就會明白。
- 如果打算用多個線程進行服務,可以創建多個線程,指定io_service::run()作為線程函數即可。一個最簡單的示例是:
void server::run() { // Create a pool of threads to run all of the io_services. std::vector<boost::shared_ptr<boost::thread> > threads; for (std::size_t i = 0; i < thread_pool_size_; ++i) { boost::shared_ptr<boost::thread> thread( new boost::thread( boost::bind(&boost::asio::io_service::run, &io_service_) ) ); threads.push_back(thread); }
// Wait for all threads in the pool to exit. for (std::size_t i = 0; i < threads.size(); ++i) threads[i]->join(); } |
由於io_service::run()又是委托win_iocp_io_service::run()來實現的,我們來看看后者的實現:
size_t win_iocp_io_service::run(boost::system::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { stop(); ec = boost::system::error_code(); return 0; }
win_iocp_thread_info this_thread; thread_call_stack::context ctx(this, this_thread);
size_t n = 0; while (do_one(true, ec)) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } |
run()首先檢查是否有需要處理的操作,如果沒有,函數退出;win_iocp_io_service使用outstanding_work_來記錄當前需要處理的任務數。如果該數值不為0,則委托do_one函數繼續處理——asio中,所有的臟活累活都在這里處理了。
win_iocp_io_service::do_one函數較長,我們只貼出核心代碼
size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec) { for (;;) { // Try to acquire responsibility for dispatching timers and completed ops. if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1 { mutex::scoped_lock lock(dispatch_mutex_);
// Dispatch pending timers and operations. op_queue<win_iocp_operation> ops; ops.push(completed_ops_); timer_queues_.get_ready_timers(ops); post_deferred_completions(ops);?#2 update_timeout(); }
// Get the next operation from the queue. DWORD bytes_transferred = 0; dword_ptr_t completion_key = 0; LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3 DWORD last_error = ::GetLastError();
if (overlapped) { win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4 boost::system::error_code result_ec(last_error, boost::asio::error::get_system_category());
// We may have been passed the last_error and bytes_transferred in the // OVERLAPPED structure itself. if (completion_key == overlapped_contains_result) { result_ec = boost::system::error_code(static_cast<int>(op->Offset), *reinterpret_cast<boost::system::error_category*>(op->Internal)); bytes_transferred = op->OffsetHigh; }
// Otherwise ensure any result has been saved into the OVERLAPPED // structure. else { op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category()); op->Offset = result_ec.value(); op->OffsetHigh = bytes_transferred; }
// Dispatch the operation only if ready. The operation may not be ready // if the initiating function (e.g. a call to WSARecv) has not yet // returned. This is because the initiating function still wants access // to the operation's OVERLAPPED structure. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) { // Ensure the count of outstanding work is decremented on block exit. work_finished_on_block_exit on_exit = { this };?#5 (void)on_exit;?#6
op->complete(*this, result_ec, bytes_transferred);?#7 ec = boost::system::error_code(); return 1; } } else if (!ok) { ... |
做一下簡要說明:
- #1:變量dispatch_required_記錄了由於資源忙,而沒有成功投遞的操作數;所有這些操作都記錄在隊列completed_ops_中;
- #2:將所有需要投遞的操作,投遞出去;至於什么樣的操作需要投遞,何時投遞,以及為先前會投遞失敗,失敗后如何處理等,我們后續說明——再次挖坑了。
- #3:IOCP的核心操作函數GetQueuedCompletionStatus()出現了。該函數導致線程在完成端口上進行等待,直到超時或者某個完成端口數據包到來。
- #4:注意這里將 OVERLAPPED結構直接轉換為 operation對象。相關內容在前面的operation:OVERLAPPED基類部分已經有說明。
- #5:該變量保證在操作完成,return之后,win_iocp_io_service對象所記錄的任務數outstanding_work_會自動減1——是啊,辛辛苦苦做的事兒,能不記錄下來嘛!
- #6:這一行從功能上講沒有什么特別的用途;不過有了這一行,可以抑制有些編譯器針對 #5所聲明的變量沒有被使用的編譯器警告;
- #7:調用operation對象的complete()函數,從而調用到異步操作所設定的回調函數。具體流程參考operation:執行流程。
任務投遞
上述的線程函數,會在GetQueuedCompletionStatus()函數上進行等待,直到超時或者有完成端口數據包到來;
完成端口數據包,有兩個來源:一個是用戶所請求的異步操作完成,異步服務執行者(這里是操作系統)向該完成端口投遞完成端口數據包;另外一種情況是,用戶自己使用IOCP的另外一個核心函數PostQueuedCompletionStatus()向完成端口投遞數據包;
一般的異步操作請求,是不需要用戶自己主動向完成端口投遞數據的,例如async_read, asyn_write等操作;
有另外一些操作,由於沒有對應或者作者並沒有采用支持OVERLAPPED IO操作的Win32函數,就需要實現者自己管理完成事件,並進行完成端口數據包的投遞,比如:
- async_resolve:由於系統沒有提供對應的OVERLAPPED IO操作,需要實現者自己管理,所以其自己進行投遞
- async_connect:由於作者並沒有采用支持OVERLAPPED IO的ConnectEx()版本的連接函數,而是采用了標准的socket函數connect()進行連接,所以也需要自己進行投遞
另外還有一些io_service提供的操作,例如請求io_service執行代為執行指定handler的操作:
- dispatch(handler)
- post(handler)
所有這些需要自己投遞完成端口數據包的操作,基本上都是這樣一個投遞流程:
- 調用win_iocp_io_service::post_immediate_completion(op)
- 調用work_started()給outstanding_work_加 1
- 調用post_deferred_completion(op)
- 由於自行管理,主動將op->ready_置為 1,表明op就緒
- 調用PostQueuedCompletionStatus(op)進行投遞
- 如果投遞失敗,則把該op放置到等待dispatch的隊列completed_ops_ 中,待下一次do_one()執行時,再次投遞
OK,至此,基本分析完了operation的投遞,總數填了一個前面挖下的坑。
Resolver自己的IOCP
前面說過,Resolver自己會創建一個IOCP,為什么會這樣呢?由於Win32下面沒有提供對應於地址解析的overlapped版本的函數,為了實現async_resolve操作,作者自己實現了這樣一個異步服務。在resolver_service內部,有一個io_service數據成員,該數據成員創建了一個IOCP;除此之外,該service內部還啟動一個工作線程來執行io_service::run(),使用此線程來模擬異步服務。
使用resolver進行async_resolve的詳細過程如下:
Main Thread (IOCP#1) |
Resolver Thread (IOCP #2) |
|
|
|
|
1.構建主io_service對象, IOCP#1被創建 |
|
|
|
|
|
2.構建 resolver對象, IOCP#2被創建,同時該resolver持有主io_service的引用 |
|
|
|
|
|
3.發起異步調用:resolver.async_resolve() |
|
|
|
|
|
4. resolve_op被創建 |
|
|
|
|
|
5. Resolver線程啟動,主線程開始等待 |
|
|
|
|
|
|
|
6.開始運行,激活等待事件,並在 IOCP#2上開始等待 |
|
|
|
7.線程恢復執行;將op投遞到 IOCP#2 |
|
|
|
|
|
|
|
8.執行op->do_complete()操作,地址解析完成后,將op再回投給IOCP#1 |
|
|
|
9. do_one()得到從Resolver線程投遞回來的op,開始執行op->do_complete()操作,此時回調async_resolve所設置的handler |
|
|
|
|
|
10.結束 |
|
|
請注意step8和 step9,執行同樣一個op->do_complete()函數,為什么操作不一樣呢?看其實現就知道,該函數內部,會判斷執行此函數時的owner,如果owner是主io_service對象,則說明是在主線程中執行,此時進行handler的回調;否則就說明在工作線程中,就去執行真正的地址解析工作;
任務的取消
針對socket上提交的異步請求,可以使用cancel()函數來取消掉該socket上所有沒執行的異步請求。
使用該函數,在Windows Vista(不含)以前的版本,有幾點需要注意:
- 需要定義BOOST_ASIO_ENABLE_CANCELIO來啟用該功能
- cancel()函數在內部調用Win32函數 CancelIo()。
- 該函數只能取消來自當前線程的異步請求
- 對於正在執行的異步操作,則要看異步服務提供者是如何實現的了,可能會被取消,也可能不會;
針對這些問題,另外的替代方案是:
- 在Window是上定義BOOST_ASIO_DISABLE_IOCP來禁用IOCP,使用老式的reactor模式(及select IO)。
- 或者使用close()來關閉socket,如此一來所有未被執行的請求則都會被取消掉。
在windows vista及后續版本中,cancel()函數在內部調用Win32函數 CancelIoEx(),該函數可以取消來自任何線程的異步操作請求,不存在上述問題。
需要注意的是,即使異步請求被取消了,所指定的handler也會被執行,只是傳遞的error code 為:boost::asio::error::operation_aborted。
win_iocp_socket_service實現
該service提供了windows下所有socket相關的功能,是asio在windows環境中一個非常重要的角色,他所提供的函數主要分下面兩類:
- XXXXX(), async_XXXXX():對某個操作的同步、異步函數接口;主要被上層服務調用;例如connect(), async_connect()等;
- start_XXXXX_op() :向windows發出對應的異步操作請求,例如WSARecv;
不過關於該類的實現前面已經做了較多的涉及,不再單獨詳述了。
前攝器模式
現在我們已經把Windows環境下所涉及到的關鍵部件都涉及到了,此刻我們再回過頭來,從高層俯瞰一下asio的架構,看看是否會有不一樣的感受呢?事實上,asio的文檔用下面的圖來說明asio的高層架構——前攝器模式,我們也從這個圖開始:
呵呵,其實這張圖,從一開始就是為了表達Proactor(前攝器)模式的,基本上它和asio沒半毛錢關系,只不過asio既支持同步IO,又支持異步IO,在異步IO部分,是參照Proactor模式來實現的。下面我們來分別說說asio的前攝器模式中的各個組件:
- Initiator,(初始化器?)中文名還真不清楚,不過其實就是客戶代碼,甚至可以簡單理解到main函數,所有的是是非非,都是從這兒開始的。
- Asynchronous Operation,定義的一系列異步操作,對應到Windows平台,諸如AcceptEx,WSASend,WSARecv等函數。在asio中,這些函數封裝在win_iocp_socket_service, resolver_service類中。
- Asynchronous Operation Processor,異步操作處理器,他負責執行異步操作,並在操作完成后,把完成事件投放到完成事件隊列上。
僅僅從asio使用者的角度看,高層的stream_socket_service類就是一個這樣的處理器,因為從tcp::socket發送的異步操作都是由其完成處理的。但是從真正實現的角度看,這樣的異步操作在Windwos上,大部分由操作系統負責完成的,另外一部分由asio自己負責處理,如resolver_service,因此Windows操作系統和asio一起組成了異步操作處理器。
- Completion Handler,完成事件處理器;這是由用戶自己定義的一個函數(函數對象),在異步操作完成后,由前攝器負責把該函數調用起來。
在Windows平台上,io_service類通過win_iocp_io_service類的do_one()函數把每個異步操作所設定的completion handler調用起來。
- Completion Event Queue,完成事件隊列,存儲由異步操作處理器發送過來的完成事件,當異步事件多路分離器將其中一個事件取走之后,該事件從隊列中刪除;
在Windows上,asio的完成事件隊列由操作系統負責管理;只不過該隊列中的數據有兩個來源,一個是Windows內部,另外一個是asio中自己PostQueuedCompletionStatus()所提交的事件。
- Asynchronous Event Demultiplexer,異步事件多路分離器,他的作用就是在完成事件隊列上等待,一旦有事件到來,他就把該事件返回給調用者。
在Windows上,這一功能也是由操作系統完成的,具體來說,我認為是由GetQueuedCompletionStatus完成的,而該函數時由do_one()調用的,因此,從高層的角度來看,這個分離器,也是由io_service負責的。
- Proactor,前攝器,負責調度異步事件多路分離器去干活,並在異步操作完成時,調度所對應的Completion Handler。在asio中,這部分由io_service來做,具體Windows就是win_iocp_io_service。
基於上述信息,我們重繪practor模式架構圖如下: