I. In general, a thread-pool implementation has the following components:
1) Thread manager: creates and manages the thread pool.
2) Worker threads: the threads that actually execute tasks. A fixed number of threads is created up front when the pool is initialized; these threads start out idle, consume no CPU, and occupy little memory.
3) Task interface: the interface every task must implement. When runnable tasks are present in the pool, worker threads are scheduled to execute them. Abstracting tasks behind an interface decouples the thread pool from any concrete task.
4) Task queue: holds tasks that have not yet been processed and provides a buffering mechanism. Several data structures can implement it. The most common is a queue, exploiting its first-in-first-out behavior; another option is a linked list, which can grow dynamically and is more flexible in practice.
II. A common thread-pool model

As the figure above shows, the work queue is shared by the main thread and the worker threads: the main thread pushes tasks into the queue, and worker threads take tasks out and execute them. Every operation on the shared queue must be performed under the protection of a mutex. When the main thread enqueues a task and sees that the number of pending tasks is smaller than the number of worker threads, it signals a condition variable to wake a worker that may be waiting. Mutexes and condition variables may be needed in a few other places as well, which I won't belabor here.
III. A lock-free thread-pool model
Note: the material above comes from:
IV. My thread-pool model
My thread-pool model closely resembles the lock-free model above, except that it is not lock-free. It is, however, a work-stealing pool.
Among the common network-server programming models, this one corresponds to Reactor + thread pool (the main thread does I/O, worker threads do the computation).
A few notes on work-stealing pools:
(1) "Steal" here means that when a worker thread has finished the tasks in its own local queue, it looks for tasks in the global queue or in other worker threads' queues and helps complete them.
(2) Work stealing gives better load balancing, because each worker's tasks differ and so do their completion times.
V. The implementation
1. Advantages of this thread pool:
(1) Adding a task wakes exactly one idle worker thread rather than all of them, so there is no thundering-herd problem.
(2) It is a work-stealing pool: each worker thread has its own task queue, and once a worker finishes its local queue, it fetches tasks from the global queue or "steals" tasks from other workers' queues.
(3) Newly added tasks are not all funneled directly into the global queue, which avoids the frequent locking overhead that would arise if every worker fetched tasks from a single global queue.
2. Drawbacks of this thread pool:
The pool has one obvious flaw: when it contains only a single thread, submitted tasks must not be recursive. That is, a task being executed cannot itself submit a new task to the pool; doing so causes a deadlock.
Why does this happen?
It follows directly from how the pool is implemented: a worker locks its own mutex while executing a task, and only after the task finishes does it post a semaphore to tell the waiting thread that the result is ready. If a task recursively submits a new task to the pool, the submission path tries to take a mutex that was already locked before the first task started running, so the pool deadlocks.
Possible fixes:
How might this be solved? One possibility is to run such internal tasks on a separately spawned thread. I have not tried this yet for lack of time.
3. Things to improve later:
(1) When a thread steals tasks from another thread, it must lock the victim's queue, even though the thief takes from the tail of the queue while the owning worker takes from the head. The reason is that when only one task remains in the queue, the two ends can race with each other. One way around this is to give the head and tail pointers a special initialization. (I need to look this up; a blog post on the web described it, but I forget where.)
(2) The queues in this pool use locks, but they could be implemented as lock-free queues. Others have written such queues that could serve as a reference.
4. The code
The code is straightforward and heavily commented, so I won't explain it further.
threadpool.h

struct thread_pool;
struct future;

/* Create a new thread pool with no more than n threads. */
struct thread_pool *thread_pool_new(int nthreads);

void thread_pool_shutdown_and_destroy(struct thread_pool *);

typedef void *(*fork_join_task_t)(struct thread_pool *pool, void *data);

struct future *thread_pool_submit(
        struct thread_pool *pool,
        fork_join_task_t task,
        void *data);

void *future_get(struct future *);
void future_free(struct future *);
threadpool.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <fcntl.h>

#include "threadpool.h"

struct future {
    fork_join_task_t task;
    void *arg;                 /* parameter */
    void *result;
    sem_t *sem;
    int status;                /* 0: not started, 1: doing, 2: done */
    int local;                 /* 1: internal task, 0: external task */
    struct future *prev;
    struct future *next;
};

struct thread_t {
    pthread_t id;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int idle;                  /* 1: idle, 0: busy */
    int index;                 /* this thread's index in the pool */
    int current_task_num;      /* number of tasks in this thread's local list */
    struct thread_pool *pool;  /* back-pointer to the pool */
    struct future *head;
    struct future *tail;
};

struct thread_pool {
    int max_threads;
    pthread_mutex_t mutex;
    int shutdown;              /* 1: shutting down, 0: normal */
    struct thread_t *threads;
    struct future *head;
    struct future *tail;
};

static void *thread_route(void *arg)
{
    assert(arg != NULL);
    struct thread_t *thread = (struct thread_t *)arg;
    struct thread_pool *pool = thread->pool;
    assert(pool != NULL);
    struct future *future = NULL;

    while (1) {
        pthread_mutex_lock(&thread->mutex);

        if (future != NULL) {
            /* run the task taken from the global list or stolen last round */
            thread->idle = 0;
            future->status = 1;               /* doing */
            future->result = future->task(pool, future->arg);
            future->status = 2;               /* done */
            sem_post(future->sem);
            future = NULL;
        }

        while (thread->current_task_num == 0 && pool->shutdown == 0) {
            /* wait for a task assignment */
            pthread_cond_wait(&thread->cond, &thread->mutex);
        }

        if (pool->shutdown == 1) {
            /* the pool is shutting down; destroy the local task list */
            struct future *temp = NULL;
            while (thread->head != NULL) {
                temp = thread->head;
                thread->head = thread->head->next;
                free(temp);
            }
            pthread_mutex_unlock(&thread->mutex);
            pthread_exit(NULL);
        }

        /* First, take tasks from the local task list (from the head) */
        while (thread->head != NULL) {
            thread->idle = 0;
            future = thread->head;
            thread->head = thread->head->next;
            if (thread->tail == future)
                thread->tail = NULL;
            else
                thread->head->prev = NULL;

            /* call the callback to do the work */
            thread->current_task_num--;
            future->status = 1;               /* doing */
#if 0
            if (pool->max_threads == 1 && future->local == 1) {
                /*
                 * TBD: if there is only one thread in the pool and the
                 * task is an internal task, we could create a thread
                 * to run it?
                 */
            } else
#else
            {
                future->result = future->task(pool, future->arg);
            }
#endif
            future->status = 2;               /* done */
            sem_post(future->sem);            /* let future_get() know the result is ready */
        }
        future = NULL;
        thread->idle = 1;
        pthread_mutex_unlock(&thread->mutex);

        /*
         * The local tasks are done; look for work in the global task
         * list, then in the other workers' lists.
         */

        /* Step 1: take a task from the global task list (from the head) */
        pthread_mutex_lock(&pool->mutex);
        while (pool->head != NULL && pool->head->status == 0) {
            future = pool->head;
            pool->head = pool->head->next;
            if (pool->tail == future)
                pool->tail = NULL;
            else
                pool->head->prev = NULL;
            /* got a future; put it into the local task list instead? */
#if 0
            pthread_mutex_lock(&thread->mutex);
            if (thread->head != NULL) {
                future->next = thread->head;
                thread->head->prev = future;
            } else {
                thread->tail = future;
            }
            thread->head = future;
            thread->current_task_num++;
            pthread_mutex_unlock(&thread->mutex);
            if (thread->current_task_num == 9) {
                /* took 10 tasks; stop and leave some for the other workers */
                break;
            }
#else
            break;                            /* got one task, break */
#endif
        }
        pthread_mutex_unlock(&pool->mutex);

        /* Step 2: steal a task from another worker's list (from the tail) */
        if (future == NULL && thread->current_task_num == 0) {
            int i = 0;
            struct thread_t *other_thread = NULL;
            for (i = 0; i < pool->max_threads; i++) {
                if (i == thread->index)
                    continue;                 /* myself */
                if (pool->threads[i].current_task_num == 0)
                    continue;                 /* it has no task */

                pthread_mutex_lock(&pool->threads[i].mutex);
                other_thread = &pool->threads[i];
                while (other_thread->tail != NULL && other_thread->tail->status == 0) {
                    future = other_thread->tail;
                    other_thread->tail = other_thread->tail->prev;
                    if (future == other_thread->head)
                        other_thread->head = NULL;
                    else
                        other_thread->tail->next = NULL;
                    other_thread->current_task_num--;
                    /* got a future; put it into our local task list instead? */
#if 0
                    pthread_mutex_lock(&thread->mutex);
                    if (thread->head != NULL) {
                        future->next = thread->head;
                        thread->head->prev = future;
                    } else {
                        thread->tail = future;
                    }
                    thread->head = future;
                    thread->current_task_num++;
                    pthread_mutex_unlock(&thread->mutex);
                    if (thread->current_task_num == 4) {
                        /* took 4 tasks; stop and leave some for the other workers */
                        break;
                    }
#else
                    break;                    /* got one task, break */
#endif
                }
                pthread_mutex_unlock(&pool->threads[i].mutex);
                if (future != NULL)
                    break;                    /* stop after a successful steal */
            }
        }
    }
}

struct thread_pool *thread_pool_new(int nthreads)
{
    struct thread_pool *pool = (struct thread_pool *)malloc(sizeof(struct thread_pool));
    assert(pool != NULL);
    pool->max_threads = nthreads;
    pool->shutdown = 0;
    pool->head = pool->tail = NULL;
    pthread_mutex_init(&pool->mutex, NULL);

    pool->threads = (struct thread_t *)malloc(nthreads * sizeof(struct thread_t));
    assert(pool->threads != NULL);

    int i = 0;
    for (i = 0; i < pool->max_threads; i++) {
        pthread_mutex_init(&pool->threads[i].mutex, NULL);
        pthread_cond_init(&pool->threads[i].cond, NULL);
        pool->threads[i].idle = 1;                /* idle */
        pool->threads[i].index = i;
        pool->threads[i].pool = pool;             /* back-pointer to the pool */
        pool->threads[i].current_task_num = 0;
        pool->threads[i].head = pool->threads[i].tail = NULL;
        pthread_create(&pool->threads[i].id, NULL, thread_route, (void *)&pool->threads[i]);
    }
    return pool;
}

struct future *thread_pool_submit(
        struct thread_pool *pool,
        fork_join_task_t task,
        void *data)
{
    assert(pool != NULL);
    struct future *future = (struct future *)malloc(sizeof(struct future));
    assert(future != NULL);
    future->task = task;
    future->arg = data;
    future->prev = future->next = NULL;
    future->result = NULL;
    future->status = 0;
    future->local = 0;                            /* default is an external task */

    int i = 0;
    pthread_t self = pthread_self();
    for (i = 0; i < pool->max_threads; i++) {
        if (pthread_equal(pool->threads[i].id, self)) {
            future->local = 1;                    /* submitted from a worker: internal task */
            break;
        }
    }

    future->sem = (sem_t *)malloc(sizeof(sem_t));
    assert(future->sem != NULL);
    sem_init(future->sem, 0, 0);

    /* find an idle worker thread and hand it the task */
    struct thread_t *thread = NULL;
    for (i = 0; i < pool->max_threads; i++) {
        thread = &pool->threads[i];
        pthread_mutex_lock(&thread->mutex);
        if (thread->idle == 1) {
            /* found one; insert the task at the head of its list */
            if (thread->head != NULL) {
                future->next = thread->head;
                thread->head->prev = future;
            } else {
                thread->tail = future;
            }
            thread->head = future;
            thread->current_task_num++;
            /* just let the worker know it has work to do */
            if (thread->current_task_num == 1)
                pthread_cond_signal(&thread->cond);
            pthread_mutex_unlock(&thread->mutex);
            return future;
        }
        pthread_mutex_unlock(&thread->mutex);
    }

    /* no idle worker found; put the task into the global task list */
    pthread_mutex_lock(&pool->mutex);
    if (pool->head != NULL) {
        future->next = pool->head;
        pool->head->prev = future;
    } else {
        pool->tail = future;
    }
    pool->head = future;
    pthread_mutex_unlock(&pool->mutex);
    return future;
}

void *future_get(struct future *future)
{
    assert(future != NULL);
    sem_wait(future->sem);     /* wait until the result is ready */
    return future->result;
}

/* thread_pool_shutdown_and_destroy() and future_free() are omitted
 * from this listing; see the GitHub repository below for the full code. */
For the complete code, see my GitHub:
https://github.com/wolf623/Work-Stealing-Pool
If you repost this, please credit the source: