C++ 線程池的實現


寫了一個簡易線程池,

原理簡單介紹下,就是設置一個任務隊列queue,用來放要執行的函數,還有一個線程數組vector,用來存放所有的線程。

線程創建以后就存放在相應的vector里,空閑的線程去queue里去取要執行的函數地址,在run函數中執行,假如一個線程的run函數執行好后,

發現隊列沒有任務可取,則阻塞該線程,通過conidtion_variable變量的wait()函數進行阻塞,等待新的任務被添加進來后,會有一個cond變量的notify_one()

函數來喚醒阻塞中的run函數。

現在放代碼吧!

線程池頭文件Thread_Pool.h

 

/********************************************
            線程池頭文件

        Author:十面埋伏但莫慌
        Time:2020/05/03

*********************************************/
#pragma once
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<thread>
#include<queue>
#include<mutex>
#include<atomic>
#include<vector>
#include<condition_variable>

typedef std::function<void()> Func;//定義線程執行函數類型,方便后面編碼使用。
//任務類
template<typename T>
class Task {
public:
    Task() {}
    ~Task() {}
    int push(T func)//添加任務;
    {
        try {
            tasks.emplace(func);
        }
        catch (std::exception e)
        {
            throw e;
            return -1;
        }
        return 1;
    }
    int getTaskNum()//獲得當前隊列中的任務數;
    {
        return tasks.size();
    }
    T pop()//取出待執行的任務;
    {
        T temp;
        if (tasks.empty())
            return temp;
        else
        {
            temp = tasks.front();
            tasks.pop();
            return temp;
        }
    }

private:
    
    std::queue<T> tasks;//任務隊列
};
//線程池類
class Thread_Pool {
public:
    Thread_Pool() :IsStart(false) {}
    ~Thread_Pool();
    int addTasks(Func&& tasks);//添加任務;
    void start();//開啟線程池;
    void stop();//關閉線程池;
    int getTaskNum();//獲得當前隊列中的任務數;
private:
    void run();//線程工作函數;

private:
    static const int maxThreadNum = 3;//最大線程數為3;
    std::mutex mx;//鎖;
    std::condition_variable cond;//條件量;
    std::vector<std::thread*> threads;//線程向量;
    bool IsStart;//原子變量,判斷線程池是否運行;
    Task<Func> tasks;//任務變量;
};
#endif

 

線程池實現文件 Thread_Pool.cpp

/********************************************
            線程池CPP文件

        Author:十面埋伏但莫慌
        Time:2020/05/03

*********************************************/
#include"Thread_Pool.h"
#include<iostream>



int Thread_Pool::addTasks(Func&& func)
{
    std::unique_lock<std::mutex> lock(mx);
    int ret = tasks.push(func);
    if (ret == 1)
    {
        std::cout << "添加任務成功" << std::endl;
        cond.notify_one();
    }
    
    return ret;
}
void Thread_Pool::start() {
    if (!IsStart) {    
        {
            std::unique_lock<std::mutex> lock(mx);
            IsStart = true;
        }

        threads.reserve(maxThreadNum);
        for (int i = 0; i < maxThreadNum; i++)
        {
            threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this)));            
        }
        
    }
}

void Thread_Pool::run()
{
    while (IsStart)
    {
        Func f;
        if (tasks.getTaskNum() == 0 && IsStart)
        {
            std::unique_lock<std::mutex> lock(mx);
            cond.wait(lock);
        }
        {
            std::unique_lock<std::mutex> lock(mx);
            f = tasks.pop();
        }

        if (f)
            f();

    }
}
int Thread_Pool::getTaskNum() {
    return tasks.getTaskNum();
}
void Thread_Pool::stop() {
        {
            std::unique_lock<std::mutex> lock(mx);
            IsStart = false;
        }    
        cond.notify_all();
        for (auto T : threads) {
            std::cout << "線程 " << T->get_id() << " 已停止。" << std::endl;
            T->join();
            if (T != nullptr)
            {
                delete T;
                T = nullptr;
            }
        }
    std::cout << "所有線程已停止。" << std::endl;
}
Thread_Pool::~Thread_Pool() {
    if (IsStart)
    {
        stop();
    }
}

測試用的main.cpp文件

 

#include<iostream>
#include"Thread_Pool.h"
using namespace std;
void string_out_one() {
    int n = 500;
    while (n--)
    {
        cout << "One!" << endl;
    }
}
void string_out_two() {
    int n = 500;
    while (n--)
        cout << "Two!" << endl;
}
void string_out_three() {
    int n = 500;
    while(n--)
        cout << "Three!" << endl;
}
void string_out_four() {
    int n = 500;
    while (n--)
        cout << "Four!" << endl;
}
int main() {
    clock_t start, finish;
    double totaltime;
    start = clock();
    {
        /*實驗對比代碼段1開始處*/
        Thread_Pool Pool;
        try {
            Pool.start();
        }
        catch (std::exception e)
        {
            throw e;
            cout << "線程池創建失敗。" << endl;
        }
        Pool.addTasks(move(string_out_one));
        Pool.addTasks(move(string_out_two));
        Pool.addTasks(move(string_out_three));
        Pool.addTasks(move(string_out_four));
    
        /*實驗對比代碼段1結束處*/
        /*實驗對比代碼段2開始處*/
        //string_out_one();
        //string_out_two();
        //string_out_three();
        //string_out_four();
        /*實驗對比代碼段2結束處*/
        finish = clock();
        totaltime = (double)(finish - start) / CLOCKS_PER_SEC;
        cout << "\n此程序的運行時間為" << totaltime << "秒!" << endl;
        getchar();
     }
    getchar();
    return 0;
}

總結下這個線程池,主要是要注意鎖的防止位置,放太多,就變成單線程,執行效率還不如單線程,放太少,可能會造成多線程之間的誤讀,放的位置不對,會造成死鎖。。。是真的麻煩。

兩個想改進的地方,

  一、希望可以在添加任務時確定任務的類型,而不是在Thread_Pool類中就確定task的類型,並且能支持傳入函數形參。

  二、程序的優化做的不是很好吧,雖然不知道但是覺得肯定還有優化空間。

若有不足之處歡迎指出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM