An analysis of asio's asynchronous and threading model


Preface

The asio used in this article is asio 1.76.0 obtained from GitHub. Older versions had a very important structure called io_service, which was renamed io_context in newer versions. Below, the threading model of boost.asio is analyzed mainly through an asynchronous C++14 example. The code structure is fairly complex and, with limited time, a very detailed analysis is not possible; this is only a broad structural overview.

Main text

asio's threading model and the asynchronous call path are shown in the figure below.

The program is centered on an io_context, which owns a scheduler object (the scheduler). The scheduler holds a task queue (op_queue_) and an epoll fd. When io_context.run() is executed, a loop starts in which threads compete to consume the task queue. The queue holds two kinds of tasks: one is the epoll task, which performs a single epoll_wait; the other executes a callback function. When an asynchronous call is made, it essentially adds an event for a fd to the reactor and makes the callback reachable through the event's data pointer; once epoll_wait() reports that event, the callback corresponding to the fd is taken out and pushed into the task queue (op_queue_), where different threads compete to consume it.
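
For intuition, here is a minimal sketch of driving one io_context from several threads (this is not part of the echo example below; the thread count and the posted handlers are arbitrary choices for illustration). Every thread that calls run() becomes one of the consumers competing for the scheduler's task queue.

#include <asio.hpp>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
  asio::io_context io_context;

  // Keep run() from returning while there is no pending async work yet.
  auto guard = asio::make_work_guard(io_context);

  // Post a few handlers; they end up in the scheduler's task queue.
  for (int i = 0; i < 4; ++i)
  {
    asio::post(io_context, [i]
    {
      std::cout << "handler " << i << " on thread "
                << std::this_thread::get_id() << "\n";
    });
  }

  // Each thread calling run() becomes a consumer of the task queue.
  std::vector<std::thread> pool;
  for (int i = 0; i < 2; ++i)
    pool.emplace_back([&io_context] { io_context.run(); });

  guard.reset(); // allow run() to exit once the queue drains
  for (auto& t : pool)
    t.join();
}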

The official example is used to briefly analyze this flow; it comes from asio/src/examples/cpp14/echo/async_tcp_echo_server.cpp


#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>

using asio::ip::tcp;

class session
  : public std::enable_shared_from_this<session>
{
public:
  session(tcp::socket socket)
    : socket_(std::move(socket))
  {
  }

  void start()
  {
    do_read();
  }

private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(asio::buffer(data_, max_length),
        [this, self](std::error_code ec, std::size_t length)
        {
          if (!ec)
          {
            do_write(length);
          }
        });
  }

  void do_write(std::size_t length)
  {
    auto self(shared_from_this());
    asio::async_write(socket_, asio::buffer(data_, length),
        [this, self](std::error_code ec, std::size_t /*length*/)
        {
          if (!ec)
          {
            do_read();
          }
        });
  }

  tcp::socket socket_;
  enum { max_length = 1024 };
  char data_[max_length];
};

class server
{
public:
  server(asio::io_context& io_context, short port)
    : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
      socket_(io_context)
  {
    do_accept();
  }

private:
  void do_accept()
  {
    acceptor_.async_accept(socket_,
        [this](std::error_code ec)
        {
          if (!ec)
          {
            std::make_shared<session>(std::move(socket_))->start();
          }

          do_accept();
        });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_echo_server <port>\n";
      return 1;
    }

    asio::io_context io_context;

    server s(io_context, std::atoi(argv[1]));

    io_context.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

The code is long; the focus here is on io_context.run() and acceptor_.async_accept() in order to analyze the threading model and the asynchronous mechanism. main() creates the io_context and the server; the server creates the acceptor and a socket, then performs an asynchronous async_accept(), registering a lambda. When an accept event occurs and accept has created the client socket, the lambda callback is invoked to handle the newly accepted socket. Then io_context.run() is executed to run the processing loop.

The program's call graph is shown in the figure below.

As highlighted in yellow in the figure, there are three main parts:

The first part is the creation of the acceptor and its registration with epoll for monitoring. Construction of the acceptor starts in the server's constructor:

class server
{
public:
  server(asio::io_context& io_context, short port)
    : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)), // construct the acceptor
      socket_(io_context)
  {
    do_accept();
  }

private:
	......

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

In fact a typedef is involved, and what is constructed is class basic_socket_acceptor<tcp>. Here, besides the familiar bind and listen, the important call is open:

  // /usr/local/include/asio/basic_socket_acceptor.hpp
  
  template <typename ExecutionContext>
  basic_socket_acceptor(ExecutionContext& context,
      const endpoint_type& endpoint, bool reuse_addr = true,
      typename constraint<
        is_convertible<ExecutionContext&, execution_context&>::value
      >::type = 0)
    : impl_(0, 0, context)
  {
    asio::error_code ec;
    const protocol_type protocol = endpoint.protocol();
    impl_.get_service().open(impl_.get_implementation(), protocol, ec); //open
    asio::detail::throw_error(ec, "open");
    if (reuse_addr)
    {
      impl_.get_service().set_option(impl_.get_implementation(),
          socket_base::reuse_address(true), ec);
      asio::detail::throw_error(ec, "set_option");
    }
    impl_.get_service().bind(impl_.get_implementation(), endpoint, ec);// bind
    asio::detail::throw_error(ec, "bind");
    impl_.get_service().listen(impl_.get_implementation(),			   // listen
        socket_base::max_listen_connections, ec);
    asio::detail::throw_error(ec, "listen");
  }

open() is a wrapper around do_open() (for cross-platform support); its job is to create a socket and add the corresponding epoll event for it:

  // Open a new socket implementation.
  asio::error_code open(implementation_type& impl,
      const protocol_type& protocol, asio::error_code& ec)
  {
    if (!do_open(impl, protocol.family(),
          protocol.type(), protocol.protocol(), ec))
      impl.protocol_ = protocol;
    return ec;
  }

Below is do_open(): socket_holder creates the server-side socket, and register_descriptor() registers that socket with the epoll fd of this io_context:

asio::error_code reactive_socket_service_base::do_open(
    reactive_socket_service_base::base_implementation_type& impl,
    int af, int type, int protocol, asio::error_code& ec)
{
  if (is_open(impl))
  {
    ec = asio::error::already_open;
    return ec;
  }

  socket_holder sock(socket_ops::socket(af, type, protocol, ec)); // create the socket
  if (sock.get() == invalid_socket)
    return ec;

  // register with epoll
  if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
  {
    ec = asio::error_code(err,
        asio::error::get_system_category());
    return ec;
  }

  impl.socket_ = sock.release();
  switch (type)
  {
  case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
  case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
  default: impl.state_ = 0; break;
  }
  ec = asio::error_code();
  return ec;
}

At this point a complete server socket has been created, bound, set to listen, and registered with epoll.
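
As a rough mental model (a sketch under assumptions, not asio's actual code), the work done so far corresponds approximately to the following raw Linux calls: create the socket, register it with an epoll instance, then bind and listen. The function name make_listener is hypothetical; asio's real register_descriptor() registers edge-triggered events and stores a pointer to per-descriptor state in data.ptr (as seen later in epoll_reactor::run), which this sketch simplifies by using data.fd.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Roughly what basic_socket_acceptor's constructor achieves via
// do_open() + register_descriptor() + bind() + listen().
int make_listener(int epoll_fd, unsigned short port)
{
  int fd = ::socket(AF_INET, SOCK_STREAM, 0);           // do_open(): create the socket
  if (fd < 0) return -1;

  int on = 1;
  ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); // reuse_address(true)

  epoll_event ev{};                                     // register_descriptor(): add to epoll
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;  // approximate event set
  ev.data.fd = fd;
  if (::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) != 0) { ::close(fd); return -1; }

  sockaddr_in addr{};                                   // bind()
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);
  if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::listen(fd, SOMAXCONN) != 0)                     // listen()
  {
    ::close(fd);
    return -1;
  }
  return fd;
}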


The second important part is how the asynchronous async_accept() runs.

class server
{
	......
private:
  void do_accept()
  {
    acceptor_.async_accept(socket_,  // perform the asynchronous call
        [this](std::error_code ec)
        {
          if (!ec)
          {
            std::make_shared<session>(std::move(socket_))->start();
          }

          do_accept();
        });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

After calling async_accept() on the acceptor created in part one, we arrive at a template function that looks rather complicated:

  template <typename Protocol1, typename Executor1,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
        AcceptHandler ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  ASIO_INITFN_AUTO_RESULT_TYPE(AcceptHandler,
      void (asio::error_code))
  async_accept(basic_socket<Protocol1, Executor1>& peer,
      ASIO_MOVE_ARG(AcceptHandler) handler
        ASIO_DEFAULT_COMPLETION_TOKEN(executor_type),
      typename constraint<
        is_convertible<Protocol, Protocol1>::value
      >::type = 0)
  {
    return async_initiate<AcceptHandler, void (asio::error_code)>(
        initiate_async_accept(this), handler,
        &peer, static_cast<endpoint_type*>(0));
  }

Don't worry too much about the signature; let's see what async_initiate() does:

template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE Signature,
    typename Initiation, typename... Args>
inline typename constraint<
    !detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
    ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
    ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
    ASIO_MOVE_ARG(Args)... args)
{
  async_completion<CompletionToken, Signature> completion(token);

  ASIO_MOVE_CAST(Initiation)(initiation)(
      ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
        Signature))(completion.completion_handler),
      ASIO_MOVE_CAST(Args)(args)...);

  return completion.result.get();
}

This is an even more complicated template function, but its body is only a few lines: it invokes operator() of the template type Initiation, which here is class initiate_async_accept; the corresponding operator() is shown below:

    template <typename AcceptHandler, typename Protocol1, typename Executor1>
    void operator()(ASIO_MOVE_ARG(AcceptHandler) handler,
        basic_socket<Protocol1, Executor1>* peer,
        endpoint_type* peer_endpoint) const
    {
      // If you get an error on the following line it means that your handler
      // does not meet the documented type requirements for a AcceptHandler.
      ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;

      detail::non_const_lvalue<AcceptHandler> handler2(handler);
      self_->impl_.get_service().async_accept(
          self_->impl_.get_implementation(), *peer, peer_endpoint,
          handler2.value, self_->impl_.get_executor());
    }

The core is the single line self_->impl_.get_service().async_accept(...). What impl_.get_service() returns is class reactive_socket_service, as can be seen in basic_socket_acceptor:

class basic_socket_acceptor
  : public socket_base
{
...
detail::io_object_impl<detail::reactive_socket_service<Protocol>, Executor> impl_;
}

This leads to the implementation of async_accept(...) shown below. It looks long, but the key is the final call to start_accept_op():

  // Start an asynchronous accept. The peer and peer_endpoint objects must be
  // valid until the accept's handler is invoked.
  template <typename Socket, typename Handler, typename IoExecutor>
  void async_accept(implementation_type& impl, Socket& peer,
      endpoint_type* peer_endpoint, Handler& handler, const IoExecutor& io_ex)
  {
    bool is_continuation =
      asio_handler_cont_helpers::is_continuation(handler);

    // Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_accept_op<Socket, Protocol, Handler, IoExecutor> op;
    typename op::ptr p = { asio::detail::addressof(handler),
      op::ptr::allocate(handler), 0 };
    p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
        peer, impl.protocol_, peer_endpoint, handler, io_ex);

    ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
          &impl, impl.socket_, "async_accept"));

    start_accept_op(impl, p.p, is_continuation, peer.is_open()); // start the accept operation
    p.v = p.p = 0;
  }

start_accept_op() calls start_op():

void reactive_socket_service_base::start_accept_op(
    reactive_socket_service_base::base_implementation_type& impl,
    reactor_op* op, bool is_continuation, bool peer_is_open)
{
  if (!peer_is_open)
    start_op(impl, reactor::read_op, op, is_continuation, true, false);
  else
  {
    op->ec_ = asio::error::already_open;
    reactor_.post_immediate_completion(op, is_continuation);
  }
}

start_op() in turn calls reactor_.start_op():

void reactive_socket_service_base::start_op(
    reactive_socket_service_base::base_implementation_type& impl,
    int op_type, reactor_op* op, bool is_continuation,
    bool is_non_blocking, bool noop)
{
  if (!noop)
  {
    if ((impl.state_ & socket_ops::non_blocking)
        || socket_ops::set_internal_non_blocking(
          impl.socket_, impl.state_, true, op->ec_))
    {
      reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking); // call the reactor's start_op()
      return;
    }
  }

  reactor_.post_immediate_completion(op, is_continuation);
}

reactor_.start_op() is long; the key part is excerpted below. Depending on the op_type passed in (read_op = 0, write_op = 1, connect_op = 1, except_op = 2), it may adjust the monitored events via epoll_ctl(EPOLL_CTL_MOD), and then pushes the operation (i.e. the callback passed in) onto the queue at index op_type in the descriptor's data, so it can be run when the event fires:

void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
 
	...
  if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = asio::error_code(errno,
                asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
  ...

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

Looking at this part as a whole, the relationship is roughly acceptor -> reactive_socket_service -> reactor.


The third part is how io_context.run() performs epoll and handles events.

Starting from main(), io_context.run() is called:

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_echo_server <port>\n";
      return 1;
    }

    asio::io_context io_context;

    server s(io_context, std::atoi(argv[1]));

    io_context.run(); // start the processing loop
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

What this actually executes is impl_.run():

io_context::count_type io_context::run()
{
  asio::error_code ec;
  count_type s = impl_.run(ec);
  asio::detail::throw_error(ec);
  return s;
}	

What actually runs is scheduler::run(); note the for loop that calls do_run_one() to do the processing:

std::size_t scheduler::run(asio::error_code& ec)
{
  ec = asio::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock()) // loop, handling one operation per iteration
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

do_run_one() is fairly long but easy to understand. It takes a task from the task queue and checks whether it is the task operation; if so, an epoll pass is needed to fetch the fd events back for processing; if not, it is a callback task and the associated callback function is simply invoked.

Also, when there is more than one pending task, wakeup_event_.unlock_and_signal_one(lock) is called to wake other threads so they also consume the task queue. As for lock management: the lock is held throughout the for loop above, released once a task has actually been taken for execution, and the RAII class task_cleanup re-acquires it after the task has been processed (a minimal sketch of this pattern follows the code below).

std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const asio::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front(); // take a task from the task queue
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_) // the epoll task: perform one epoll pass
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock); // wake another thread
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else // otherwise it is a handler: invoke the callback
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result); // invoke the callback
        this_thread.rethrow_pending_exception();

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
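
The task_cleanup and work_cleanup objects seen above are small RAII helpers. Below is a minimal sketch of the general pattern (the names scheduler_sketch, task_cleanup, and run_epoll_task here are illustrative, not asio's real implementation): the queue lock is released while the task runs, and the guard's destructor re-acquires it and puts the epoll "task" sentinel back on the queue so it gets run again.

#include <functional>
#include <mutex>
#include <queue>

struct scheduler_sketch
{
  std::mutex mutex_;
  std::queue<std::function<void()>> op_queue_;
  std::function<void()> task_operation_; // stands in for "do an epoll pass"

  struct task_cleanup
  {
    scheduler_sketch* self;
    std::unique_lock<std::mutex>* lock;

    ~task_cleanup()
    {
      lock->lock();                                 // re-acquire after the task has run
      self->op_queue_.push(self->task_operation_);  // put the epoll task back on the queue
    }
  };

  void run_epoll_task(std::unique_lock<std::mutex>& lock)
  {
    lock.unlock();                        // don't hold the lock around epoll_wait
    task_cleanup on_exit = { this, &lock };
    (void)on_exit;
    task_operation_();                    // stands in for task_->run(...); may throw
  }                                       // ~task_cleanup re-locks and re-queues here
};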

Let's look more closely at how task_->run() collects tasks while doing epoll: the events from the epoll_event entries returned by epoll are saved so they can be handed to the callback for processing (the task_result passed when the callback is invoked above), and the descriptor data (containing both the callback and the result) is pushed into the task queue. One open question: epoll_reactor::run() takes no lock, yet the epoll fd is shared, so multiple threads calling epoll at the same time would seem to need synchronization; it is not clear how this is handled.

void epoll_reactor::run(long usec, op_queue<operation>& ops)
{

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout); // timeout is computed from usec in the full source (abridged here)


  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
		...
		void* ptr = events[i].data.ptr;
		descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
        descriptor_data->set_ready_events(events[i].events); // save the ready events
        ops.push(descriptor_data); // push into the task queue
        ...
  }
}

