使用C++11封裝線程池ThreadPool


讀本文之前,請務必閱讀:

使用C++11的function/bind組件封裝Thread以及回調函數的使用

Linux組件封裝(五)一個生產者消費者問題示例

 

線程池本質上是一個生產者消費者模型,所以請熟悉這篇文章:Linux組件封裝(五)一個生產者消費者問題示例

在ThreadPool中,物品為計算任務,消費者為pool內的線程,而生產者則是調用線程池的每個函數。

搞清了這一點,我們很容易就需要得出,ThreadPool需要一把互斥鎖和兩個同步變量,實現同步與互斥

存儲任務,當然需要一個任務隊列。

除此之外,我們還需要一系列的Thread,因為Thread無法復制,所以我們使用unique_ptr作為一個中間層

所以Thread的數據變量如下:

class ThreadPool : boost::noncopyable
{
public:
    typedef std::function<void ()> Task;

    ThreadPool(size_t queueSize, size_t threadsNum);
    ~ThreadPool();

    void start();
    void stop();

    void addTask(Task task); //C++11
    Task getTask();

    bool isStarted() const { return isStarted_; }

    void runInThread();

private:
    mutable MutexLock mutex_;
    Condition empty_;
    Condition full_;

    size_t queueSize_;
    std::queue<Task> queue_;

    const size_t threadsNum_;
    std::vector<std::unique_ptr<Thread> > threads_;
    bool isStarted_;
};

顯然,我們使用了function,作為任務隊列的任務元素。

 

構造函數的實現較簡單,不過,之前務必注意元素的聲明順序與初始化列表的順序相一致。

ThreadPool::ThreadPool(size_t queueSize, size_t threadsNum)
: empty_(mutex_),
  full_(mutex_),
  queueSize_(queueSize),
  threadsNum_(threadsNum),
  isStarted_(false)
{

}

添加和取走任務是生產者消費者模型最核心的部分,但是套路較為固定,如下:

void ThreadPool::addTask(Task task)
{
    MutexLockGuard lock(mutex_);
    while(queue_.size() >= queueSize_)
        empty_.wait();
    queue_.push(std::move(task));
    full_.notify();
}


ThreadPool::Task ThreadPool::getTask()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty())
        full_.wait();
    Task task = queue_.front();
    queue_.pop();
    empty_.notify();
    return task;
}

注意我們的addTask使用了C++11的move語義,在傳入右值時,可以提高性能。

還有一些老生常談的問題,例如:

wait前加鎖

使用while循環判斷wait條件(為什么?)

要想啟動線程,需要給Thread提供一個回調函數,編寫如下:

void ThreadPool::runInThread()
{
    while(1)
    {
        Task task(getTask());
        if(task)
            task();
    }
}

就是不停的取走任務,然后執行。

OK,有了線程的回調函數,那么我們可以編寫start函數。

void ThreadPool::start()
{
    isStarted_ = true;
    //std::vector<std::unique<Thread> >
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_.push_back(
            std::unique_ptr<Thread>(
                new Thread(
                    std::bind(&ThreadPool::runInThread, this))));
    }
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_[ix]->start();
    }

}

這里較難理解的是線程的創建,Thread內存放的是std::unique_ptr<Thread>,而ptr的創建需要使用new動態創建Thread,Thread則需要在創建時,傳入回調函數,我們采用bind適配runInThread的參數值。

這里我們采用C++11的unique_ptr,成功實現vector無法存儲Thread(為什么?)的問題

 

我們的第一個版本已經編寫完畢了。

 

添加stop功能

 

剛才的ThreadPool只能啟動,無法stop,我們從幾個方面着手,利用bool變量isStarted_,實現正確退出。

改動的有以下幾點:

首先是Thread的回調函數不再是一個死循環,而是:

void ThreadPool::runInThread()
{
    while(isStarted_)
    {
        Task task(getTask());
        if(task)
            task();
    }
}

然后addTask和getTask,在while循環判斷時,加入了bool變量:

void ThreadPool::addTask(Task task)
{
    MutexLockGuard lock(mutex_);
    while(queue_.size() >= queueSize_ && isStarted_)
        empty_.wait();

    if(!isStarted_)
        return;

    queue_.push(std::move(task));
    full_.notify();
}


ThreadPool::Task ThreadPool::getTask()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty() && isStarted_)
        full_.wait();

    if(!isStarted_) //線程池關閉
        return Task(); //空任務

    assert(!queue_.empty());
    Task task = queue_.front();
    queue_.pop();
    empty_.notify();
    return task;
}

這里注意,退出while循環后,需要再判斷一次bool變量,因為未必是條件滿足了,可能是線程池需要退出,調整了isStarted變量。

最后一個關鍵是我們的stop函數:

 

void ThreadPool::stop()
{
    if(isStarted_ == false)
        return;

    {
        MutexLockGuard lock(mutex_);
        isStarted_ = false;
        //清空任務
        while(!queue_.empty()) 
            queue_.pop();
    }
    full_.notifyAll(); //激活所有的線程
    empty_.notifyAll();
    
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_[ix]->join();
    }
    threads_.clear();
}

這里有幾個關鍵:

先將bool設置為false,然后調用notifyAll,激活所有等待的線程(為什么)。

 

最后我們總結下ThreadPool關閉的流程

1.isStarted設置為false

2.加鎖,清空隊列

3.發信號激活所有線程

4.正在運行的Thread,執行到下一次循環,退出

5.正在等待的線程被激活,然后while判斷為false,執行到下一句,檢查bool值,然后退出。

6.主線程依次join每個線程。

7.退出。

 

最后補充下析構函數的實現:

ThreadPool::~ThreadPool()
{
    if(isStarted_)
        stop();
}

 

完畢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM