linux通過c++實現線程池類


線程池的實現


線程池已基於C++11重寫基於C++11實現線程池的工作原理

前言

初學C++,想封裝點常用的C++類,已經寫好了mutex,cond,thread的類,想用起來寫點東西,於是就決定寫線程池了,這里拙筆記錄下學習筆記.
本文主要內容包括: 線程池的概念使用原因適用場景線程池的實現任務調度邏輯樣例測試.

線程池的概念

線程池是指在一個多線程程序中創建一個線程集合,在執行新的任務的時候不是新建一個線程,而是使用線程池中已創建好的線程,一旦任務執行完畢,線程就會休眠等待新的任務分配下來,線程池中的線程數量取決於機器給進程所能分配的內存大小,以及應用程序的需求.

使用原因及適用場合

1.在服務器端編程中,最原始的方法我們使用順序化的結構,一個服務器只能處理一個客戶,如果同時2個客戶端鏈接上來了,服務器只能先處理了先到達的那個個,這樣第二個客戶端只能等了,影響客戶的響應時間.它只適用於客戶量少的短連接.這時候有方案2.

2.在多線程服務器端編程中,一個服務器如果要處理多條鏈接的客戶端,當鏈接很少的時候我們可以每來一條鏈接創建一個線程。但當並發量很大的時候呢,不停地的增加線程,在某個時間計算機資源可能耗盡.於是有了方案3

3.為了彌補方案2中每個請求創建線程的缺陷,我們使用固定大小線程池,全部IO交給IO復用線程解決(本文不涉及),而任務計算交給線程池.如果任務彼此獨立,IO壓力不大,那么這種方案非常適合.

當然服務器模型遠不止這3種,還有很多方案,本文不涉.

線程池的實現原理

線程池類主要維系兩個隊列:任務隊列,線程隊列

線程池通過take方法從線程隊列提取任務,到一個線程中去執行; 有任務就提取執行,無任務則阻塞線程休眠.

任務隊列可以單獨寫個任務類出來,也可以寫個任務類基類,預留虛任務函數接口,繼承下來泛化.
當然最便利的方法就是直接用函數地址來做任務咯.

	typedef void (*Task)(void);

線程隊列 線程隊列通過我自己寫的線程類實現.

#include <pthread.h>

class Thread{
public:
typedef void (*threadFun_t)(void *arg);

	explicit Thread(const threadFun_t &threadRoutine, void *arg);
	~Thread();
	void start();
	void join();
	static void *threadGuide(void *arg);
	pthread_t getThreadId() const{
		return m_threadId;
	}
private:
	pthread_t m_threadId;
	bool m_isRuning;
	threadFun_t m_threadRoutine;
	void *m_threadArg;
};

Thread::Thread(const threadFun_t &threadRoutine, void *arg)
	:m_isRuning(false),
	 m_threadId(0),
	 m_threadRoutine(threadRoutine),
	 m_threadArg(arg){
}

Thread::~Thread(){
	if(m_isRuning){//如果線程正在執行,則分離此線程.
		CHECK(!pthread_detach(m_threadId));
	}
}

void *Thread::threadGuide(void *arg){
	Thread *p = static_cast<Thread *>(arg);
	p->m_threadRoutine(p->m_threadArg);
	return NULL;
}

void Thread::join(){
	VERIFY(m_isRuning);
	CHECK(!pthread_join(m_threadId, NULL));
	m_isRuning = false;
}

void Thread::start(){

	pthread_attr_t attr;

	CHECK(!pthread_attr_init(&attr));

	//CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));    //set thread separation state property

	CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED));    //Set thread inheritance

	CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER));                //set thread scheduling policy

	CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM));             //Set thread scope

	CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this));

	m_isRuning = true;

}{
		MutexLockGuard lock(m_mutex);
		m_isRuning = false;
	}
}

構造傳入帶一個空類型參數指針(為什么要帶這個空類型指針后面會提)函數指針,通過start()方法創建線程,然后執行threadGuide()方法調用構造時候傳入的函數指針執行咱們想運行的函數實現Thread類.

這兩個隊列在線程池中的定義如下:

private:
	std::vector<Thread *> m_threads;
	std::deque<Task> m_tasks;

任務調度邏輯

任務分配邏輯主要靠兩個條件變量實現,(條件變量本文不做詳述)
1.任務隊列是否空.
2.任務隊列是否滿.
其邏輯如下圖所示:

start()方法是線程池的運行方法.通過它創建線程池.
threadRoutine()就是我們就是線程池中創建的線程,
線程跑起來后,通過 isRunning 控制線程循環是否退出.
stop()方法關閉線程池,回收資源.
循環中判斷 : 有任務則執行,無任務則wait 阻塞等待.

void ThreadPool::start(){
	m_isRuning = true;
	m_threads.reserve(m_threadsSize);
	for(size_t i = 0; i < m_threadsSize; i++){
		m_threads.push_back(new Thread(threadRoutine, this));
		m_threads[i]->start();
	}
}

Thread(threadRoutine, this) 這里就是為什么我線程類要帶一個無符號類型指針參數的原因,因為靜態函數無法調用c++的類成員函數(主要原因是類在編譯期間未實例化沒有明確的地址.)我們只能通過線程池對象的this指針調用它的成員.

任務調度的源碼實現:

ThreadPool::ThreadPool(size_t tasksSize, size_t threadsSize)
	:m_tasksSzie(tasksSize),
	 m_threadsSize(threadsSize),
	 m_mutex(),
	 m_tasksEmpty(m_mutex),
	 m_tasksFull(m_mutex),
	 m_isRuning(false){
}

ThreadPool::~ThreadPool(){
	if(m_isRuning){
		stop();
	}
}

void ThreadPool::threadRoutine(void *arg){
	ThreadPool *p = static_cast<ThreadPool *>(arg);
	while(p->m_isRuning){
		ThreadPool::Task task(p->take());
		if(task){
			task();
		}
	}
}

ThreadPool::Task ThreadPool::take(){
	MutexLockGuard lock(m_mutex);
	while(m_tasks.empty() && m_isRuning){
		m_tasksEmpty.wait();
	}
	if(!m_tasks.empty()){
		Task task = m_tasks.front();
		m_tasks.pop_front();
		m_tasksFull.notify();
		return task;
	}
	return NULL;
}

void ThreadPool::addTask(Task task){
	if(m_threads.empty()){//如果線程池是空的,直接跑任務.
		task();
	}
	else{
		MutexLockGuard lock(m_mutex);
		while(m_tasksSzie > 0 && m_tasks.size() >= m_tasksSzie){
			m_tasksFull.wait();
		}

		m_tasks.push_back(task);
		m_tasksEmpty.notify();
	}
}

void ThreadPool::start(){
	m_isRuning = true;
	m_threads.reserve(m_threadsSize);
	for(size_t i = 0; i < m_threadsSize; i++){
		m_threads.push_back(new Thread(threadRoutine, this));
		m_threads[i]->start();
	}
}

void ThreadPool::stop(){
	{
		MutexLockGuard lock(m_mutex);
		m_isRuning = false;
		m_tasksEmpty.notifyAll();
	}
	for(int i = m_threadsSize - 1; i >= 0; i--){
		m_threads[i]->join();
		delete(m_threads[i]);
		m_threads.pop_back();
	}
}

程序測試

測試代碼:
創建一個有兩個線程,擁有5個任務的任務隊列,執行8個加數任務.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "MutexLock.hh"
#include "Thread.hh"
#include <unistd.h>
#include "Condition.hh"
#include "ThreadPool.hh"
#include <vector>

//threadPool test

MutexLock CntLock;

int cnt = 0;

void test(void){
	unsigned long i = 0xfffffff;
	//MutexLockGuard loo(CntLock);
	//CntLock.lock();
	while(i--);
	printf("%d\n", ++cnt);
	//CntLock.unlock();
	sleep(1);
}

int main()
{
//ThreadPool Test

	ThreadPool tp(5, 2);
	tp.start();

	sleep(3);
	for(int i = 0; i < 8; i++)
		tp.addTask(test);
		
	getchar();

	return 0;
}

簡單test結果:

thread 140068496353024 run task
thread 140068504745728 run task
1
2
thread 140068504745728 run task
thread 140068496353024 run task
3
4
thread 140068496353024 run task
thread 140068504745728 run task
5
6
thread 140068496353024 run task
thread 140068504745728 run task
7
8


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM