Linux編程之線程池的設計與實現(C++98)


假設服務器的硬件資源“充裕”,那么提高服務器性能的一個很直接的方法就是空間換時間,即“浪費”服務器的硬件資源,以換取其運行效率。提升服務器性能的一個重要方法就是采用“池”的思路,即對一組資源在服務器啟動之初就被完全創建好並初始化,這稱為靜態資源分配。當服務器進入正式運行階段,即開始處理客戶端請求時,如果它需要相關資源就可以直接從池中獲取,無需動態分配。很顯然,直接從池中取得所需要資源比動態分配資源的速度快得多,因為分配系統資源的系統調用都是很耗時的。當服務器處理完一個客戶端連接后,可以把相關資源放回池中,無須執行系統調用釋放資源。從最終效果來看,資源分配和回收的系統調用只發生在服務器的啟動和結束,這種“池”的方式避免了中間的任務處理過程對內核的頻繁訪問,提高了服務器的性能。我們常用的線程池和內存池都是基於以上“池”的優勢所設計出來的提升服務器性能的方法,今天打算以C++98設計一個基於Linux系統的簡單線程池。

為什么要采用線程池?

首先想一想,我們一般的服務器都是動態創建子線程來實現並發服務器的,比如每當有一個客戶端請求建立連接時我們就動態調用pthread_create去創建線程去處理該連接請求。這種模式有什么缺點呢?

  • 動態創建線程是比較費時的,這將到導致較慢的客戶響應。
  • 動態創建的子線程通常只用來為一個客戶服務,這將導致系統上產生大量的細微線程,線程切換也會耗費CPU時間。

所以我們為了進一步提升服務器性能,可以采取“池”的思路,把線程的創建放在程序的初始化階段一次完成,這就避免了動態創建線程導致服務器響應請求的性能下降。

線程池的設計思路

  1. 以單例模式設計線程池,保證線程池全劇唯一;
  2. 在獲取線程池實例進行線程池初始化:線程預先創建+任務隊列創建;
  3. 創建一個任務類,我們真實的任務會繼承該類,完成任務執行。

根據以上思路我們可以給出這么一個線程池類的框架:

class ThreadPool
{
private:
    std::queue<Task*> taskQueue;   //任務隊列
    bool isRunning;           //線程池運行標志
    pthread_t* pThreadSet;  //指向線程id集合的指針
    int threadsNum;         //線程數目
    pthread_mutex_t mutex;    //互斥鎖
    pthread_cond_t condition;   //條件變量

    //單例模式,保證全局線程池只有一個
    ThreadPool(int num=10);
    void createThreads();  //創建內存池
    void clearThreads();  //回收線程
    void clearQueue();  //清空任務隊列
    static void* threadFunc(void* arg);
    Task* takeTask();  //工作線程獲取任務

public:
    void addTask(Task* pTask);   //任務入隊
    static ThreadPool* createThreadPool(int num=10); //靜態方法,用於創建線程池實例
    ~ThreadPool();
    int getQueueSize(); //獲取任務隊列中的任務數目
    int getThreadlNum();  //獲取線程池中線程總數目
 
};

下面開始講解一些實現細節。

1.單例模式下的線程池的初始化

首先我們以餓漢單例模式來設計這個線程池,以保證該線程池全局唯一:

  1. 構造函數私有化
  2. 提供一個靜態函數來獲取線程池對象
//餓漢模式,線程安全
ThreadPool* ThreadPool::createThreadPool(int num)
{
    static ThreadPool* pThreadPoolInstance = new ThreadPool(num);
    return pThreadPoolInstance;
}

ThreadPool* pMyPool = ThreadPool::createThreadPool(5);

線程池對象初始化時我們需要做三件事:相關變量的初始化(線程池狀態、互斥鎖、條件變量等)+任務隊列的創建+線程預先創建

ThreadPool::ThreadPool(int num):threadsNum(num)
{
    printf("creating threads pool...\n");
    isRunning = true;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condition, NULL);
    createThreads();
    printf("created threads pool successfully!\n");
}

線程池的數目根據對象創建時輸入的數目來創建,如果不指定數目,我們就是使用默認數目10個。

void ThreadPool::createThreads()
{
    pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);
    for(int i=0;i<threadsNum;i++)
    {
        pthread_create(&pThreadSet[i], NULL, threadFunc, this);
    }
}

2.任務添加和線程調度

對於每一個服務請求我們都可以看作是一個任務,一個任務來了我們就將它送進線程池中的任務隊列中,並通過條件變量的方式通知線程池中的空閑線程去拿任務去完成。那問題來了,這里的任務在編程的層面上看到底是什么?我們可以將任務看成是一個回調函數,將要執行的函數指針往任務隊列里面送就可以了,我們線程拿到這個指針后運行該函數就等於完成服務請求。基於以上的考慮,我們設計了一個單獨的抽象任務類,讓子類繼承。類里面有個純虛函數run(),用於執行相應操作。

考慮到回調函數需要傳參數進來,所以特意設置了個指針arg來存儲參數地址,到時候我們就可以根據該指針解析出傳入的函數實參是什么了。

任務基類

class Task
{
public:
    Task(void* a = NULL): arg(a)
    {

    }

    void SetArg(void* a)
    {
        arg = a;
    }

    virtual int run()=0;

protected:
    void* arg;

};

typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;



class MyTask: public Task
{
public:
    int run()
    {
        msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%s\n", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};

真正使用該類時就自己定義一個子類繼承Task類,並實現run()函數,並通過SetArg()方法去設置傳入的參數。比如可以這么用:

msg_t msg[10];
MyTask task_A[10];

//模擬生產者生產任務
for(int i=0;i<10;i++)
{
    msg[i].task_id = i;
    sprintf(buf,"qq_task_%d",i);
    msg[i].task_name = buf;
    task_A[i].SetArg(&msg[i]);
    pMyPool->addTask(&task_A[i]);
    sleep(1);
}

現在來到線程池設計中最難搞的地方:線程調度。一個任務來了,究竟怎么讓空閑線程去拿任務去做呢?我們又如何保證空閑的線程不斷地去拿任務呢?

抽象而言,這是一個生產者消費者的模型,系統不斷往任務隊列里送任務,我們通過互斥鎖和條件變量來控制任務的加入和獲取,線程每當空閑時就會去調用takeTask()去拿任務。如果隊列沒任務那么一些沒獲得互斥鎖的線程就會擁塞等待(因為沒鎖),獲得互斥鎖的那個線程會因為沒任務而擁塞等待。一旦有任務就會喚醒這個帶鎖線程拿走任務釋放互斥鎖。看看代碼層面是如何操作的:

加入一個任務

void ThreadPool::addTask(Task* pTask)
{
    pthread_mutex_lock(&mutex);
    taskQueue.push(pTask);
    printf("one task is put into queue! Current queue size is %lu\n",taskQueue.size());
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&condition);
}

取走一個任務

Task* ThreadPool::takeTask()
{
    Task* pTask = NULL;
    while(!pTask)
    {
        pthread_mutex_lock(&mutex);
        //線程池運行正常但任務隊列為空,那就等待任務的到來
        while(taskQueue.empty() && isRunning)
        {
            pthread_cond_wait(&condition, &mutex);
        }

        if(!isRunning)
        {
            pthread_mutex_unlock(&mutex);
            break;
        }
        else if(taskQueue.empty())
        {
            pthread_mutex_unlock(&mutex);
            continue;
        }

        pTask = taskQueue.front();
        taskQueue.pop();
        pthread_mutex_unlock(&mutex);

    }

    return pTask;
}

線程中的回調函數。這里注意的是,如果取到的任務為空,我們認為是線程池關閉的信號(線程池銷毀時我們會在析構函數中調用pthread_cond_broadcast(&condition)來通知線程來拿任務,拿到的當然是空指針),我們退出該線程。

void* ThreadPool::threadFunc(void* arg)
{
    ThreadPool* p = (ThreadPool*)arg;
    while(p->isRunning)
    {
        Task* task = p->takeTask();
        //如果取到的任務為空,那么我們結束這個線程
        if(!task)
        {
            //printf("%lu thread will shutdown!\n", pthread_self());
            break;
        }

        printf("take one...\n");

        task->run();
    }
}

3.使用例子和測試

下面給出一個線程池的一個使用例子。可以看出,我首先定義了msg_t的結構體,這是因為我們的服務響應函數是帶參數的,所以我們定義了這個結構體並把其地址作為參數傳進線程池中去(通過SetArg方法)。然后我們也定義了一個任務類MyTask繼承於Task,並重寫了run方法。我們要執行的服務函數就可以寫在run函數之中。當需要往任務隊列投放任務時調用addTask()就可以了,然后線程池會自己安排任務的分發,外界無須關心。所以一個線程池執行任務的過程可以簡化為:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool

#include <stdio.h>
#include "thread_pool.h"
#include <string>
#include <stdlib.h>

typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;

class MyTask: public Task
{
public:
    int run()
    {
        msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%s\n", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};

int main()
{
    ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
    char buf[32] = {0};

    msg_t msg[10];
    MyTask task_A[10];

    //模擬生產者生產任務
    for(int i=0;i<10;i++)
    {
        msg[i].task_id = i;
        sprintf(buf,"qq_task_%d",i);
        msg[i].task_name = buf;
        task_A[i].SetArg(&msg[i]);
        pMyPool->addTask(&task_A[i]);
        sleep(1);
    }

    while(1)
    {
        //printf("there are still %d tasks need to process\n", pMyPool->getQueueSize());
        if (pMyPool->getQueueSize() == 0)
        {
            printf("Now I will exit from main\n");
            break;
        }

        sleep(1);
    }

    delete pMyPool;
    return 0;
}

程序具體運行的邏輯是,我們建立了一個5個線程大小的線程池,然后我們又生成了10個任務,往任務隊列里放。由於線程數小於任務數,所以當每個線程都拿到自己的任務時,任務隊列中還有5個任務待處理,然后有些線程處理完自己的任務了,又去隊列里取任務,直到所有任務被處理完了,循環結束,銷毀線程池,退出程序。

完整的線程池框架和測試例子在我的github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM