基於Linux/C++簡單線程池的實現


我們知道Java語言對於多線程的支持十分豐富,JDK本身提供了很多性能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我還沒有看,這里先占個坑)還有很多第三方庫的實現。這里我重復“造輪子”的目的還是為了深入理解C++和Linux線程基礎概念,主要以學習的目的。

首先,為什么要使用線程池。因為線程的創建、和清理都是需要耗費系統資源的。我們知道Linux中線程實際上是由輕量級進程實現的,相對於純理論上的線程這個開銷還是有的。假設某個線程的創建、運行和銷毀的時間分別為T1、T2、T3,當T1+T3的時間相對於T2不可忽略時,線程池的就有必要引入了,尤其是處理數百萬級的高並發處理時。線程池提升了多線程程序的性能,因為線程池里面的線程都是現成的而且能夠重復使用,我們不需要臨時創建大量線程,然后在任務結束時又銷毀大量線程。一個理想的線程池能夠合理地動態調節池內線程數量,既不會因為線程過少而導致大量任務堆積,也不會因為線程過多了而增加額外的系統開銷。

其實線程池的原理非常簡單,它就是一個非常典型的生產者消費者同步問題。根據剛才描述的線程池的功能,可以看出線程池至少有兩個主要動作,一個是主程序不定時地向線程池添加任務,另一個是線程池里的線程領取任務去執行。且不論任務和執行任務是個什么概念,但是一個任務肯定只能分配給一個線程執行。這樣就可以簡單猜想線程池的一種可能的架構了:主程序執行入隊操作,把任務添加到一個隊列里面;池子里的多個工作線程共同對這個隊列試圖執行出隊操作,這里要保證同一時刻只有一個線程出隊成功,搶奪到這個任務,其他線程繼續共同試圖出隊搶奪下一個任務。所以在實現線程池之前,我們需要一個隊列。這里的生產者就是主程序,生產任務(增加任務),消費者就是工作線程,消費任務(執行、減少任務)。因為這里涉及到多個線程同時訪問一個隊列的問題,所以我們需要互斥鎖來保護隊列,同時還需要條件變量來處理主線程通知任務到達、工作線程搶奪任務的問題。

一般來說實現一個線程池主要包括以下4個組成部分:

  1. 線程管理器:用於創建並管理線程池。
  2. 工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處於空閑狀態。
  3. 任務接口:每個任務必須實現的接口。當線程池的任務隊列中有可執行任務時,被空間的工作線程調去執行(線程的閑與忙的狀態是通過互斥量實現的),把任務抽象出來形成一個接口,可以做到線程池與具體的任務無關。
  4. 任務隊列:用來存放沒有處理的任務。提供一種緩沖機制。實現這種結構有很多方法,常用的有隊列和鏈表結構。

流程圖如下:

ool.h

#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H

#include <vector>
#include <string>
#include <pthread.h>

using namespace std;

/*執行任務的類:設置任務數據並執行*/
class CTask {
protected:
	string m_strTaskName;	//任務的名稱
  	void* m_ptrData;	//要執行的任務的具體數據

public:
  	CTask() = default;
  	CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {}
  	virtual int Run() = 0;
  	void setData(void* data);	//設置任務數據
  
  	virtual ~CTask() {}
  	
};

/*線程池管理類*/
class CThreadPool {
private:
  	static vector<CTask*> m_vecTaskList;	//任務列表
  	static bool shutdown;	//線程退出標志
  	int m_iThreadNum;	//線程池中啟動的線程數
  	pthread_t *pthread_id;
  
  	static pthread_mutex_t m_pthreadMutex;	//線程同步鎖
  	static pthread_cond_t m_pthreadCond;	//線程同步條件變量
  
protected:
  	static void* ThreadFunc(void *threadData);	//新線程的線程回調函數
  	static int MoveToIdle(pthread_t tid);	//線程執行結束后,把自己放入空閑線程中
  	static int MoveToBusy(pthread_t tid);	//移入到忙碌線程中去
  	int Create();	//創建線程池中的線程
  
public:
  	CThreadPool(int threadNum);
  	int AddTask(CTask *task);	//把任務添加到任務隊列中
  	int StopAll();	//使線程池中的所有線程退出
  	int getTaskSize();	//獲取當前任務隊列中的任務數
};

#endif

2 thread_pool.cpp

#include "thread_pool.h"
#include <cstdio>

void CTask::setData(void* data) {
  	m_ptrData = data;
}

//靜態成員初始化
vector<CTask*> CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

//線程管理類構造函數
CThreadPool::CThreadPool(int threadNum) {
  	this->m_iThreadNum = threadNum;
  	printf("I will create %d threads.\n", threadNum);
	Create();
}

//線程回調函數
void* CThreadPool::ThreadFunc(void* threadData) {
  	pthread_t tid = pthread_self();
  	while(1)
    {
    	pthread_mutex_lock(&m_pthreadMutex);
      	//如果隊列為空,等待新任務進入任務隊列
      	while(m_vecTaskList.size() == 0 && !shutdown)
          	pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
		
		//關閉線程
		if(shutdown)
		{
			pthread_mutex_unlock(&m_pthreadMutex);
			printf("[tid: %lu]\texit\n", pthread_self());
			pthread_exit(NULL);
		}
		
		printf("[tid: %lu]\trun: ", tid);
		vector<CTask*>::iterator iter = m_vecTaskList.begin();
		//取出一個任務並處理之
		CTask* task = *iter;
		if(iter != m_vecTaskList.end())
		{
			task = *iter;
			m_vecTaskList.erase(iter);
		}
		
		pthread_mutex_unlock(&m_pthreadMutex);
		
		task->Run();	//執行任務
		printf("[tid: %lu]\tidle\n", tid);
		
    }
	
	return (void*)0;
}

//往任務隊列里添加任務並發出線程同步信號
int CThreadPool::AddTask(CTask *task) {	
	pthread_mutex_lock(&m_pthreadMutex);	
	m_vecTaskList.push_back(task);	
	pthread_mutex_unlock(&m_pthreadMutex);	
	pthread_cond_signal(&m_pthreadCond);	
	
  	return 0;
}

//創建線程
int CThreadPool::Create() {	
	pthread_id = new pthread_t[m_iThreadNum];
		for(int i = 0; i < m_iThreadNum; i++)
			pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
		
	return 0;
}

//停止所有線程
int CThreadPool::StopAll() {	
	//避免重復調用
	if(shutdown)
		return -1;
	printf("Now I will end all threads!\n\n");
	
	//喚醒所有等待進程,線程池也要銷毀了
	shutdown = true;
	pthread_cond_broadcast(&m_pthreadCond);
	
	//清楚僵屍
	for(int i = 0; i < m_iThreadNum; i++)
		pthread_join(pthread_id[i], NULL);
	
	delete[] pthread_id;
	pthread_id = NULL;
	
	//銷毀互斥量和條件變量
	pthread_mutex_destroy(&m_pthreadMutex);
	pthread_cond_destroy(&m_pthreadCond);
	
	return 0;
}

//獲取當前隊列中的任務數
int CThreadPool::getTaskSize() {	
	return m_vecTaskList.size();
}

3 main.cpp

#include "thread_pool.h"
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>

class CMyTask: public CTask {	
public:
	CMyTask() = default;	
	int Run() {
		printf("%s\n", (char*)m_ptrData);
		int x = rand()%4 + 1;
		sleep(x);	
		return 0;
	}
	~CMyTask() {}
};

int main() {
	CMyTask taskObj;
	char szTmp[] = "hello!";
	taskObj.setData((void*)szTmp);
	CThreadPool threadpool(5);	//線程池大小為5
	
	for(int i = 0; i < 10; i++)
		threadpool.AddTask(&taskObj);
	
	while(1) {
		printf("There are still %d tasks need to handle\n", threadpool.getTaskSize());
		//任務隊列已沒有任務了
		if(threadpool.getTaskSize()==0) {
			//清除線程池
			if(threadpool.StopAll() == -1) {
				printf("Thread pool clear, exit.\n");
				exit(0);
			}
		}
		sleep(2);
      	printf("2 seconds later...\n");
	}	
	return 0;
}

4 Makefile

CC:= g++
TARGET:= threadpool
INCLUDE:= -I./
LIBS:= -lpthread
# C++語言編譯參數  
CXXFLAGS:= -std=c++11 -g -Wall -D_REENTRANT
# C預處理參數
# CPPFLAGS:=
OBJECTS :=thread_pool.o main.o
  
$(TARGET): $(OBJECTS)
	$(CC) -o $(TARGET) $(OBJECTS) $(LIBS)
  
# $@表示所有目標集  
%.o:%.cpp   
	$(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o $@
  
.PHONY : clean
clean:   
	-rm -f $(OBJECTS) $(TARGET)

5 輸出結果

I will create 5 threads.
There are still 10 tasks need to handle
[tid: 140056759576320]	run: hello!
[tid: 140056751183616]	run: hello!
[tid: 140056742790912]	run: hello!
[tid: 140056734398208]	run: hello!
[tid: 140056767969024]	run: hello!
2 seconds later...
There are still 5 tasks need to handle
[tid: 140056742790912]	idle
[tid: 140056742790912]	run: hello!
[tid: 140056767969024]	idle
[tid: 140056767969024]	run: hello!
[tid: 140056751183616]	idle
[tid: 140056751183616]	run: hello!
[tid: 140056759576320]	idle
[tid: 140056759576320]	run: hello!
[tid: 140056751183616]	idle
[tid: 140056751183616]	run: hello!
[tid: 140056734398208]	idle
2 seconds later...
There are still 0 tasks need to handle
Now I will end all threads!
2 seconds later...
[tid: 140056734398208]	exit
[tid: 140056767969024]	idle
[tid: 140056767969024]	exit
[tid: 140056759576320]	idle
[tid: 140056759576320]	exit
[tid: 140056751183616]	idle
[tid: 140056751183616]	exit
[tid: 140056742790912]	idle
[tid: 140056742790912]	exit
2 seconds later...
There are still 0 tasks need to handle
Thread pool clear, exit.

擴展資料:

線程池設計中的驚群問題

C 實現有追求的線程池 探究

高效線程池(threadpool)的實現


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM