假設服務器的硬件資源“充裕”,那么提高服務器性能的一個很直接的方法就是空間換時間,即“浪費”服務器的硬件資源,以換取其運行效率。提升服務器性能的一個重要方法就是采用“池”的思路,即對一組資源在服務器啟動之初就被完全創建好並初始化,這稱為靜態資源分配。當服務器進入正式運行階段,即開始處理客戶端請求時,如果它需要相關資源就可以直接從池中獲取,無需動態分配。很顯然,直接從池中取得所需要資源比動態分配資源的速度快得多,因為分配系統資源的系統調用都是很耗時的。當服務器處理完一個客戶端連接后,可以把相關資源放回池中,無須執行系統調用釋放資源。從最終效果來看,資源分配和回收的系統調用只發生在服務器的啟動和結束,這種“池”的方式避免了中間的任務處理過程對內核的頻繁訪問,提高了服務器的性能。我們常用的線程池和內存池都是基於以上“池”的優勢所設計出來的提升服務器性能的方法,今天打算以C++98設計一個基於Linux系統的簡單線程池。
為什么要采用線程池?
首先想一想,我們一般的服務器都是動態創建子線程來實現並發服務器的,比如每當有一個客戶端請求建立連接時我們就動態調用pthread_create去創建線程去處理該連接請求。這種模式有什么缺點呢?
- 動態創建線程是比較費時的,這將到導致較慢的客戶響應。
- 動態創建的子線程通常只用來為一個客戶服務,這將導致系統上產生大量的細微線程,線程切換也會耗費CPU時間。
所以我們為了進一步提升服務器性能,可以采取“池”的思路,把線程的創建放在程序的初始化階段一次完成,這就避免了動態創建線程導致服務器響應請求的性能下降。
線程池的設計思路
- 以單例模式設計線程池,保證線程池全劇唯一;
- 在獲取線程池實例進行線程池初始化:線程預先創建+任務隊列創建;
- 創建一個任務類,我們真實的任務會繼承該類,完成任務執行。
根據以上思路我們可以給出這么一個線程池類的框架:
class ThreadPool
{
private:
std::queue<Task*> taskQueue; //任務隊列
bool isRunning; //線程池運行標志
pthread_t* pThreadSet; //指向線程id集合的指針
int threadsNum; //線程數目
pthread_mutex_t mutex; //互斥鎖
pthread_cond_t condition; //條件變量
//單例模式,保證全局線程池只有一個
ThreadPool(int num=10);
void createThreads(); //創建內存池
void clearThreads(); //回收線程
void clearQueue(); //清空任務隊列
static void* threadFunc(void* arg);
Task* takeTask(); //工作線程獲取任務
public:
void addTask(Task* pTask); //任務入隊
static ThreadPool* createThreadPool(int num=10); //靜態方法,用於創建線程池實例
~ThreadPool();
int getQueueSize(); //獲取任務隊列中的任務數目
int getThreadlNum(); //獲取線程池中線程總數目
};
下面開始講解一些實現細節。
1.單例模式下的線程池的初始化
首先我們以餓漢單例模式來設計這個線程池,以保證該線程池全局唯一:
- 構造函數私有化
- 提供一個靜態函數來獲取線程池對象
//餓漢模式,線程安全
ThreadPool* ThreadPool::createThreadPool(int num)
{
static ThreadPool* pThreadPoolInstance = new ThreadPool(num);
return pThreadPoolInstance;
}
ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
線程池對象初始化時我們需要做三件事:相關變量的初始化(線程池狀態、互斥鎖、條件變量等)+任務隊列的創建+線程預先創建
ThreadPool::ThreadPool(int num):threadsNum(num)
{
printf("creating threads pool...\n");
isRunning = true;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condition, NULL);
createThreads();
printf("created threads pool successfully!\n");
}
線程池的數目根據對象創建時輸入的數目來創建,如果不指定數目,我們就是使用默認數目10個。
void ThreadPool::createThreads()
{
pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);
for(int i=0;i<threadsNum;i++)
{
pthread_create(&pThreadSet[i], NULL, threadFunc, this);
}
}
2.任務添加和線程調度
對於每一個服務請求我們都可以看作是一個任務,一個任務來了我們就將它送進線程池中的任務隊列中,並通過條件變量的方式通知線程池中的空閑線程去拿任務去完成。那問題來了,這里的任務在編程的層面上看到底是什么?我們可以將任務看成是一個回調函數,將要執行的函數指針往任務隊列里面送就可以了,我們線程拿到這個指針后運行該函數就等於完成服務請求。基於以上的考慮,我們設計了一個單獨的抽象任務類,讓子類繼承。類里面有個純虛函數run(),用於執行相應操作。
考慮到回調函數需要傳參數進來,所以特意設置了個指針arg來存儲參數地址,到時候我們就可以根據該指針解析出傳入的函數實參是什么了。
任務基類
class Task
{
public:
Task(void* a = NULL): arg(a)
{
}
void SetArg(void* a)
{
arg = a;
}
virtual int run()=0;
protected:
void* arg;
};
typedef struct
{
int task_id;
std::string task_name;
}msg_t;
class MyTask: public Task
{
public:
int run()
{
msg_t* msg = (msg_t*)arg;
printf("working thread[%lu] : task_id:%d task_name:%s\n", pthread_self(),
msg->task_id, msg->task_name.c_str());
sleep(10);
return 0;
}
};
真正使用該類時就自己定義一個子類繼承Task類,並實現run()函數,並通過SetArg()方法去設置傳入的參數。比如可以這么用:
msg_t msg[10];
MyTask task_A[10];
//模擬生產者生產任務
for(int i=0;i<10;i++)
{
msg[i].task_id = i;
sprintf(buf,"qq_task_%d",i);
msg[i].task_name = buf;
task_A[i].SetArg(&msg[i]);
pMyPool->addTask(&task_A[i]);
sleep(1);
}
現在來到線程池設計中最難搞的地方:線程調度。一個任務來了,究竟怎么讓空閑線程去拿任務去做呢?我們又如何保證空閑的線程不斷地去拿任務呢?
抽象而言,這是一個生產者消費者的模型,系統不斷往任務隊列里送任務,我們通過互斥鎖和條件變量來控制任務的加入和獲取,線程每當空閑時就會去調用takeTask()去拿任務。如果隊列沒任務那么一些沒獲得互斥鎖的線程就會擁塞等待(因為沒鎖),獲得互斥鎖的那個線程會因為沒任務而擁塞等待。一旦有任務就會喚醒這個帶鎖線程拿走任務釋放互斥鎖。看看代碼層面是如何操作的:
加入一個任務
void ThreadPool::addTask(Task* pTask)
{
pthread_mutex_lock(&mutex);
taskQueue.push(pTask);
printf("one task is put into queue! Current queue size is %lu\n",taskQueue.size());
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&condition);
}
取走一個任務
Task* ThreadPool::takeTask()
{
Task* pTask = NULL;
while(!pTask)
{
pthread_mutex_lock(&mutex);
//線程池運行正常但任務隊列為空,那就等待任務的到來
while(taskQueue.empty() && isRunning)
{
pthread_cond_wait(&condition, &mutex);
}
if(!isRunning)
{
pthread_mutex_unlock(&mutex);
break;
}
else if(taskQueue.empty())
{
pthread_mutex_unlock(&mutex);
continue;
}
pTask = taskQueue.front();
taskQueue.pop();
pthread_mutex_unlock(&mutex);
}
return pTask;
}
線程中的回調函數。這里注意的是,如果取到的任務為空,我們認為是線程池關閉的信號(線程池銷毀時我們會在析構函數中調用pthread_cond_broadcast(&condition)來通知線程來拿任務,拿到的當然是空指針),我們退出該線程。
void* ThreadPool::threadFunc(void* arg)
{
ThreadPool* p = (ThreadPool*)arg;
while(p->isRunning)
{
Task* task = p->takeTask();
//如果取到的任務為空,那么我們結束這個線程
if(!task)
{
//printf("%lu thread will shutdown!\n", pthread_self());
break;
}
printf("take one...\n");
task->run();
}
}
3.使用例子和測試
下面給出一個線程池的一個使用例子。可以看出,我首先定義了msg_t的結構體,這是因為我們的服務響應函數是帶參數的,所以我們定義了這個結構體並把其地址作為參數傳進線程池中去(通過SetArg方法)。然后我們也定義了一個任務類MyTask繼承於Task,並重寫了run方法。我們要執行的服務函數就可以寫在run函數之中。當需要往任務隊列投放任務時調用addTask()就可以了,然后線程池會自己安排任務的分發,外界無須關心。所以一個線程池執行任務的過程可以簡化為:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool
#include <stdio.h>
#include "thread_pool.h"
#include <string>
#include <stdlib.h>
typedef struct
{
int task_id;
std::string task_name;
}msg_t;
class MyTask: public Task
{
public:
int run()
{
msg_t* msg = (msg_t*)arg;
printf("working thread[%lu] : task_id:%d task_name:%s\n", pthread_self(),
msg->task_id, msg->task_name.c_str());
sleep(10);
return 0;
}
};
int main()
{
ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
char buf[32] = {0};
msg_t msg[10];
MyTask task_A[10];
//模擬生產者生產任務
for(int i=0;i<10;i++)
{
msg[i].task_id = i;
sprintf(buf,"qq_task_%d",i);
msg[i].task_name = buf;
task_A[i].SetArg(&msg[i]);
pMyPool->addTask(&task_A[i]);
sleep(1);
}
while(1)
{
//printf("there are still %d tasks need to process\n", pMyPool->getQueueSize());
if (pMyPool->getQueueSize() == 0)
{
printf("Now I will exit from main\n");
break;
}
sleep(1);
}
delete pMyPool;
return 0;
}
程序具體運行的邏輯是,我們建立了一個5個線程大小的線程池,然后我們又生成了10個任務,往任務隊列里放。由於線程數小於任務數,所以當每個線程都拿到自己的任務時,任務隊列中還有5個任務待處理,然后有些線程處理完自己的任務了,又去隊列里取任務,直到所有任務被處理完了,循環結束,銷毀線程池,退出程序。
完整的線程池框架和測試例子在我的github。