muduo線程池ThreadPool,采用的是固定線程數目的線程池方案。
線程池模型
模型圖如下:
這個是通用線程池,雙端隊列存放的是多個可調用對象(即用戶任務),而非函數指針,因此可以通過std::bind配接器傳參。雙端隊列queue_,有時也稱為工作隊列。
其工作原理:首先創建並啟動一組線程,稱為線程池threads_,由用戶指定其大小maxQueueSize_,每個元素對對應一個線程。每個線程函數都是一樣的,在其中會運行一個loop循環:從雙端隊列取出一個任務對象task,如果非空,就執行之,如此往復。
當有一個用戶線程想要通過線程池運行一個用戶任務時,就可以將用戶任務函數及參數封裝成一個可調用對象Task f,然后通過線程池接口,將f加入雙端隊列末尾。當線程池有線程空閑時(未執行用戶任務),就會從雙端隊列頭部取出一個Task對象task,然后執行之。
[======]
線程池的組成
線程池主要由以下幾個部分組成:
1)工作隊列queue_,用雙端隊列實現,能從尾部加入用戶任務對應的可調用對象;
2)用戶任務Task f,封裝了用戶任務,包含任務函數和參數;
3)線程組threads_,用於管理工作的線程數組;
4)工作線程,執行回調函數。
[======]
ThreadPool接口
ThreadPool提供以下public接口,供用戶使用:
class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;
explicit ThreadPool(const string& nameArg = string("ThreadPool")); // 構造函數, 初始化各數據成員
~ThreadPool();
// Must be called before start().
void setMaxQueueSize(int maxSize); // 設置工作隊列最大大小
void setThreadInitCallback(const Task& cb); // 設置線程初始化完成后回調函數
void start(int numThreads); // 指定創建線程個數, 啟動線程池中的所有線程
void stop(); // 停止線程池各線程
const std::string& name() const; // 當前線程池名稱
size_t queueSize() const; // 返回工作隊列大小
void run(Task f); // 運行用戶任務f
...
}
ThreadPool使用的線程Thread是Linux下NPTL線程庫(Pthreads)的封裝。ThreadPool將線程池的創建(構造)與啟動(start())分隔開來,並沒有直接在構造函數中啟動線程組;將停止(stop())與析構分隔開。
[======]
ThreadPool實現
數據成員
private:
...
mutable MutexLock mutex_; // 用於線程安全保護數據成員的互斥鎖
Condition notEmpty_ GUARDED_BY(mutex_); // 工作隊列非空條件(元素個數為0)
Condition notFull_ GUARDED_BY(mutex_); // 工作隊列非滿條件(元素個數未達到上限值)
std::string name_;
Task threadInitCallback_; // 線程初始化完成后的回調對象
std::vector<std::unique_ptr<muduo::Thread>> threads_; // 線程組指針
std::deque<Task> queue_ GUARDED_BY(mutex_); // 工作隊列
size_t maxQueueSize_; // 工作隊列最大大小
bool running_; // 線程(循環)是否運行標志
主要成員:工作隊列queue_,以及限制工作隊列大小的maxQueueSize_,線程組指針threads_, 線程運行標志,以及用於保護它們的互斥鎖mutex_,用於喚醒阻塞線程的條件變量notEmpty_, notFull_。
每個線程中都有一個loop,用running表示是否運行的標志,running只有在線程池停止的時候,才會被其他線程調用,可以用鎖來保護,其他時候,只會被同一個線程訪問,因此無需使用原子類型。
線程池的構造
ThreadPool::ThreadPool(const string &nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}
用戶可以指定線程池名稱,默認為"ThreadPool",便於調試跟蹤,日志診斷問題;值得注意的是工作隊列最大大小maxQueueSize_初值0,用戶可通過setMaxQueueSize修改其大小;
啟動與停止
用戶可通過start()啟動線程池,需要指定線程組中子線程數量,一旦創建成功后,各子線程就會投入運行,直到調用stop() 停止線程池運行。
由於 Thread已內含一個門閥,會讓調用線程等待新線程函數啟動,因此,這里不必再設置門閥等待線程池中線程的啟動。相反,如果有子線程運行所需要的數據,就需要在創建之前就准備好,比如running_,要在線程循環前就設置為true,否則子線程loop不會運行,而是直接退出。
void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
threads_.reserve(static_cast<size_t>(numThreads));
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof(id), "%d", i + 1);
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_ + id));
threads_[i]->start();
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();
notFull_.notifyAll();
}
for (auto& thr : threads_)
{
thr->join();
}
}
為什么start()中不加鎖,而stop()卻要加鎖?
因為start() 中,在子線程啟動后,並沒有對共享數據進行訪問,也就不存在競態條件。而stop()中,有對共享數據,如running_、notEmpty、notFull,因此,需要加鎖對這些數據進行保護。
這里,子線程退出方式是連接(join)線程,而非分離(detach)線程。個人認為兩種方案都可以,不過,join更容易在開發階段,排查問題,因為如果線程無法正常退出,調用線程會阻塞在join調用上。
往工作隊列加入任務對象
調用線程通過run(),向線程池的請求運行用戶指定的任務對象,該對象會被加入到工作隊列末尾,空閑子線程會自動從工作隊列中取任務對象執行。
void ThreadPool::run(Task task)
{
if (threads_.empty()) // 子線程數量為0
{
task();
}
else
{ // 子線程數量非0
MutexLockGuard lock(mutex_);
while (isFull() && running_)
{
notFull_.wait();
}
if (!running_) return;
assert(!isFull());
queue_.push_back(std::move(task));
notEmpty_.notify();
}
}
這里有2個特殊情況需要注意:
1)threads_為空,即沒有創建線程,可能是用戶指定線程數為0或非法數量(如負數),也有可能是進程創建的線程數達到系統限制,從而創建線程失敗。
不論什么原因,為避免進程崩潰,可以直接在當前線程中調用用戶任務。
2)采用的是isFull()成員來判斷工作隊列是否滿,而不是容器自帶的size()來判斷。
在isFull()內部,添加了一個互斥鎖斷言,確保isFull()的調用線程已經取得了mutex_鎖;否則,一旦有其他線程在未取得鎖的情況下,訪問應受鎖保護工作隊列成員,可能導致意外情況。
bool ThreadPool::isFull() const
{
mutex_.assertLocked();
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
從工作隊列取任務對象
用take從工作隊列頭部取出一個任務對象。通常是子線程空閑時調用,取出后,用來執行用戶任務。
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
notEmpty_.wait();
}
Task task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}
子線程loop
主要工作:從工作隊列取用戶任務,然后執行之。循環往復,直到線程池停止工作。
實現該工作的runInThread()是在用戶調用start()時,自動啟動的,不需要用戶自行調研。
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
Task task(take());
if (task)
{
task();
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in TheadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw ; // rethrow
}
}
這里,用了try-catch語句塊將代碼包裹起來,因為不知道用戶代碼會干些什么,很有可能會產生異常,因此需要捕獲異常。對於不確定的異常,可以rethrow(繼續上拋)。
另外,threadInitCallback_讓用戶有機會在線程初始化完成后,運行用戶任務之前,做一些事情。
[======]
ThreadPool的使用、測試
基本流程
// 創建線程池對象
ThreadPool pool("MyThreadPool");
// 設置工作隊列最大尺寸
pool.setMaxQueueSize(maxSize);
// 啟動線程池線程組, 指定線程數量
pool.start(threadNum);
// 運行用戶指定任務
pool.run(userTask); // userTask是用戶任務(可調用對象)
...
// 停止線程池(如有需要)
pool.stop();
截取自muduo的部分代碼,對ThreadPool進行測試:
// from muduo project
// muduo/base/tests/ThreadPool_test.cc
void print()
{
printf("tid=%d\n", muduo::CurrentThread::tid());
}
void printString(const std::string& str)
{
LOG_INFO << str;
usleep(100*1000);
}
void test(int maxSize)
{
LOG_WARN << "Test ThreadPool with max queue size = " << maxSize;
muduo::ThreadPool pool("MainThreadPool");
pool.setMaxQueueSize(maxSize);
pool.start(5);
LOG_WARN << "Adding";
pool.run(print);
pool.run(print);
for (int i = 0; i < 100; ++i) {
char buf[32];
snprintf(buf, sizeof(buf), "task %d", i);
pool.run(std::bind(printString, std::string(buf))); // 演示了如何向線程池加入含參的可調用對象
}
LOG_WARN << "Done";
// 演示了如何等待線程池運行完用戶任務
muduo::CountDownLatch latch(1);
pool.run(std::bind(&muduo::CountDownLatch::countDown, &latch));
latch.wait(); // wait for pool running latch.countDown()
pool.stop();
}
int main()
{
test(0);
test(1);
test(5);
test(10);
test(50);
return 0;
}
有2點問題:
1)run只接受一個參數,那么調用線程如何向線程池傳參?
解決方案有很多,一種是使用模板函數,為向工作隊列加用戶任務的run函數添加不定參數的重載版本;另一種,是使用std::bind配機器,向run傳遞一個新的可調用對象。muduo采用的是后者。
2)調用線程端的用戶,如何獲取用戶任務執行結果?
run()沒有任何返回值,用戶只能自行設計用戶任務函數及參數,通過參數狀態取得結果。
當然,還有另外的辦法就是,讓run()返回一個std::future<return_type>,通過future異步獲取結果。參考:http://www.purecpp.org/detail?id=2260
[======]
小結
1)線程池為避免頻繁創建、銷毀線程,提供一組子線程,能從工作隊列取任務、執行任務,而用戶可以向工作隊列加入任務,從而完成用戶任務。
[======]
參考
https://docs.microsoft.com/zh-cn/windows/win32/procthread/thread-pools
http://www.purecpp.org/detail?id=2260