(原創)C++半同步半異步線程池


c++11 boost技術交流群:296561497,歡迎大家來交流技術。

線程池可以高效的處理任務,線程池中開啟多個線程,等待同步隊列中的任務到來,任務到來多個線程會搶着執行任務,當到來的任務太多,達到上限時需要等待片刻,任務上限保證內存不會溢出。線程池的效率和cpu核數相關,多核的話效率更高,線程數一般取cpu數量+2比較合適,否則線程過多,線程切換頻繁反而會導致效率降低。

線程池有兩個活動過程:1.外面不停的往線程池添加任務;2.線程池內部不停的取任務執行。活動圖如下:

線程池中的隊列是用的上一篇博文中的同步隊列。具體代碼:

#include<vector>
#include<thread>
#include<functional>
#include<memory>
#include <atomic>
#include"SyncQueue.hpp"

const int MaxTaskCount = 100;
class ThreadPool
{
public:
    using Task = std::function<void()>;
    ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
    {
        Start(numThreads);
    }

    ~ThreadPool(void)
    {
        //如果沒有停止時則主動停止線程池
        Stop();
    }

    void Stop()
    {
        std::call_once(m_flag, [this]{StopThreadGroup(); }); //保證多線程情況下只調用一次StopThreadGroup
    }

    void AddTask(Task&&task)
    {
        m_queue.Put(std::forward<Task>(task));
    }

    void AddTask(const Task& task)
    {
        m_queue.Put(task);
    }

private:
    void Start(int numThreads)
    {
        m_running = true;
        //創建線程組
        for (int i = 0; i <numThreads; ++i)
        {
            m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }
    }    

    void RunInThread()
    {
        while (m_running)
        {
            //取任務分別執行
            std::list<Task> list;
            m_queue.Take(list);

            for (auto& task : list)
            {
                if (!m_running)
                    return;

                task();
            }
        }
    }

    void StopThreadGroup()
    {
        m_queue.Stop(); //讓同步隊列中的線程停止
        m_running = false; //置為false,讓內部線程跳出循環並退出

        for (auto thread : m_threadgroup) //等待線程結束
        {
            if (thread)
                thread->join();
        }
        m_threadgroup.clear();
    }

    std::list<std::shared_ptr<std::thread>> m_threadgroup; //處理任務的線程組
    SyncQueue<Task> m_queue; //同步隊列     
    atomic_bool m_running; //是否停止的標志
    std::once_flag m_flag;
};

 上面的代碼中用到了同步隊列SyncQueue,它的實現在這里。測試代碼如下:

void TestThdPool()
{
    ThreadPool pool;bool runing = true;

    std::thread thd1([&pool,&runing]{
        while(runing)
        {
            cout<<"produce "<<this_thread::get_id()<< endl;

            pool.AddTask([]{
                std::cout <<"consume "<<this_thread::get_id()<< endl;
            });
        }
    });


    this_thread::sleep_for(std::chrono::seconds(10));
    runing = false;
    pool.Stop();
    
    thd1.join();
    getchar();
}

上面的測試代碼中,thd1是生產者線程,線程池內部會不斷消費生產者產生的任務。在需要的時候可以提前停止線程池,只要調用Stop函數就行了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM