這里給個線程池的實現代碼,里面帶有個應用小例子,方便學習使用,代碼 GCC 編譯可用。參照代碼看下面介紹的線程池原理跟容易接受,百度雲下載鏈接:
http://pan.baidu.com/s/1i3zMHDV
一.線程池簡介
為什么使用線程池?
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。 傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即 時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態,這筆開銷將是不可忽略的。
線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足。
二.線程池的結構
2.1. 線程池任務結點
線程池任務結點用來保存用戶投遞過來的任務,並放入線程池中的線程執行。其結構名為worker_t,定義如下:
1 /*線程池任務結點*/ 2 struct worker_t 3 { 4 void *(*process) (void *arg); /**< 回調函數 */ 5 int paratype; /**< 函數類型(預留) */ 6 void *arg; /**< 回調函數參數 */ 7 struct worker_t *next; /**< 連接下一個任務結點 */ 8 };
2.2. 線程池控制器
線程池控制器用來對線程池進行控制管理,包括任務的投遞、線程池狀態的更新與查詢、線程池的銷毀等。其結構名為CThread_pool_t,定義如下:
1 /*線程池控制器*/ 2 struct CThread_pool_t 3 { 4 pthread_mutex_t queue_lock; /**< 互斥鎖 */ 5 pthread_cond_t queue_ready; /**< 條件變量 */ 6 7 worker_t *queue_head; /**< 任務結點鏈表,保存所有投遞的任務 */ 8 int shutdown; /**< 線程池銷毀標志,1 -> 銷毀 */ 9 pthread_t *threadid; /**< 線程ID */ 10 int max_thread_num; /**< 線程池可容納的最大線程數 */ 11 int current_pthread_num; /**< 當前線程池存放的線程數 */ 12 int current_pthread_task_num; /**< 當前正在執行任務和已分配任務的線程數目和 */ 13 int cur_queue_size; /**< 當前等待隊列的任務數目 */ 14 int free_pthread_num; /**< 線程池內允許存在的最大空閑線程數 */ 15 16 /*向線程池投遞任務*/ 17 int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 18 /*向線程池投遞任務,無空閑線程則阻塞*/ 19 int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 20 /*獲取線程池可容納的最大線程數*/ 21 int (*GetMaxThreadNum) (void *pthis); 22 /*獲取線程池存放的線程數*/ 23 int (*GetCurThreadNum) (void *pthis); 24 /*獲取當前正在執行任務和已分配任務的線程數目和*/ 25 int (*GetCurTaskThreadNum) (void *pthis); 26 /*獲取線程池等待隊列任務數*/ 27 int (*GetCurTaskNum) (void *pthis); 28 /*銷毀線程池*/ 29 int (*Destruct) (void *pthis); 30 };
2.3. 線程池結構
線程池的運行結構圖如下:
根據上圖羅列幾個注意的地方:
(1)圖中的線程池中的“空閑”和“執行”分別表示空閑線程和執行線程,空閑線程指在正在等待任務的線程,同樣執行線程指正在執行任務的線程,兩者是相互轉換的。當用戶投遞任務過來則用空閑線程來執行該任務,且空閑線程狀態轉變為執行線程;當任務執行完后,執行線程狀態轉變為空閑線程;
(2)創建線程池時,正常情況會先創建一定數量的線程,所有線程初始為空閑線程,線程阻塞等待用戶投遞任務;
(3)用戶投遞的任務首先放入等待隊列 queue_head 鏈表中,如果線程池中有空閑線程則放入空閑線程中執行,否則根據條件選擇繼續等待空閑線程或者新建一個線程來執行,新建的線程將放入線程池中;
(4)執行的任務會從等待隊列中脫離,並在任務執行完后釋放任務結點worker_t;
三.線程池控制
這里就線程池實現代碼中的部分代碼進行解釋,請參照我上面給的線程池代碼,里面的注釋我標注的還是比較詳細的。
3.1. 線程池創建
1 CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 2 { 3 ...... //略 4 for (i = 0; i < max_num; i++) 5 { 6 pool->current_pthread_num++; /**< 當前池中的線程數 */ 7 pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool); /**< 創建線程 */ 8 usleep(1000); 9 } 10 }
這里創建了 max_num 個線程 ThreadPoolRoutine( ),即上面提到的空閑線程。
3.2. 投遞任務
1 static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 2 { 3 ...... //略 4 worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 5 newworker->process = process; /**< 回調函數,在線程ThreadPoolRoutine()中執行 */ 6 newworker->arg = arg; /**< 回調函數參數 */ 7 newworker->next = NULL; 8 9 pthread_mutex_lock(&(pool->queue_lock)); 10 ...... //將任務結點放入等待隊列,略 11 pool->cur_queue_size++; /**< 等待隊列加1 */ 12 13 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; 14 if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) 15 {/**< 如果沒有空閑線程且池中當前線程數不超過可容納最大線程 */ 16 int current_pthread_num = pool->current_pthread_num; 17 pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); /**< 新增線程 */ 18 pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); 19 pool->current_pthread_num++; /**< 當前池中線程總數加1 */ 20 21 pool->current_pthread_task_num++; /**< 分配任務的線程數加1 */ 22 pthread_mutex_unlock (&(pool->queue_lock)); 23 pthread_cond_signal (&(pool->queue_ready)); /**< 發送信號給1個處於條件阻塞等待狀態的線程 */ 24 return 0; 25 } 26 27 pool->current_pthread_task_num++; 28 pthread_mutex_unlock(&(pool->queue_lock)); 29 pthread_cond_signal(&(pool->queue_ready)); 30 return 0; 31 }
投遞任務時先創建一個任務結點保存回調函數和函數參數,並將任務結點放入等待隊列中。代碼第14行判斷條件為真時,則進行新線程創建,realloc( )會在保存原始內存中的數據不變的基礎上新增1個sizeof (pthread_t)大小的內存。之后更新 current_pthread_num 和 current_pthread_task_num ,並發送信號 pthread_cond_signal(&(pool->queue_ready)) 給一個處於條件阻塞等待狀態的線程,即線程 ThreadPoolRoutine( )中的
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)) 阻塞等待接收信號。重點講互斥鎖和條件變量:
pthread_mutex_t queue_lock; /**< 互斥鎖 */
pthread_cond_t queue_ready; /**< 條件變量 */
這兩個變量是線程池實現里很重要的點,建議網上搜索資料學習,這里簡要介紹先代碼中會用到的相關函數功能:
放張圖吧,這樣直觀點,線程互斥鎖的一些調用函數一般都有接觸過,這里就不作介紹了。
3.3. 執行線程
1 static void * ThreadPoolRoutine (void *arg) 2 { 3 CThread_pool_t *pool = (CThread_pool_t *)arg; 4 while (1) 5 { 6 pthread_mutex_lock (&(pool->queue_lock)); /**< 上鎖, pthread_cond_wait()調用會解鎖*/ 7 8 while ((pool->cur_queue_size == 0) && (!pool->shutdown)) /**< 隊列沒有等待任務*/ 9 { 10 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); /**< 條件鎖阻塞等待條件信號*/ 11 } 12 if (pool->shutdown) 13 { 14 pthread_mutex_unlock (&(pool->queue_lock)); 15 pthread_exit (NULL); /**< 釋放線程 */ 16 } 17 18 ..... //略 19 worker_t *worker = pool->queue_head; /**< 取等待隊列任務結點頭*/ 20 pool->queue_head = worker->next; /**< 鏈表后移 */ 21 22 pthread_mutex_unlock (&(pool->queue_lock)); 23 (*(worker->process)) (worker->arg); /**< 執行回調函數 */ 24 pthread_mutex_lock (&(pool->queue_lock)); 25 26 pool->current_pthread_task_num--; /**< 函數執行結束 */ 27 free (worker); /**< 釋放任務結點 */ 28 worker = NULL; 29 30 if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) 31 { 32 pthread_mutex_unlock (&(pool->queue_lock)); 33 break; /**< 當池中空閑線程超過 free_pthread_num則將線程釋放回操作系統 */ 34 } 35 pthread_mutex_unlock (&(pool->queue_lock)); 36 } 37 38 pool->current_pthread_num--; /**< 當前池中線程數減1 */ 39 pthread_exit (NULL); /**< 釋放線程*/ 40 return (void*)NULL; 41 }
這個就是用來執行投遞任務的線程,在初始創建線程時所有線程都全部阻塞在pthread_cond_wait( )處,此時的線程狀態就為空閑線程,也就是線程被掛起;
當收到信號並取得互斥鎖時,表明有任務投遞過來,則獲取等待隊列里的任務結點並開始執行回調函數;
函數執行結束后回去判斷當前等待隊列是否還有任務,有則接下去執行,否則重新阻塞回到空閑線程狀態;
3.4. 線程銷毀
線程銷毀主要做的就是銷毀線程和釋放動態內存,自己看代碼就懂了。
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include <string.h> #include "lib_thread_pool.h" /*---------------constants/macro definition---------------------*/ /*---------------global variables definition-----------------------*/ /*---------------functions declaration--------------------------*/ static void * ThreadPoolRoutine (void *arg); /*---------------functions definition---------------------------*/ /**************************************************************** * function name : ThreadPoolAddWorkLimit * functional description : 向線程池投遞任務,無空閑線程則阻塞 * input parameter : pthis 線程池指針 process 回調函數 arg 回調函數的參數 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; /**< 回調函數,在線程ThreadPoolRoutine()中執行 */ newworker->arg = arg; /**< 回調函數參數 */ newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; /**< 等待隊列任務鏈表 */ if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; /**< 放入鏈表尾 */ } else { pool->queue_head = newworker; /**< 放入鏈表頭 */ } assert (pool->queue_head != NULL); pool->cur_queue_size++; /**< 等待隊列加1 */ int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {/**< 如果沒有空閑線程且池中當前線程數不超過可容納最大線程 */ int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); /**< 新增線程 */ pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; /**< 當前池中線程總數加1 */ pool->current_pthread_task_num++; /**< 分配任務的線程數加1 */ pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); /**< 發送信號給1個處於條件阻塞等待狀態的線程 */ return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情況加 return 0; } /**************************************************************** * function name : ThreadPoolAddWorkUnlimit * functional description : 向線程池投遞任務 * input parameter : pthis 線程池指針 process 回調函數 arg 回調函數的參數 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; newworker->arg = arg; newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; } else { pool->queue_head = newworker; } assert (pool->queue_head != NULL); pool->cur_queue_size++; int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if(0 == FreeThreadNum) /**< 只判斷是否沒有空閑線程 */ { int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; if (pool->current_pthread_num > pool->max_thread_num) { pool->max_thread_num = pool->current_pthread_num; } pool->current_pthread_task_num++; pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情況加 return 0; } /**************************************************************** * function name : ThreadPoolGetThreadMaxNum * functional description : 獲取線程池可容納的最大線程數 * input parameter : pthis 線程池指針 * output parameter : * return value : 線程池可容納的最大線程數 * history : *****************************************************************/ static int ThreadPoolGetThreadMaxNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->max_thread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentThreadNum * functional description : 獲取線程池存放的線程數 * input parameter : pthis 線程池指針 * output parameter : * return value : 線程池存放的線程數 * history : *****************************************************************/ static int ThreadPoolGetCurrentThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentTaskThreadNum * functional description : 獲取當前正在執行任務和已分配任務的線程數目和 * input parameter : pthis 線程池指針 * output parameter : * return value : 當前正在執行任務和已分配任務的線程數目和 * history : *****************************************************************/ static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_task_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentTaskNum * functional description : 獲取線程池等待隊列任務數 * input parameter : pthis 線程池指針 * output parameter : * return value : 等待隊列任務數 * history : *****************************************************************/ static int ThreadPoolGetCurrentTaskNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->cur_queue_size; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolDestroy * functional description : 銷毀線程池 * input parameter : pthis 線程池指針 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ static int ThreadPoolDestroy (void *pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; if (pool->shutdown) /**< 已銷毀 */ { return -1; } pool->shutdown = 1; /**< 銷毀標志置位 */ pthread_cond_broadcast (&(pool->queue_ready)); /**< 喚醒所有pthread_cond_wait()等待線程 */ int i; for (i = 0; i < pool->current_pthread_num; i++) { pthread_join (pool->threadid[i], NULL); /**< 等待所有線程執行結束 */ } free (pool->threadid); /**< 釋放 */ worker_t *head = NULL; while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free (head); /**< 釋放 */ } pthread_mutex_destroy(&(pool->queue_lock)); /**< 銷毀 */ pthread_cond_destroy(&(pool->queue_ready)); /**< 銷毀 */ free (pool); /**< 釋放 */ pool=NULL; return 0; } /**************************************************************** * function name : ThreadPoolRoutine * functional description : 線程池中運行的線程 * input parameter : arg 線程池指針 * output parameter : * return value : NULL * history : *****************************************************************/ static void * ThreadPoolRoutine (void *arg) { CThread_pool_t *pool = (CThread_pool_t *)arg; while (1) { pthread_mutex_lock (&(pool->queue_lock)); /**< 上鎖, pthread_cond_wait()調用會解鎖*/ while ((pool->cur_queue_size == 0) && (!pool->shutdown)) /**< 隊列沒有等待任務*/ { pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); /**< 條件鎖阻塞等待條件信號*/ } if (pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); pthread_exit (NULL); /**< 釋放線程 */ } assert (pool->cur_queue_size != 0); assert (pool->queue_head != NULL); pool->cur_queue_size--; /**< 等待任務減1,准備執行任務*/ worker_t *worker = pool->queue_head; /**< 取等待隊列任務結點頭*/ pool->queue_head = worker->next; /**< 鏈表后移 */ pthread_mutex_unlock (&(pool->queue_lock)); (*(worker->process)) (worker->arg); /**< 執行回調函數 */ pthread_mutex_lock (&(pool->queue_lock)); pool->current_pthread_task_num--; /**< 函數執行結束 */ free (worker); /**< 釋放任務結點 */ worker = NULL; if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { pthread_mutex_unlock (&(pool->queue_lock)); break; /**< 當池中空閑線程超過 free_pthread_num則將線程釋放回操作系統 */ } pthread_mutex_unlock (&(pool->queue_lock)); } pool->current_pthread_num--; /**< 當前池中線程數減1 */ pthread_exit (NULL); /**< 釋放線程*/ return (void*)NULL; } /**************************************************************** * function name : ThreadPoolConstruct * functional description : 創建線程池 * input parameter : max_num 線程池可容納的最大線程數 free_num 線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統 * output parameter : * return value : 線程池指針 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init (&(pool->queue_lock), NULL); /**< 初始化互斥鎖 */ pthread_cond_init (&(pool->queue_ready), NULL); /**< 初始化條件變量 */ pool->queue_head = NULL; pool->max_thread_num = max_num; /**< 線程池可容納的最大線程數 */ pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = free_num; /**< 線程池允許存在的最大空閑線程 */ pool->threadid = NULL; pool->threadid = (pthread_t *) malloc (max_num * sizeof (pthread_t)); pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; /**< 給函數指針賦值 */ pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; int i = 0; for (i = 0; i < max_num; i++) { pool->current_pthread_num++; /**< 當前池中的線程數 */ pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool); /**< 創建線程 */ usleep(1000); } return pool; } /**************************************************************** * function name : ThreadPoolConstructDefault * functional description : 創建線程池,以默認的方式初始化,未創建線程 * input parameter : * output parameter : * return value : 線程池指針 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstructDefault(void) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = DEFAULT_MAX_THREAD_NUM; /**< 默認值 */ pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = DEFAULT_FREE_THREAD_NUM; /**< 默認值 */ pool->threadid = NULL; pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; return pool; }
xxx.h
/************************************************************************ * module : 線程池頭文件 * file name : lib_thread_pool.h * Author : * version : V1.0 * DATE : * directory : * description : * related document: * ************************************************************************/ /*-----------------------includes-------------------------------*/ #ifndef __PTHREAD_POOL_H__ #define __PTHREAD_POOL_H__ #include <pthread.h> /*---------------constants/macro definition---------------------*/ #define DEFAULT_MAX_THREAD_NUM 100 #define DEFAULT_FREE_THREAD_NUM 10 typedef struct worker_t worker_t; typedef struct CThread_pool_t CThread_pool_t; /*---------------global variables definition-----------------------*/ /*線程池任務結點*/ struct worker_t { void *(*process) (void *arg); /**< 回調函數 */ int paratype; /**< 函數類型(預留) */ void *arg; /**< 回調函數參數 */ struct worker_t *next; /**< 連接下一個任務結點 */ }; /*線程池控制器*/ struct CThread_pool_t { pthread_mutex_t queue_lock; /**< 互斥鎖 */ pthread_cond_t queue_ready; /**< 條件變量 */ worker_t *queue_head; /**< 任務結點鏈表,保存所有投遞的任務 */ int shutdown; /**< 線程池銷毀標志,1 - 銷毀 */ pthread_t *threadid; /**< 線程ID */ int max_thread_num; /**< 線程池可容納的最大線程數 */ int current_pthread_num; /**< 當前線程池存放的線程數 */ int current_pthread_task_num; /**< 當前正在執行任務和已分配任務的線程數目和 */ int cur_queue_size; /**< 當前等待隊列的任務數目 */ int free_pthread_num; /**< 線程池內允許存在的最大空閑線程數 */ /**************************************************************** * function name : ThreadPoolAddWorkLimit * functional description : 向線程池投遞任務 * input parameter : pthis 線程池指針 process 回調函數 arg 回調函數的參數 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); /**************************************************************** * function name : ThreadPoolAddWorkUnlimit * functional description : 向線程池投遞任務,無空閑線程則阻塞 * input parameter : pthis 線程池指針 process 回調函數 arg 回調函數的參數 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); /**************************************************************** * function name : ThreadPoolGetThreadMaxNum * functional description : 獲取線程池可容納的最大線程數 * input parameter : pthis 線程池指針 * output parameter : * return value : 線程池可容納的最大線程數 * history : *****************************************************************/ int (*GetMaxThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentThreadNum * functional description : 獲取線程池存放的線程數 * input parameter : pthis 線程池指針 * output parameter : * return value : 線程池存放的線程數 * history : *****************************************************************/ int (*GetCurThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentTaskThreadNum * functional description : 獲取當前正在執行任務和已分配任務的線程數目和 * input parameter : pthis 線程池指針 * output parameter : * return value : 當前正在執行任務和已分配任務的線程數目和 * history : *****************************************************************/ int (*GetCurTaskThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentTaskNum * functional description : 獲取線程池等待隊列任務數 * input parameter : pthis 線程池指針 * output parameter : * return value : 等待隊列任務數 * history : *****************************************************************/ int (*GetCurTaskNum) (void *pthis); /**************************************************************** * function name : ThreadPoolDestroy * functional description : 銷毀線程池 * input parameter : pthis 線程池指針 * output parameter : * return value : 0 - 成功;-1 - 失敗 * history : *****************************************************************/ int (*Destruct) (void *pthis); }; /*---------------functions declaration--------------------------*/ /**************************************************************** * function name : ThreadPoolConstruct * functional description : 創建線程池 * input parameter : max_num 線程池可容納的最大線程數 free_num 線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統 * output parameter : * return value : 線程池指針 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num); /**************************************************************** * function name : ThreadPoolConstructDefault * functional description : 創建線程池,以默認的方式初始化,未創建線程 * input parameter : * output parameter : * return value : 線程池指針 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstructDefault(void); #endif
main.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include <string.h> #include "lib_thread_pool.h" static void* thread_1(void* arg); static void* thread_2(void* arg); static void* thread_3(void* arg); static void DisplayPoolStatus(CThread_pool_t* pPool); int nKillThread = 0; int main() { CThread_pool_t* pThreadPool = NULL; pThreadPool = ThreadPoolConstruct(2, 1); int nNumInput = 5; char LogInput[] = "OK!"; DisplayPoolStatus(pThreadPool); /*可用AddWorkLimit()替換看執行的效果*/ pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL); /* * 沒加延遲發現連續投遞任務時pthread_cond_wait()會收不到信號pthread_cond_signal() !! * 因為AddWorkUnlimit()進去后調用pthread_mutex_lock()把互斥鎖鎖上,導致pthread_cond_wait() * 收不到信號!!也可在AddWorkUnlimit()里面加個延遲,一般情況可能也遇不到這個問題 */ usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)nNumInput); usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput); usleep(10); DisplayPoolStatus(pThreadPool); nKillThread = 1; usleep(100); /**< 先讓線程退出 */ DisplayPoolStatus(pThreadPool); nKillThread = 2; usleep(100); DisplayPoolStatus(pThreadPool); nKillThread = 3; usleep(100); DisplayPoolStatus(pThreadPool); pThreadPool->Destruct((void*)pThreadPool); return 0; } static void* thread_1(void* arg) { printf("Thread 1 is running !\n"); while(nKillThread != 1) usleep(10); return NULL; } static void* thread_2(void* arg) { int nNum = (int)arg; printf("Thread 2 is running !\n"); printf("Get Number %d\n", nNum); while(nKillThread != 2) usleep(10); return NULL; } static void* thread_3(void* arg) { char *pLog = (char*)arg; printf("Thread 3 is running !\n"); printf("Get String %s\n", pLog); while(nKillThread != 3) usleep(10); return NULL; } static void DisplayPoolStatus(CThread_pool_t* pPool) { static int nCount = 1; printf("******************\n"); printf("nCount = %d\n", nCount++); printf("max_thread_num = %d\n", pPool->GetMaxThreadNum((void*)pPool)); printf("current_pthread_num = %d\n", pPool->GetCurThreadNum((void*)pPool)); printf("current_pthread_task_num = %d\n", pPool->GetCurTaskThreadNum((void*)pPool)); printf("cur_queue_size = %d\n", pPool->GetCurTaskNum((void*)pPool)); printf("******************\n"); }