muduo Study Notes (6): The Multi-threaded TcpServer


Preface

Earlier posts covered muduo's single-threaded design, in which one EventLoop handles every event: connection establishment, I/O, computation, and connection teardown. This post looks at muduo's multi-threaded design.

Multi-threaded TcpServer

EventLoopThreadPool

The multi-threaded muduo::TcpServer is built mainly by adding an EventLoopThreadPool, a pool of event-loop threads. Whenever a new TcpConnection is created, a loop is picked from the pool and handed to it. In other words, the multi-threaded TcpServer's own EventLoop is used only to accept new connections, while the connections do their I/O on other EventLoops. (In the single-threaded TcpServer, the EventLoop is shared with the TcpConnections.)

EventLoopThreadPool implements the multi-threaded TcpServer along the lines of one loop per thread: the main loop is responsible only for accepting TCP connections and assigning work. To make a particular thread do a job, register the timer or I/O object (such as a TCP connection) with that thread's loop. A connection with real-time requirements can get a thread of its own; a high-volume connection can likewise monopolize one thread, with its data processing spread across a few dedicated computation threads (a thread pool); minor auxiliary connections can share a single thread.

The thread-pool design pattern

A pool is a design pattern and a thread is a resource: the thread-pool pattern allocates a certain amount of resources up front, lends them out on demand, and returns them to the pool after use. EventLoopThreadPool borrows this pattern. Although its threads are never actually returned to a pool, each one owns an EventLoop, i.e. a Reactor, which already comes with its own waiting mechanism; see the sketch below.
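As a minimal sketch of the idea (using the EventLoopThread type that appears in the full listing later in this post, and assuming nothing beyond its default constructor and startLoop()):

// Sketch: allocate the loop threads up front, then lend out their loops on demand.
muduo::EventLoopThread threads[3];           // resources created in advance
std::vector<muduo::EventLoop*> loops;
for (auto& t : threads)
  loops.push_back(t.startLoop());            // each thread runs exactly one EventLoop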

Usage in muduo

Each time TcpServer creates a new TcpConnection, it calls EventLoopThreadPool::getNextLoop() to fetch an EventLoop. The current getNextLoop() simply takes loops from the pool in round-robin order; as long as every TcpConnection receives roughly equal service, this spreads the system's resources evenly.
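A round-robin getNextLoop() can be as small as the following sketch; the member names m_baseLoop, m_loops and m_next are assumptions modeled on muduo's implementation, not necessarily this project's exact code:

EventLoop* EventLoopThreadPool::getNextLoop()
{
  m_baseLoop->assertInLoopThread();
  EventLoop* loop = m_baseLoop;       // fall back to the base loop if the pool is empty
  if (!m_loops.empty())
  {
    loop = m_loops[m_next];           // hand out pool loops in round-robin order
    m_next = (m_next + 1) % m_loops.size();
  }
  return loop;
}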

How the TcpServer behaves is determined by how many threads the EventLoopThreadPool creates (see the snippet after this list):
0 means all I/O stays on TcpServer's main event loop; no new thread is created.
1 means all I/O runs on one other thread, while TcpServer's main thread only accepts connections.
N means new connections are assigned to the N threads in round-robin fashion.
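In upstream muduo this knob is exposed directly as TcpServer::setThreadNum(); the reimplementation followed in this post builds its EventLoopThreadPool by hand instead (see listen_and_serve below). With the upstream API, the choice looks roughly like this:

// Upstream muduo API (assumed here; this post's code wires the pool manually).
muduo::net::TcpServer server(&loop, listenAddr, "Serv");
server.setThreadNum(3);  // 0: all I/O on the base loop; 1: one I/O thread; N: round-robin over N loops
server.start();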

Connection establishment, messages, and teardown


on_connection

1. A new connection is established by the Acceptor, in TcpServer's own loop:

void Acceptor::handleRead()
{
  LOG_TRACE << "Acceptor::handleRead()";
  p_loop->assertInLoopThread();
  InetAddress peerAddr;
  int connfd = m_acceptSocket.accept(&peerAddr);
  // ... the accepted fd is then handed to TcpServer::newConnetion()
  // through the new-connection callback (remainder elided)
}

2. TcpServer::newConnetion() then takes a loop from the loop thread pool and registers the new connection on it:

void TcpServer::newConnetion(int sockfd, const InetAddress& peerAddr)
{
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  EventLoop* loop = ex_event_loop_thread_pool->getNextLoop();
  // construction of connName elided
  TcpConnectionPtr conn(new TcpConnection(loop,
             connName, sockfd, localAddr, peerAddr));
  conn->setConnectionCallBack(m_connectionCallBack);
  conn->setMessageCallBack(m_messageCallBack);
  conn->setCloseCallBack(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
  // connectEstablished must run on the connection's own loop thread
  loop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
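The final runInLoop() call is what makes this safe across threads: connectEstablished() must execute on the chosen loop's own thread, and runInLoop() takes care of getting it there. In muduo its core logic is essentially:

void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
    cb();              // already on this loop's thread: run the task directly
  else
    queueInLoop(cb);   // otherwise enqueue it and wake the loop via its eventfd
}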

on_message

1. Messages are handled in the loop that the connection itself lives on; nothing more needs to be said.

on_close

1. Destroying a connection is a little more involved. The connection calls TcpServer::removeConnection from its own loop; the call must first hop to TcpServer's loop thread to release TcpServer's hold on the connection, and then hop back to the connection's own loop to destroy it via connectDestroyed().

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  p_loop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  p_loop->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << m_name
           << "] - connection " << conn->name();
  m_closeCallBack(conn);
  EventLoop* ioLoop = conn->getLoop();
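  // queueInLoop (rather than runInLoop) defers connectDestroyed until the
  // current round of event handling finishes, so the connection's Channel
  // is not torn down while it may still be executing callbacks.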
  ioLoop->queueInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
}

Implementing a simple relay service

Using the multi-threaded muduo TcpServer, we implement a simple relay (pass-through) service: every message received on one connection is forwarded directly to all the other connections.

//ws_tcp_server.hpp
#pragma once
#include <async_logging>
#include <muduo_server>

namespace ws {

extern std::unique_ptr<muduo::EventLoop> active_event_loop;

class tcp_server{
public:
  //! ctor
  tcp_server(void);
  //! dtor
  ~tcp_server(void) = default;

  //! copy ctor
  tcp_server(const tcp_server&) = delete;
  //! assignment operator
  tcp_server& operator=(const tcp_server&) = delete;

public:

  //! listen_and_serve
  //! start the server and listen on the given port
  void listen_and_serve(std::size_t port);

private:

  void on_connection(const muduo::TcpConnectionPtr&);

  void on_message(const muduo::TcpConnectionPtr&, muduo::Buffer*, ssize_t);

  void on_close(const muduo::TcpConnectionPtr&);
private:

  //! name -> connection lookup table
  typedef std::map<std::string, muduo::TcpConnectionPtr> connections_map_t;

private:

  //! muduo tcp server
  std::unique_ptr<muduo::TcpServer> m_tcp_server;

  //! connections map 
  connections_map_t m_connections_map;

  //! running count of accepted connections, used to name them
  std::size_t cnt_connections;
};

}

//ws_tcp_server.cpp
#include "ws_tcp_server.hpp"

#include <assert.h>

static muduo::EventLoopThread static_event_loop;
// Note: wrapping the returned loop in a unique_ptr assumes startLoop() hands
// over ownership of a heap-allocated EventLoop. In upstream muduo the loop is
// owned by the EventLoopThread itself, in which case a non-owning raw pointer
// would be the safer choice.
std::unique_ptr<muduo::EventLoop> ws::active_event_loop = std::unique_ptr<muduo::EventLoop>(static_event_loop.startLoop());

using namespace ws;

tcp_server::tcp_server()
  :cnt_connections(0)
{

}

void tcp_server::listen_and_serve(std::size_t port)
{
  assert(active_event_loop);

  muduo::ex_event_loop_thread_pool = std::unique_ptr<muduo::EventLoopThreadPool>(new muduo::EventLoopThreadPool(ws::active_event_loop.get(), "event_loop", 3));
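  // EventLoopThreadPool::start() must run on the base loop's own thread
  // (upstream muduo asserts this), hence the runInLoop() hop below.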
  active_event_loop->runInLoop(std::bind(&muduo::EventLoopThreadPool::start, muduo::ex_event_loop_thread_pool.get()));

  m_tcp_server = std::unique_ptr<muduo::TcpServer>(new muduo::TcpServer(active_event_loop.get(), muduo::InetAddress(port)));

  m_tcp_server->setConnectionCallBack(std::bind(&tcp_server::on_connection, this, std::placeholders::_1));
  m_tcp_server->setMessageCallBack(std::bind(&tcp_server::on_message, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  m_tcp_server->setCloseCallBack(std::bind(&tcp_server::on_close, this, std::placeholders::_1));
  m_tcp_server->start();
}

void tcp_server::on_connection(const muduo::TcpConnectionPtr& conn)
{
  cnt_connections++;
  std::string conn_name = "tcp connection " + std::to_string(cnt_connections);
  conn->setConnectionName(conn_name);

  // conn is a const reference, so std::move would copy anyway; store it directly
  m_connections_map[conn_name] = conn;
}

void tcp_server::on_message(const muduo::TcpConnectionPtr& conn, muduo::Buffer* buf, ssize_t size)
{
  // forward the received bytes to every connection except the sender
  for (auto& item : m_connections_map)
  {
    if (item.second != conn)
    {
      LOG_DEBUG << "con " << conn->name() << " send to " << item.second->name();
      item.second->send(buf->peek(), size);
    }
  }

  buf->retrieve(size);
}

void tcp_server::on_close(const muduo::TcpConnectionPtr& conn)
{
  // drop our reference; the TcpConnection itself is destroyed on its own loop
  size_t n = m_connections_map.erase(conn->name());
  (void)n;
  assert(n == 1);
}
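One caveat about this example: m_connections_map is touched from several threads at once. on_connection and on_message fire on each connection's own I/O loop, while on_close is invoked from the server's base loop via removeConnectionInLoop. A minimal fix is to serialize access with a mutex; the member m_connections_mutex below is an assumed addition (along with #include <mutex>), not part of the original code:

// assumed extra member of tcp_server:
//   std::mutex m_connections_mutex;
void tcp_server::on_message(const muduo::TcpConnectionPtr& conn, muduo::Buffer* buf, ssize_t size)
{
  std::lock_guard<std::mutex> lock(m_connections_mutex);  // serialize map access
  for (auto& item : m_connections_map)
  {
    if (item.second != conn)
      item.second->send(buf->peek(), size);
  }
  buf->retrieve(size);
}

on_connection and on_close would take the same lock; alternatively, all map operations could be funneled onto a single loop with runInLoop().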

// main.cpp
#include <async_logging>
#include "ws_tcp_server.hpp"

int main()
{
  Logger::setLogLevel(Logger::DEBUG);

  ws::tcp_server tcp_server;
  tcp_server.listen_and_serve(8008);

  getchar();  // block main(); the server runs entirely on its event-loop threads
}
Sample output:

$ ../bin/ws_server
2019-03-19 16:22:17.202112 [TRACE] [EventLoopThread.cpp:41] [startLoop] EventLoopThread::startLoop() wait()
2019-03-19 16:22:17.203199 [TRACE] [TimerQueue.cpp:20] [createTimerfd] createTimerfd() fd : 4
2019-03-19 16:22:17.203376 [TRACE] [Epoll.cpp:115] [updateChannel] fd= 4 events 3
2019-03-19 16:22:17.203549 [TRACE] [EventLoop.cpp:23] [createEventfd] createEventfd() fd : 5
2019-03-19 16:22:17.203711 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E72371B30 in thread 44057
2019-03-19 16:22:17.203872 [TRACE] [Epoll.cpp:115] [updateChannel] fd= 5 events 3
2019-03-19 16:22:17.204188 [TRACE] [EventLoopThread.cpp:65] [threadFunc] EventLoopThread::threadFunc() notify()
2019-03-19 16:22:17.204332 [TRACE] [EventLoop.cpp:74] [loop] EventLoop 0x7F1E72371B30 start loopig
2019-03-19 16:22:17.204477 [TRACE] [Epoll.cpp:72] [poll] Epoll::poll() maxConcurrencySize 2
2019-03-19 16:22:17.204794 [TRACE] [EventLoopThread.cpp:46] [startLoop] EventLoopThread::startLoop() wakeup
2019-03-19 16:22:17.210324 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E71B70B30 in thread 44058
2019-03-19 16:22:17.212173 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E7136FB30 in thread 44059
2019-03-19 16:22:17.212725 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E70B6EB30 in thread 44060
2019-03-19 16:22:22.399790 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #1] from 127.0.0.1:51400
2019-03-19 16:22:22.399833 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E71B70B30
2019-03-19 16:22:22.399842 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #1] at 0x7F1E6C0046F0 fd=17
2019-03-19 16:22:24.901612 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #2] from 127.0.0.1:51402
2019-03-19 16:22:24.901625 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E7136FB30
2019-03-19 16:22:24.901635 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #2] at 0x7F1E6C006A30 fd=18
2019-03-19 16:22:25.903035 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
2019-03-19 16:22:32.499161 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #3] from 127.0.0.1:51404
2019-03-19 16:22:32.499174 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E70B6EB30
2019-03-19 16:22:32.499183 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #3] at 0x7F1E6C008D80 fd=19
2019-03-19 16:22:33.499744 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 3 send to tcp connection 1
2019-03-19 16:22:33.499771 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 3 send to tcp connection 2
2019-03-19 16:22:35.986563 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
2019-03-19 16:22:35.986670 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 3
2019-03-19 16:22:36.982713 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
2019-03-19 16:22:36.982759 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 3
2019-03-19 16:22:38.244229 [INFO ] [TcpServer.cpp:90] TcpServer::removeConnectionInLoop [Serv ] - connection tcp connection 2
2019-03-19 16:22:38.244281 [DEBUG] [TcpConnection.cpp:36] [~TcpConnection] TcpConnection::dtor[tcp connection 2] at 0x7F1E6C006A30 fd=18 state=kDisConnected

