本文主要對比Muduo多線程模型方案8 和方案9 。
方案8:reactor + thread pool ,有一個線程來充當reactor 接受連接分發事件,將要處理的事件分配給thread pool中的線程,由thread pool 來完成事件處理。實例代碼見:examples/sudoku/server_threadpool.cc
這里截取關鍵部分代碼進行說明。
class
SudokuServer
{
public
:
SudokuServer(EventLoop* loop,
const
InetAddress& listenAddr,
int
numThreads)
: loop_(loop),
server_(loop, listenAddr,
"SudokuServer"
),
numThreads_(numThreads),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection,
this
, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage,
this
, _1, _2, _3));
}
void
start()
{
LOG_INFO <<
"starting "
<< numThreads_ <<
" threads."
;
threadPool_.start(numThreads_);
// 注意這里,threadPool 的類型是: ThreadPool,且位置在start 里面
server_.start();
}
private
:
void
onConnection(
const
TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() <<
" -> "
<< conn->localAddress().toIpPort() <<
" is "
<< (conn->connected() ?
"UP"
:
"DOWN"
);
}
void
onMessage(
const
TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
...
if
(!processRequest(conn, request)) // 封裝計算任務執行方法
{
conn->send(
"Bad Request!\r\n"
);
conn->shutdown();
break
;
}
}
...
}
}
bool
processRequest(
const
TcpConnectionPtr& conn,
const
string& request)
{
...
if
(puzzle.size() == implicit_cast<size_t>(kCells))
{
threadPool_.run(boost::bind(&solve, conn, puzzle, id));
// 將計算任務轉移到 threadPool 線程
}
else
{
goodRequest =
false
;
}
return
goodRequest;
}
static
void
solve(
const
TcpConnectionPtr& conn,
const
string& puzzle,
const
string& id)
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle); // solveSudou 是一個pure function, 是可重入的
if
(id.empty())
{
conn->send(result+
"\r\n"
);
}
else
{
conn->send(id+
":"
+result+
"\r\n"
);
}
}
EventLoop* loop_;
TcpServer server_;
ThreadPool threadPool_;
// 注意類型,方案8, reactor + threadpool
int
numThreads_;
Timestamp startTime_;
};
void
ThreadPool::start(
int
numThreads) // 創建 thread pool,具體thread 調度這里暫時不分析
{
assert(threads_.empty());
running_ =
true
;
threads_.reserve(numThreads);
for
(
int
i = 0; i < numThreads; ++i)
{
char
id[32];
snprintf(id,
sizeof
id,
"%d"
, i);
threads_.push_back(
new
muduo::Thread(
boost::bind(&ThreadPool::runInThread,
this
), name_+id));
threads_[i].start();
}
}
方案9:main-reactor + subreactors, one loop per thread, 有一個主線程來扮演main-reactor 專門語句 accept 連接,其它線程負責讀寫文件描述符(socket)
class
SudokuServer
{
public
:
SudokuServer(EventLoop* loop,
const
InetAddress& listenAddr,
int
numThreads)
: loop_(loop),
server_(loop, listenAddr,
"SudokuServer"
),
numThreads_(numThreads),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection,
this
, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage,
this
, _1, _2, _3));
server_.setThreadNum(numThreads);
// 設置 EventLoopThreadPool里面的thread數量
}
void
start()
{
LOG_INFO <<
"starting "
<< numThreads_ <<
" threads."
;
server_.start();
}
private
:
void
onConnection(
const
TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() <<
" -> "
<< conn->localAddress().toIpPort() <<
" is "
<< (conn->connected() ?
"UP"
:
"DOWN"
);
}
void
onMessage(
const
TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
...
if
(!processRequest(conn, request))
//准備計算
{
conn->send(
"Bad Request!\r\n"
);
conn->shutdown();
break
;
}
...
}
}
bool
processRequest(
const
TcpConnectionPtr& conn,
const
string& request)
{
...
if
(puzzle.size() == implicit_cast<size_t>(kCells))
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle);
// 計算在當前線程完成
if
(id.empty())
{
conn->send(result+
"\r\n"
);
}
...
}
// 注意這里沒有類型為ThreadPool的 threadPool_成員,整個類使用Muduo默認線程模型的EventLoopThreadPool,TcpServer 聚合了EventLoopThreadPool
EventLoop* loop_;
TcpServer server_;
int
numThreads_;
Timestamp startTime_;
};
void
TcpServer::setThreadNum(
int
numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
// 設置了 EventLoopThreadPool 里面的線程個數,為后面的threadPool_->start()服務
}
void
TcpServer::start()
{
if
(!started_)
{
started_ =
true
;
threadPool_->start(threadInitCallback_);
// TcpServer 中的 threadPool 類型是 EventLoopThreadPool
}
if
(!acceptor_->listenning())
{
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
void
EventLoopThreadPool::start(
const
ThreadInitCallback& cb) // 開啟線程的方式是使用EventLoopThread,這個類將EventLoop 和 Thread 封裝在一起實現 one loop per thread
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ =
true
;
for
(
int
i = 0; i < numThreads_; ++i)
{
EventLoopThread* t =
new
EventLoopThread(cb); // 設置線程的 callback
threads_.push_back(t);
loops_.push_back(t->startLoop()); // 保存loop方便管理和分配任務,任務分配其實是通過EventLoop::runInLoop() 來進行的
}
if
(numThreads_ == 0 && cb)
{
cb(baseLoop_);
}
}
總結一下,這里所謂的Reactor就是持有Poller的結構(稍微有點狹隘,這里先就這樣理解),Poller負責事件監聽和分發。持有EventLoop的結構就持有Poller。
對於方案8只有一個類持有EventLoop,也就是只創建了一個EventLoop,這個Loop就是reactor,其它的Thread 是通過ThreadPool來實現的,因此只有reactor所在的線程來完成I/O,其它線程用於完成計算任務,所以說這個模型適合於計算密集型而不是I/O密集型。
對於方案9,存在多個Reactor,其中main reactor 持有Acceptor,專門用於監聽三個半事件中的連接建立,消息到達和連接斷開以及消息發送事件都讓sub reactor來完成。由於main reactor 只關心連接建立事件,能夠適應高並發的IO請求,多個subreactor的存在也能兼顧I/O與計算,因此被認為是一個比較好的方案。
后面還會深入學習Muduo網絡庫相關的內容,包括Reactor結構的簡化,線程池的實現,現代C++的編寫方式,使用C++11進行重寫等。現在看來C++11 thread library 提供的接口基本可以替換 posix thread library,雖然底層也許是通過posix thread實現的,畢竟Linux內核針對NPTL進行過修改。C++11 提供了 thread_local 來描述 線程局部存儲,但是沒有pthread_key_create() 提供 destructor那樣的功能,或者遇到需要使用TLS的地方轉過來使用posix 提供的接口。
Muduo 多線程 線程池 reactor