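Preface
This article uses asio 1.76.0, obtained from GitHub. Older versions have an important structure called io_service, which newer versions renamed to io_context. Below, an asynchronous example built with C++14 is used to analyze the threading model of boost.asio. The code structure is fairly complex and time is limited, so this is not a detailed analysis, only an overview of the overall structure.
Main Text
asio's threading model and asynchronous call flow are shown in the figure below.
The program is built around an io_context. Under it is a scheduler object, which holds a task queue (op_queue_) and an epoll fd. Calling io_context.run() starts a loop that competes to consume the task queue. The queue holds two kinds of tasks: an epoll task, which performs one epoll_wait, and callback tasks, which run a callback function. An asynchronous call essentially adds an event for an fd to the reactor and saves the callback in the data attached to that event. When epoll_wait() reports the event, the callback for that fd is taken out and put on the task queue (op_queue_), where different threads compete to consume it.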
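The two kinds of queue entries described above can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions, not asio code: ToyScheduler, make_ready() and the Entry struct are hypothetical names, and the "reactor" is a plain vector standing in for epoll_wait().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of the scheduler described above (not asio code).
// One queue holds two kinds of entries: a single "reactor task" marker and
// ordinary completion handlers.
class ToyScheduler {
  struct Entry {
    bool is_reactor_task;           // true for the single reactor marker
    std::function<void()> handler;  // empty for the reactor marker
  };

public:
  using Handler = std::function<void()>;

  ToyScheduler() { queue_.push_back(Entry{true, nullptr}); }

  // Simulates an fd becoming ready: the handler is handed out by the next poll.
  void make_ready(Handler h) { ready_.push_back(std::move(h)); }

  // Runs handlers until a poll finds nothing ready; returns how many ran.
  std::size_t run() {
    std::size_t executed = 0;
    while (!queue_.empty()) {
      Entry e = std::move(queue_.front());
      queue_.pop_front();
      if (e.is_reactor_task) {
        // Stand-in for epoll_wait(): move every ready handler onto the queue.
        const bool idle = ready_.empty();
        for (auto& h : ready_) queue_.push_back(Entry{false, std::move(h)});
        ready_.clear();
        // Put the marker back behind the handlers so polling happens again.
        queue_.push_back(Entry{true, nullptr});
        if (idle) return executed;  // a real reactor would block here instead
      } else {
        e.handler();  // callback task: just run it
        ++executed;
      }
    }
    return executed;
  }

private:
  std::deque<Entry> queue_;
  std::vector<Handler> ready_;
};
```

A handler may call make_ready() again (the way do_read() and do_write() chain in the echo server); the new handler is picked up the next time the marker reaches the front of the queue.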
Let's walk through this flow with the official example, taken from asio/src/examples/cpp14/echo/async_tcp_echo_server.cpp:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
using asio::ip::tcp;
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(data_, max_length),
[this, self](std::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(length);
}
});
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
asio::async_write(socket_, asio::buffer(data_, length),
[this, self](std::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server
{
public:
server(asio::io_context& io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
socket_(io_context)
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_,
[this](std::error_code ec)
{
if (!ec)
{
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
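The code is long. The focus here is on io_context.run() and acceptor_.async_accept, in order to analyze the threading model and the asynchronous mechanism. main() creates an io_context and a server. The server creates an acceptor and a socket, then issues an asynchronous async_accept that registers a lambda. When an accept event occurs, the accept is performed to create the client socket, and the lambda callback is then invoked to handle the newly accepted socket. After that, io_context.run() runs a loop that waits for events.
The program's call graph is shown below (original image).
As highlighted in yellow in the figure, there are three important parts:
The first part is the creation of the acceptor and its registration with epoll for monitoring. The acceptor is constructed in the server's constructor: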
class server
{
public:
server(asio::io_context& io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)), // construct the acceptor
socket_(io_context)
{
do_accept();
}
private:
......
tcp::acceptor acceptor_;
tcp::socket socket_;
};
tcp::acceptor is actually a typedef; what gets constructed is class basic_socket_acceptor<tcp>. Besides the familiar bind and listen, the important call here is open:
// /usr/local/include/asio/basic_socket_acceptor.hpp
template <typename ExecutionContext>
basic_socket_acceptor(ExecutionContext& context,
const endpoint_type& endpoint, bool reuse_addr = true,
typename constraint<
is_convertible<ExecutionContext&, execution_context&>::value
>::type = 0)
: impl_(0, 0, context)
{
asio::error_code ec;
const protocol_type protocol = endpoint.protocol();
impl_.get_service().open(impl_.get_implementation(), protocol, ec); //open
asio::detail::throw_error(ec, "open");
if (reuse_addr)
{
impl_.get_service().set_option(impl_.get_implementation(),
socket_base::reuse_address(true), ec);
asio::detail::throw_error(ec, "set_option");
}
impl_.get_service().bind(impl_.get_implementation(), endpoint, ec);// bind
asio::detail::throw_error(ec, "bind");
impl_.get_service().listen(impl_.get_implementation(), // listen
socket_base::max_listen_connections, ec);
asio::detail::throw_error(ec, "listen");
}
open() is a wrapper around do_open(), which exists for cross-platform support. It creates a socket and adds the corresponding epoll event for it.
// Open a new socket implementation.
asio::error_code open(implementation_type& impl,
const protocol_type& protocol, asio::error_code& ec)
{
if (!do_open(impl, protocol.family(),
protocol.type(), protocol.protocol(), ec))
impl.protocol_ = protocol;
return ec;
}
Below is do_open(). socket_holder creates the server-side socket, and register_descriptor() registers that socket with the epoll fd belonging to this io_context.
asio::error_code reactive_socket_service_base::do_open(
reactive_socket_service_base::base_implementation_type& impl,
int af, int type, int protocol, asio::error_code& ec)
{
if (is_open(impl))
{
ec = asio::error::already_open;
return ec;
}
socket_holder sock(socket_ops::socket(af, type, protocol, ec)); // create the socket
if (sock.get() == invalid_socket)
return ec;
// register with epoll
if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
{
ec = asio::error_code(err,
asio::error::get_system_category());
return ec;
}
impl.socket_ = sock.release();
switch (type)
{
case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
default: impl.state_ = 0; break;
}
ec = asio::error_code();
return ec;
}
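At this point the server socket has been fully created, registered with epoll, bound, and put into the listening state.
The second important part is how the asynchronous async_accept() runs.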
class server
{
......
private:
void do_accept()
{
acceptor_.async_accept(socket_, // issue the asynchronous call
[this](std::error_code ec)
{
if (!ec)
{
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
Calling async_accept() on the acceptor created in the first part leads to a template function that looks quite complex:
template <typename Protocol1, typename Executor1,
ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
AcceptHandler ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
ASIO_INITFN_AUTO_RESULT_TYPE(AcceptHandler,
void (asio::error_code))
async_accept(basic_socket<Protocol1, Executor1>& peer,
ASIO_MOVE_ARG(AcceptHandler) handler
ASIO_DEFAULT_COMPLETION_TOKEN(executor_type),
typename constraint<
is_convertible<Protocol, Protocol1>::value
>::type = 0)
{
return async_initiate<AcceptHandler, void (asio::error_code)>(
initiate_async_accept(this), handler,
&peer, static_cast<endpoint_type*>(0));
}
Don't worry too much about the declaration; look at what async_initiate() does:
template <typename CompletionToken,
ASIO_COMPLETION_SIGNATURE Signature,
typename Initiation, typename... Args>
inline typename constraint<
!detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
ASIO_MOVE_ARG(Args)... args)
{
async_completion<CompletionToken, Signature> completion(token);
ASIO_MOVE_CAST(Initiation)(initiation)(
ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
Signature))(completion.completion_handler),
ASIO_MOVE_CAST(Args)(args)...);
return completion.result.get();
}
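This is an even more complex template function, but its body is only two or three statements. It calls operator() of the template parameter Initiation, which here is class initiate_async_accept. The corresponding operator() is: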
template <typename AcceptHandler, typename Protocol1, typename Executor1>
void operator()(ASIO_MOVE_ARG(AcceptHandler) handler,
basic_socket<Protocol1, Executor1>* peer,
endpoint_type* peer_endpoint) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a AcceptHandler.
ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;
detail::non_const_lvalue<AcceptHandler> handler2(handler);
self_->impl_.get_service().async_accept(
self_->impl_.get_implementation(), *peer, peer_endpoint,
handler2.value, self_->impl_.get_executor());
}
The core is the single call self_->impl_.get_service().async_accept(...). impl_.get_service() returns a class reactive_socket_service, as can be seen in basic_socket_acceptor:
class basic_socket_acceptor
: public socket_base
{
...
detail::io_object_impl<detail::reactive_socket_service<Protocol>, Executor> impl_;
}
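The chain traced so far (async_accept, then async_initiate, then the initiation object's operator(), then the service) can be modelled in a few lines. This is a rough sketch only: toy_async_initiate, ToyService and ToyAcceptor are hypothetical names, completion tokens are reduced to plain callables, and the toy service invokes the handler immediately instead of deferring it to the reactor.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for async_initiate(): it only forwards the handler
// and the extra arguments to the initiation object.
template <typename Initiation, typename Handler, typename... Args>
void toy_async_initiate(Initiation&& initiation, Handler&& handler,
                        Args&&... args) {
  std::forward<Initiation>(initiation)(std::forward<Handler>(handler),
                                       std::forward<Args>(args)...);
}

// Hypothetical service; in asio this role is played by reactive_socket_service.
struct ToyService {
  std::string last_call;

  template <typename Handler>
  void async_accept(int* peer, Handler& handler) {
    last_call = "async_accept";
    *peer = 42;  // pretend a connection was accepted
    handler(0);  // simplification: the real service defers this to the reactor
  }
};

// Hypothetical acceptor with a nested initiation functor, named after
// initiate_async_accept in the excerpt above.
class ToyAcceptor {
  struct initiate_async_accept {
    ToyAcceptor* self_;

    template <typename Handler>
    void operator()(Handler&& handler, int* peer) const {
      // Keep a non-const lvalue copy of the handler, then hand it to the service.
      typename std::decay<Handler>::type handler2(std::forward<Handler>(handler));
      self_->service_.async_accept(peer, handler2);
    }
  };

public:
  explicit ToyAcceptor(ToyService& service) : service_(service) {}

  template <typename Handler>
  void async_accept(int& peer, Handler&& handler) {
    toy_async_initiate(initiate_async_accept{this},
                       std::forward<Handler>(handler), &peer);
  }

private:
  ToyService& service_;
};
```

The acceptor itself contains almost no logic; all it does is route the handler to the service that owns the actual implementation.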
The implementation of async_accept(..) in that service is shown below. It looks long, but the key point is the final call to start_accept_op():
// Start an asynchronous accept. The peer and peer_endpoint objects must be
// valid until the accept's handler is invoked.
template <typename Socket, typename Handler, typename IoExecutor>
void async_accept(implementation_type& impl, Socket& peer,
endpoint_type* peer_endpoint, Handler& handler, const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);
// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_accept_op<Socket, Protocol, Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
peer, impl.protocol_, peer_endpoint, handler, io_ex);
ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_accept"));
start_accept_op(impl, p.p, is_continuation, peer.is_open()); // start the accept operation
p.v = p.p = 0;
}
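The step "allocate and construct an operation to wrap the handler" is a form of type erasure: the queue only ever sees a pointer to a common base, while the concrete handler type lives in a derived template. A minimal sketch of that idea follows. ToyOperation, ToyAcceptOp and make_accept_op are hypothetical names, and plain new/delete replaces asio's handler-aware allocation.

```cpp
#include <cassert>
#include <utility>

// Hypothetical base class: stores a plain function pointer rather than using
// virtual functions, so a queue can hold operations of any handler type.
class ToyOperation {
public:
  // Runs the wrapped handler and destroys the operation object.
  void complete(int ec) { func_(this, ec); }

protected:
  using func_type = void (*)(ToyOperation*, int);
  explicit ToyOperation(func_type f) : func_(f) {}
  ~ToyOperation() = default;  // only destroyed through the derived type

private:
  func_type func_;
};

// Concrete operation: remembers the handler until completion time.
template <typename Handler>
class ToyAcceptOp : public ToyOperation {
public:
  explicit ToyAcceptOp(Handler h)
      : ToyOperation(&ToyAcceptOp::do_complete), handler_(std::move(h)) {}

private:
  static void do_complete(ToyOperation* base, int ec) {
    ToyAcceptOp* op = static_cast<ToyAcceptOp*>(base);
    Handler handler(std::move(op->handler_));  // move the handler out first
    delete op;                                 // release the op before the upcall
    handler(ec);                               // then invoke the user's callback
  }

  Handler handler_;
};

// Helper that deduces the handler type and returns the erased pointer.
template <typename Handler>
ToyOperation* make_accept_op(Handler h) {
  return new ToyAcceptOp<Handler>(std::move(h));
}
```

Freeing the operation before calling the handler means the handler can start a new asynchronous operation without the old one still occupying memory.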
start_accept_op() calls start_op():
void reactive_socket_service_base::start_accept_op(
reactive_socket_service_base::base_implementation_type& impl,
reactor_op* op, bool is_continuation, bool peer_is_open)
{
if (!peer_is_open)
start_op(impl, reactor::read_op, op, is_continuation, true, false);
else
{
op->ec_ = asio::error::already_open;
reactor_.post_immediate_completion(op, is_continuation);
}
}
start_op() in turn calls reactor_.start_op():
void reactive_socket_service_base::start_op(
reactive_socket_service_base::base_implementation_type& impl,
int op_type, reactor_op* op, bool is_continuation,
bool is_non_blocking, bool noop)
{
if (!noop)
{
if ((impl.state_ & socket_ops::non_blocking)
|| socket_ops::set_internal_non_blocking(
impl.socket_, impl.state_, true, op->ec_))
{
reactor_.start_op(op_type, impl.socket_,
impl.reactor_data_, op, is_continuation, is_non_blocking); // call the reactor's start_op()
return;
}
}
reactor_.post_immediate_completion(op, is_continuation);
}
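reactor_.start_op() is long; the key parts are excerpted below. Depending on the op_type passed in (read_op = 0, write_op = 1, connect_op = 1, except_op = 2), it uses epoll_ctl with EPOLL_CTL_MOD to adjust the events being monitored, then pushes the operation (which wraps the callback passed in) onto the queue at index op_type in the current descriptor's op_queue_ array, to be run when the event fires.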
void epoll_reactor::start_op(int op_type, socket_type descriptor,
epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
...
if (op_type == write_op)
{
if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
{
epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_ | EPOLLOUT;
ev.data.ptr = descriptor_data;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
{
descriptor_data->registered_events_ |= ev.events;
}
else
{
op->ec_ = asio::error_code(errno,
asio::error::get_system_category());
scheduler_.post_immediate_completion(op, is_continuation);
return;
}
}
}
...
descriptor_data->op_queue_[op_type].push(op);
scheduler_.work_started();
}
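Taken as a whole, this section yields roughly the following relationship: acceptor -> reactive_socket_service -> reactor
The third part is how io_context.run() performs epoll and handles events.
Starting from main, io_context.run() is called: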
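The per-descriptor bookkeeping used by start_op() (one queue of pending operations per op_type) can be sketched as follows. This is a simplified model: ToyDescriptorState and the kReadable/kWritable/kError flags are hypothetical stand-ins for the descriptor state and the EPOLL* bits, max_ops is added only to size the array, and every queued operation for a ready event is drained without the retry logic a real reactor needs.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Operation indices as listed in the text; max_ops only sizes the array.
enum op_types { read_op = 0, write_op = 1, connect_op = 1, except_op = 2, max_ops = 3 };

// Hypothetical readiness flags standing in for EPOLLIN / EPOLLOUT / EPOLLPRI.
constexpr std::uint32_t kReadable = 1u;
constexpr std::uint32_t kWritable = 2u;
constexpr std::uint32_t kError = 4u;

// Hypothetical per-descriptor record: one queue of pending operations per type.
struct ToyDescriptorState {
  using Op = std::function<void()>;
  std::array<std::queue<Op>, max_ops> op_queue_;

  // Mirrors the final step of start_op(): remember the operation by type.
  void start_op(int op_type, Op op) { op_queue_[op_type].push(std::move(op)); }

  // Returns the operations that became runnable for the given ready events.
  std::vector<Op> perform_io(std::uint32_t ready_events) {
    static const std::uint32_t flag[max_ops] = {kReadable, kWritable, kError};
    std::vector<Op> runnable;
    for (int j = max_ops - 1; j >= 0; --j) {
      if ((ready_events & flag[j]) == 0) continue;
      while (!op_queue_[j].empty()) {
        runnable.push_back(std::move(op_queue_[j].front()));
        op_queue_[j].pop();
      }
    }
    return runnable;
  }
};
```

Because connect_op shares index 1 with write_op, a pending connect is completed by the same "writable" notification as a pending write.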
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run(); // start the processing loop
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
This actually executes impl_.run():
io_context::count_type io_context::run()
{
asio::error_code ec;
count_type s = impl_.run(ec);
asio::detail::throw_error(ec);
return s;
}
What actually runs is scheduler::run(), which contains a for loop that calls do_run_one():
std::size_t scheduler::run(asio::error_code& ec)
{
ec = asio::error_code();
if (outstanding_work_ == 0)
{
stop();
return 0;
}
thread_info this_thread;
this_thread.private_outstanding_work = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock()) // processing loop
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
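Since every thread that calls io_context.run() ends up in this loop, several threads compete for one queue under one mutex. The sketch below reproduces that shape with standard-library primitives only. ToyPool and run_on_threads are hypothetical names, and the outstanding-work counter is simplified to count queued and running handlers only.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical miniature of the run() loop: threads share one queue, take one
// handler at a time under the mutex, and run it with the mutex released.
class ToyPool {
public:
  void post(std::function<void()> handler) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      queue_.push_back(std::move(handler));
      ++outstanding_work_;
    }
    wakeup_.notify_one();
  }

  // Returns the number of handlers this thread executed.
  std::size_t run() {
    std::size_t n = 0;
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      wakeup_.wait(lock, [this] {
        return !queue_.empty() || outstanding_work_ == 0;
      });
      if (queue_.empty()) return n;  // no work left anywhere: stop
      std::function<void()> handler = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();                 // never run user code under the mutex
      handler();
      ++n;
      lock.lock();
      if (--outstanding_work_ == 0) wakeup_.notify_all();  // let idle threads exit
    }
  }

private:
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> queue_;
  std::size_t outstanding_work_ = 0;
};

// Runs the pool on several threads and returns the total handlers executed.
inline std::size_t run_on_threads(ToyPool& pool, unsigned thread_count) {
  std::atomic<std::size_t> total{0};
  std::vector<std::thread> threads;
  for (unsigned i = 0; i < thread_count; ++i)
    threads.emplace_back([&] { total += pool.run(); });
  for (auto& t : threads) t.join();
  return total.load();
}
```

As with scheduler::run(), calling run() when there is no outstanding work returns 0 immediately.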
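do_run_one() is fairly long but easy to follow. It takes a task from the task queue and checks whether it is the reactor task (task_operation_). If so, an epoll pass is needed to collect fd events for processing. If not, it is a callback task, and the corresponding callback is simply run.
Also, when more tasks remain in the queue and more than one thread is in use, wakeup_event_.unlock_and_signal_one(lock) is called to wake another thread so that it can consume the queue as well. As for lock management, the lock is held throughout the for loop above and released once a task has actually been taken for execution. The RAII class task_cleanup re-acquires the lock after the task has been processed.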
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
scheduler::thread_info& this_thread,
const asio::error_code& ec)
{
while (!stopped_)
{
if (!op_queue_.empty())
{
// Prepare to execute first handler from queue.
operation* o = op_queue_.front(); // take a task from the task queue
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
if (o == &task_operation_) // epoll task: perform one epoll pass
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock); // wake another thread
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
}
else // callback task: run the callback
{
std::size_t task_result = o->task_result_;
if (more_handlers && !one_thread_)
wake_one_thread_and_unlock(lock);
else
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
o->complete(this, ec, task_result); // invoke the callback
this_thread.rethrow_pending_exception();
return 1;
}
}
else
{
wakeup_event_.clear(lock);
wakeup_event_.wait(lock);
}
}
return 0;
}
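The lock handling in do_run_one() (release the mutex, run the work, and let an RAII object restore the locked state on the way out, even if the work throws) can be sketched in isolation. RelockOnExit and run_unlocked are hypothetical names modelled loosely on the role task_cleanup plays; the counter stands in for whatever bookkeeping must happen with the mutex held.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical guard: on scope exit it re-acquires the lock and then performs
// bookkeeping that must happen with the mutex held.
struct RelockOnExit {
  std::unique_lock<std::mutex>& lock;
  int& finished;

  ~RelockOnExit() {
    if (!lock.owns_lock()) lock.lock();
    ++finished;  // safe: the mutex is held again at this point
  }
};

// Runs `handler` with the lock released; the lock is held again on return,
// whether the handler returns normally or throws.
template <typename Handler>
void run_unlocked(std::unique_lock<std::mutex>& lock, int& finished,
                  Handler handler) {
  lock.unlock();
  RelockOnExit on_exit{lock, finished};
  (void)on_exit;
  handler();  // may throw; the guard's destructor still runs
}
```

Putting the re-lock in a destructor rather than after the call is what keeps the queue state consistent when a handler throws out of run().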
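Let's take a closer look at how task_->run() collects work when it performs epoll. The events returned by epoll are saved so that they can be passed to the callback (this is the task_result passed when the callback is invoked above), and the descriptor data (which holds both the callbacks and the result) is pushed onto the task queue. One thing puzzled me here: epoll_reactor::run() does no locking, yet the epoll fd is shared, so multiple threads calling epoll at the same time would seem to need synchronization, and it was not clear to me how that is handled. A likely explanation, judging from do_run_one() above: task_operation_ is a single object in op_queue_ and is popped while the scheduler mutex is held, so presumably only the thread that popped it can be inside task_->run() until it is put back on the queue.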
void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
// Block on the epoll descriptor.
epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
// Dispatch the waiting events.
for (int i = 0; i < num_events; ++i)
{
...
void* ptr = events[i].data.ptr;
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
descriptor_data->set_ready_events(events[i].events); // save the events
ops.push(descriptor_data); // push onto the task queue
...
}
}
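The way the reactor gets from an epoll event back to its per-descriptor record (through the data.ptr field set at registration time) can be tried out on Linux with a pipe. This is a minimal sketch: ToyDescriptorData, register_descriptor, wait_one and epoll_pipe_demo are hypothetical names, only EPOLLIN is registered, and error handling is reduced to boolean results.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical per-descriptor record, standing in for descriptor_state.
struct ToyDescriptorData {
  int fd;
  std::uint32_t ready_events;
};

// Registers fd for readability and attaches `data` to the registration.
inline bool register_descriptor(int epoll_fd, int fd, ToyDescriptorData& data) {
  data.fd = fd;
  data.ready_events = 0;
  epoll_event ev = {};
  ev.events = EPOLLIN;
  ev.data.ptr = &data;  // epoll hands this pointer back with each event
  return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0;
}

// Waits up to timeout_ms; returns the record of the first ready descriptor,
// or nullptr on timeout or error.
inline ToyDescriptorData* wait_one(int epoll_fd, int timeout_ms) {
  epoll_event events[16];
  int n = epoll_wait(epoll_fd, events, 16, timeout_ms);
  if (n <= 0) return nullptr;
  auto* data = static_cast<ToyDescriptorData*>(events[0].data.ptr);
  data->ready_events = events[0].events;  // save the events for the handler
  return data;
}

// Demo: a pipe's read end becomes ready only after a byte is written.
inline bool epoll_pipe_demo() {
  int fds[2];
  if (pipe(fds) != 0) return false;
  int epoll_fd = epoll_create1(0);
  if (epoll_fd < 0) {
    close(fds[0]);
    close(fds[1]);
    return false;
  }
  ToyDescriptorData data{};
  bool ok = register_descriptor(epoll_fd, fds[0], data);
  ok = ok && wait_one(epoll_fd, 0) == nullptr;  // nothing written yet
  ok = ok && write(fds[1], "x", 1) == 1;
  ToyDescriptorData* ready = ok ? wait_one(epoll_fd, 1000) : nullptr;
  ok = ok && ready == &data && (data.ready_events & EPOLLIN) != 0;
  close(epoll_fd);
  close(fds[0]);
  close(fds[1]);
  return ok;
}
```

Storing a pointer in data.ptr avoids any fd-to-state lookup table: the kernel returns the pointer unchanged with every event for that descriptor.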