我們知道Java語言對於多線程的支持十分豐富,JDK本身提供了很多性能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我還沒有看,這里先占個坑)還有很多第三方庫的實現。這里我重復“造輪子”的目的還是為了深入理解C++和Linux線程基礎概念,主要以學習的目的。
首先,為什么要使用線程池。因為線程的創建、和清理都是需要耗費系統資源的。我們知道Linux中線程實際上是由輕量級進程實現的,相對於純理論上的線程這個開銷還是有的。假設某個線程的創建、運行和銷毀的時間分別為T1、T2、T3,當T1+T3的時間相對於T2不可忽略時,線程池的就有必要引入了,尤其是處理數百萬級的高並發處理時。線程池提升了多線程程序的性能,因為線程池里面的線程都是現成的而且能夠重復使用,我們不需要臨時創建大量線程,然后在任務結束時又銷毀大量線程。一個理想的線程池能夠合理地動態調節池內線程數量,既不會因為線程過少而導致大量任務堆積,也不會因為線程過多了而增加額外的系統開銷。
其實線程池的原理非常簡單,它就是一個非常典型的生產者消費者同步問題。根據剛才描述的線程池的功能,可以看出線程池至少有兩個主要動作,一個是主程序不定時地向線程池添加任務,另一個是線程池里的線程領取任務去執行。且不論任務和執行任務是個什么概念,但是一個任務肯定只能分配給一個線程執行。這樣就可以簡單猜想線程池的一種可能的架構了:主程序執行入隊操作,把任務添加到一個隊列里面;池子里的多個工作線程共同對這個隊列試圖執行出隊操作,這里要保證同一時刻只有一個線程出隊成功,搶奪到這個任務,其他線程繼續共同試圖出隊搶奪下一個任務。所以在實現線程池之前,我們需要一個隊列。這里的生產者就是主程序,生產任務(增加任務),消費者就是工作線程,消費任務(執行、減少任務)。因為這里涉及到多個線程同時訪問一個隊列的問題,所以我們需要互斥鎖來保護隊列,同時還需要條件變量來處理主線程通知任務到達、工作線程搶奪任務的問題。
一般來說實現一個線程池主要包括以下4個組成部分:
- 線程管理器:用於創建並管理線程池。
- 工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處於空閑狀態。
- 任務接口:每個任務必須實現的接口。當線程池的任務隊列中有可執行任務時,被空間的工作線程調去執行(線程的閑與忙的狀態是通過互斥量實現的),把任務抽象出來形成一個接口,可以做到線程池與具體的任務無關。
- 任務隊列:用來存放沒有處理的任務。提供一種緩沖機制。實現這種結構有很多方法,常用的有隊列和鏈表結構。
流程圖如下:
ool.h
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/*執行任務的類:設置任務數據並執行*/
class CTask {
protected:
string m_strTaskName; //任務的名稱
void* m_ptrData; //要執行的任務的具體數據
public:
CTask() = default;
CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {}
virtual int Run() = 0;
void setData(void* data); //設置任務數據
virtual ~CTask() {}
};
/*線程池管理類*/
class CThreadPool {
private:
static vector<CTask*> m_vecTaskList; //任務列表
static bool shutdown; //線程退出標志
int m_iThreadNum; //線程池中啟動的線程數
pthread_t *pthread_id;
static pthread_mutex_t m_pthreadMutex; //線程同步鎖
static pthread_cond_t m_pthreadCond; //線程同步條件變量
protected:
static void* ThreadFunc(void *threadData); //新線程的線程回調函數
static int MoveToIdle(pthread_t tid); //線程執行結束后,把自己放入空閑線程中
static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去
int Create(); //創建線程池中的線程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任務添加到任務隊列中
int StopAll(); //使線程池中的所有線程退出
int getTaskSize(); //獲取當前任務隊列中的任務數
};
#endif
2 thread_pool.cpp
#include "thread_pool.h"
#include <cstdio>
void CTask::setData(void* data) {
m_ptrData = data;
}
//靜態成員初始化
vector<CTask*> CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
//線程管理類構造函數
CThreadPool::CThreadPool(int threadNum) {
this->m_iThreadNum = threadNum;
printf("I will create %d threads.\n", threadNum);
Create();
}
//線程回調函數
void* CThreadPool::ThreadFunc(void* threadData) {
pthread_t tid = pthread_self();
while(1)
{
pthread_mutex_lock(&m_pthreadMutex);
//如果隊列為空,等待新任務進入任務隊列
while(m_vecTaskList.size() == 0 && !shutdown)
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
//關閉線程
if(shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf("[tid: %lu]\texit\n", pthread_self());
pthread_exit(NULL);
}
printf("[tid: %lu]\trun: ", tid);
vector<CTask*>::iterator iter = m_vecTaskList.begin();
//取出一個任務並處理之
CTask* task = *iter;
if(iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); //執行任務
printf("[tid: %lu]\tidle\n", tid);
}
return (void*)0;
}
//往任務隊列里添加任務並發出線程同步信號
int CThreadPool::AddTask(CTask *task) {
pthread_mutex_lock(&m_pthreadMutex);
m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
//創建線程
int CThreadPool::Create() {
pthread_id = new pthread_t[m_iThreadNum];
for(int i = 0; i < m_iThreadNum; i++)
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
return 0;
}
//停止所有線程
int CThreadPool::StopAll() {
//避免重復調用
if(shutdown)
return -1;
printf("Now I will end all threads!\n\n");
//喚醒所有等待進程,線程池也要銷毀了
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
//清楚僵屍
for(int i = 0; i < m_iThreadNum; i++)
pthread_join(pthread_id[i], NULL);
delete[] pthread_id;
pthread_id = NULL;
//銷毀互斥量和條件變量
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
//獲取當前隊列中的任務數
int CThreadPool::getTaskSize() {
return m_vecTaskList.size();
}
3 main.cpp
#include "thread_pool.h"
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>
class CMyTask: public CTask {
public:
CMyTask() = default;
int Run() {
printf("%s\n", (char*)m_ptrData);
int x = rand()%4 + 1;
sleep(x);
return 0;
}
~CMyTask() {}
};
int main() {
CMyTask taskObj;
char szTmp[] = "hello!";
taskObj.setData((void*)szTmp);
CThreadPool threadpool(5); //線程池大小為5
for(int i = 0; i < 10; i++)
threadpool.AddTask(&taskObj);
while(1) {
printf("There are still %d tasks need to handle\n", threadpool.getTaskSize());
//任務隊列已沒有任務了
if(threadpool.getTaskSize()==0) {
//清除線程池
if(threadpool.StopAll() == -1) {
printf("Thread pool clear, exit.\n");
exit(0);
}
}
sleep(2);
printf("2 seconds later...\n");
}
return 0;
}
4 Makefile
CC:= g++
TARGET:= threadpool
INCLUDE:= -I./
LIBS:= -lpthread
# C++語言編譯參數
CXXFLAGS:= -std=c++11 -g -Wall -D_REENTRANT
# C預處理參數
# CPPFLAGS:=
OBJECTS :=thread_pool.o main.o
$(TARGET): $(OBJECTS)
$(CC) -o $(TARGET) $(OBJECTS) $(LIBS)
# $@表示所有目標集
%.o:%.cpp
$(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o $@
.PHONY : clean
clean:
-rm -f $(OBJECTS) $(TARGET)
5 輸出結果
I will create 5 threads.
There are still 10 tasks need to handle
[tid: 140056759576320] run: hello!
[tid: 140056751183616] run: hello!
[tid: 140056742790912] run: hello!
[tid: 140056734398208] run: hello!
[tid: 140056767969024] run: hello!
2 seconds later...
There are still 5 tasks need to handle
[tid: 140056742790912] idle
[tid: 140056742790912] run: hello!
[tid: 140056767969024] idle
[tid: 140056767969024] run: hello!
[tid: 140056751183616] idle
[tid: 140056751183616] run: hello!
[tid: 140056759576320] idle
[tid: 140056759576320] run: hello!
[tid: 140056751183616] idle
[tid: 140056751183616] run: hello!
[tid: 140056734398208] idle
2 seconds later...
There are still 0 tasks need to handle
Now I will end all threads!
2 seconds later...
[tid: 140056734398208] exit
[tid: 140056767969024] idle
[tid: 140056767969024] exit
[tid: 140056759576320] idle
[tid: 140056759576320] exit
[tid: 140056751183616] idle
[tid: 140056751183616] exit
[tid: 140056742790912] idle
[tid: 140056742790912] exit
2 seconds later...
There are still 0 tasks need to handle
Thread pool clear, exit.
擴展資料: