Linux 多線程編程之 線程池 的原理和一個簡單的C實現,提高對多線程編
程的認知,同步處理等操作,以及如何在實際項目中高效的利用多線程開
發。
1. 線程池介紹
為什么需要線程池???
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務
器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,
但處理時間卻相對較短。
傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創
建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就
是是“即時創建,即時銷毀”的策略。盡管與創建進程相比,創建線程的時
間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執
行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態,
這筆開銷將是不可忽略的。
線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對
多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,
因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的
延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過
適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,
就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從
而可以防止資源不足。
2. 線程池結構
2.1 線程池任務結點結構
線程池任務結點用來保存用戶投遞過來的的任務,並放入線程池中的線程來執行,任務結構如下:
// 線程池任務結點
struct worker_t { void * (* process)(void * arg); /*回調函數*/ int paratype; /*函數類型(預留)*/ void * arg; /*回調函數參數*/ struct worker_t * next; /*鏈接下一個任務節點*/ };
2.2 線程池控制器
線程池控制器用來對線程池進行控制管理,描述當前線程池的最基本信息,包括任務的投遞,線
程池狀態的更新與查詢,線程池的銷毀等,其結構如下:
/*線程控制器*/ struct CThread_pool_t { pthread_mutex_t queue_lock; /*互斥鎖*/ pthread_cond_t queue_ready; /*條件變量*/ worker_t * queue_head; /*任務節點鏈表 保存所有投遞的任務*/ int shutdown; /*線程池銷毀標志 1-銷毀*/ pthread_t * threadid; /*線程ID*/ int max_thread_num; /*線程池可容納最大線程數*/ int current_pthread_num; /*當前線程池存放的線程*/ int current_pthread_task_num; /*當前已經執行任務和已分配任務的線程數目和*/ int current_wait_queue_num; /*當前等待隊列的的任務數目*/ int free_pthread_num; /*線程池允許最大的空閑線程數/*/ /** * function: ThreadPoolAddWorkUnlimit * description: 向線程池投遞任務 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Valr: 0 成功 * -1 失敗 */ int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg); /** * function: ThreadPoolAddWorkLimit * description: 向線程池投遞任務,無空閑線程則阻塞 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Val: 0 成功 * -1 失敗 */ int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg); /** * function: ThreadPoolGetThreadMaxNum * description: 獲取線程池可容納的最大線程數 * input param: pthis 線程池指針 */ int (* GetThreadMaxNum)(void * pthis); /** * function: ThreadPoolGetCurrentThreadNum * description: 獲取線程池存放的線程數 * input param: pthis 線程池指針 * return Val: 線程池存放的線程數 */ int (* GetCurrentThreadNum)(void * pthis); /** * function: ThreadPoolGetCurrentTaskThreadNum * description: 獲取當前正在執行任務和已經分配任務的線程數目和 * input param: pthis 線程池指針 * return Val: 當前正在執行任務和已經分配任務的線程數目和 */ int (* GetCurrentTaskThreadNum)(void * pthis); /** * function: ThreadPoolGetCurrentWaitTaskNum * description: 獲取線程池等待隊列任務數 * input param: pthis 線程池指針 * return Val: 等待隊列任務數 */ int (* GetCurrentWaitTaskNum)(void * pthis); /** * function: ThreadPoolDestroy * description: 銷毀線程池 * input param: pthis 線程池指針 * return Val: 0 成功 * -1 失敗 */ int (* Destroy)(void * pthis); };
2.3 線程池運行結構
解釋:
1) 圖中的線程池中的"空閑"和"執行"分別表示空閑線程和執行線程,空閑線程指在正在等待任務的線程,
同樣執行線程指正在執行任務的線程, 兩者是相互轉換的。當用戶投遞任務過來則用空閑線程來執行
該任務,且空閑線程狀態轉換為執行線程;當任務執行完后,執行線程狀態轉變為空閑線程。
2) 創建線程池時,正常情況會創建一定數量的線程, 所有線程初始化為空閑線程,線程阻塞等待用戶
投遞任務。
3) 用戶投遞的任務首先放入等待隊列queue_head 鏈表中, 如果線程池中有空閑線程則放入空閑線程中
執行,否則根據條件選擇繼續等待空閑線程或者新建一個線程來執行,新建的線程將放入線程池中。
4) 執行的任務會從等待隊列中脫離,並在任務執行完后釋放任務結點worker_t
3. 線程池控制 / 部分函數解釋
3.1 線程池創建
創建 max_num 個線程 ThreadPoolRoutine,即空閑線程
/** * function: ThreadPoolConstruct * description: 構建線程池 * input param: max_num 線程池可容納的最大線程數 * free_num 線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統 * return Val: 線程池指針 */ CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num) { int i = 0; CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); if(NULL == pool) return NULL; memset(pool, 0, sizeof(CThread_pool_t)); /*初始化互斥鎖*/ pthread_mutex_init(&(pool->queue_lock), NULL); /*初始化條件變量*/ pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_num; // 線程池可容納的最大線程數 pool->current_wait_queue_num = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = free_num; // 線程池允許存在最大空閑線程 pool->threadid = NULL; pool->threadid = (pthread_t *)malloc(max_num*sizeof(pthread_t)); /*該函數指針賦值*/ pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destroy = ThreadPoolDestroy; pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; for(i=0; i<max_num; i++) { pool->current_pthread_num++; // 當前池中的線程數 /*創建線程*/ pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool); usleep(1000); } return pool; }
3.2 投遞任務
/** * function: ThreadPoolAddWorkLimit * description: 向線程池投遞任務,無空閑線程則阻塞 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Val: 0 成功 * -1 失敗 */ int ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg) { // int FreeThreadNum = 0; // int CurrentPthreadNum = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; /*為添加的任務隊列節點分配內存*/ worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); if(NULL == newworker) return -1; newworker->process = process; // 回調函數,在線程ThreadPoolRoutine()中執行 newworker->arg = arg; // 回調函數參數 newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); /*插入新任務隊列節點*/ worker_t * member = pool->queue_head; // 指向任務隊列鏈表整體 if(member != NULL) { while(member->next != NULL) // 隊列中有節點 member = member->next; // member指針往后移動 member->next = newworker; // 插入到隊列鏈表尾部 } else pool->queue_head = newworker; // 插入到隊列鏈表頭 assert(pool->queue_head != NULL); pool->current_wait_queue_num++; // 等待隊列加1 /*空閑的線程= 當前線程池存放的線程 - 當前已經執行任務和已分配任務的線程數目和*/ int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; /*如果沒有空閑線程且池中當前線程數不超過可容納最大線程*/ if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { //-> 條件為真進行新線程創建 int CurrentPthreadNum = pool->current_pthread_num; /*新增線程*/ pool->threadid = (pthread_t *)realloc(pool->threadid, (CurrentPthreadNum+1) * sizeof(pthread_t)); pthread_create(&(pool->threadid[CurrentPthreadNum]), NULL, ThreadPoolRoutine, (void *)pool); /*當前線程池中線程總數加1*/ pool->current_pthread_num++; /*分配任務線程數加1*/ pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); /*發送信號給一個處與條件阻塞等待狀態的線程*/ pthread_cond_signal(&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); /*發送信號給一個處與條件阻塞等待狀態的線程*/ pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情況 return 0; }
投遞任務時先創建一個任務結點保存回調函數和函數參數,並將任務結點放入等待隊列中,在代碼中
注釋"//->條件為真創建新線程",realloc() 會在保存原始內存中的數據不變的基礎上新增1個sizeof(pthread_t)
大小內存。之后更新current_pthread_num,和current_pthread_task_num;並發送信號
pthread_cond_signal(&(pool->queue_read)),給一個處於條件阻塞等待狀態的線程,即線程ThreadPoolRoutin()
中的pthread_cond_wait(&(pool->queue_read), &(pool->queue_lock))阻塞等待接收信號,重點講互
斥鎖和添加變量:
pthread_mutex_t queue_lock; /**< 互斥鎖*/
pthread_cond_t queue_ready; /**< 條件變量*/
這兩個變量時線程池實現中很重要的點,這里簡要介紹代碼中會用到的相關函數功能;
3.3 執行線程
/** * function: ThreadPoolRoutine * description: 線程池中執行的線程 * input param: arg 線程池指針 */ void * ThreadPoolRoutine(void * arg) { CThread_pool_t * pool = (CThread_pool_t *)arg; while(1) { /*上鎖,pthread_cond_wait()調用會解鎖*/ pthread_mutex_lock(&(pool->queue_lock)); /*隊列沒有等待任務*/ while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) { /*條件鎖阻塞等待條件信號*/ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } if(pool->shutdown) { pthread_mutex_unlock(&(pool->queue_lock)); pthread_exit(NULL); // 釋放線程 } assert(pool->current_wait_queue_num != 0); assert(pool->queue_head != NULL); pool->current_wait_queue_num--; // 等待任務減1,准備執行任務 worker_t * worker = pool->queue_head; // 去等待隊列任務節點頭 pool->queue_head = worker->next; // 鏈表后移 pthread_mutex_unlock(&(pool->queue_lock)); (* (worker->process))(worker->arg); // 執行回調函數 pthread_mutex_lock(&(pool->queue_lock)); pool->current_pthread_task_num--; // 函數執行結束 free(worker); // 釋放任務結點 worker = NULL; if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { pthread_mutex_unlock(&(pool->queue_lock)); break; // 當線程池中空閑線程超過 free_pthread_num 則將線程釋放回操作系統 } pthread_mutex_unlock(&(pool->queue_lock)); } pool->current_pthread_num--; // 當前線程數減1 pthread_exit(NULL); // 釋放線程 return (void *)NULL; }
這個就是用來執行任務的線程,在初始化創建線程時所有線程都全部阻塞在pthread_cond_wait()處
此時的線程就為空閑線程,也就是線程被掛起,當收到信號並取得互斥鎖時, 表明任務投遞過來
則獲取等待隊列里的任務結點並執行回調函數; 函數執行結束后回去判斷當前等待隊列是否還有任
務,有則接下去執行,否則重新阻塞回到空閑線程狀態。
4. 完整代碼實現
4.1 CThreadPool.h 文件
/** * 線程池頭文件 * **/ #ifndef _CTHREADPOOL_H_ #define _CTHREADPOOL_H_ #include <pthread.h> /*線程池可容納最大線程數*/ #define DEFAULT_MAX_THREAD_NUM 100 /*線程池允許最大的空閑線程,超過則將線程釋放回操作系統*/ #define DEFAULT_FREE_THREAD_NUM 10 typedef struct worker_t worker_t; typedef struct CThread_pool_t CThread_pool_t; /*線程池任務節點*/ struct worker_t { void * (* process)(void * arg); /*回調函數*/ int paratype; /*函數類型(預留)*/ void * arg; /*回調函數參數*/ struct worker_t * next; /*鏈接下一個任務節點*/ }; /*線程控制器*/ struct CThread_pool_t { pthread_mutex_t queue_lock; /*互斥鎖*/ pthread_cond_t queue_ready; /*條件變量*/ worker_t * queue_head; /*任務節點鏈表 保存所有投遞的任務*/ int shutdown; /*線程池銷毀標志 1-銷毀*/ pthread_t * threadid; /*線程ID*/ int max_thread_num; /*線程池可容納最大線程數*/ int current_pthread_num; /*當前線程池存放的線程*/ int current_pthread_task_num; /*當前已經執行任務和已分配任務的線程數目和*/ int current_wait_queue_num; /*當前等待隊列的的任務數目*/ int free_pthread_num; /*線程池允許最大的空閑線程數/*/ /** * function: ThreadPoolAddWorkUnlimit * description: 向線程池投遞任務 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Valr: 0 成功 * -1 失敗 */ int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg); /** * function: ThreadPoolAddWorkLimit * description: 向線程池投遞任務,無空閑線程則阻塞 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Val: 0 成功 * -1 失敗 */ int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg); /** * function: ThreadPoolGetThreadMaxNum * description: 獲取線程池可容納的最大線程數 * input param: pthis 線程池指針 */ int (* GetThreadMaxNum)(void * pthis); /** * function: ThreadPoolGetCurrentThreadNum * description: 獲取線程池存放的線程數 * input param: pthis 線程池指針 * return Val: 線程池存放的線程數 */ int (* GetCurrentThreadNum)(void * pthis); /** * function: ThreadPoolGetCurrentTaskThreadNum * description: 獲取當前正在執行任務和已經分配任務的線程數目和 * input param: pthis 線程池指針 * return Val: 當前正在執行任務和已經分配任務的線程數目和 */ int (* GetCurrentTaskThreadNum)(void * pthis); /** * function: ThreadPoolGetCurrentWaitTaskNum * description: 獲取線程池等待隊列任務數 * input param: pthis 線程池指針 * return Val: 等待隊列任務數 */ int (* GetCurrentWaitTaskNum)(void * pthis); /** * function: ThreadPoolDestroy * description: 銷毀線程池 * input param: pthis 線程池指針 * return Val: 0 成功 * -1 失敗 */ int (* Destroy)(void * pthis); }; /** * function: ThreadPoolConstruct * description: 構建線程池 * input param: max_num 線程池可容納的最大線程數 * free_num 線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統 * return Val: 線程池指針 */ CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num); /** * function: ThreadPoolConstructDefault * description: 創建線程池,以默認的方式初始化,未創建線程 * * return Val: 線程池指針 */ CThread_pool_t * ThreadPoolConstructDefault(void); #endif // _CTHREADPOOL_H_
4.2 CThreadPool.c 文件
/** * 線程池實現 * **/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include "CThreadPool.h" void * ThreadPoolRoutine(void * arg); /** * function: ThreadPoolAddWorkLimit * description: 向線程池投遞任務,無空閑線程則阻塞 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Val: 0 成功 * -1 失敗 */ int ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg) { // int FreeThreadNum = 0; // int CurrentPthreadNum = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; /*為添加的任務隊列節點分配內存*/ worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); if(NULL == newworker) return -1; newworker->process = process; // 回調函數,在線程ThreadPoolRoutine()中執行 newworker->arg = arg; // 回調函數參數 newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); /*插入新任務隊列節點*/ worker_t * member = pool->queue_head; // 指向任務隊列鏈表整體 if(member != NULL) { while(member->next != NULL) // 隊列中有節點 member = member->next; // member指針往后移動 member->next = newworker; // 插入到隊列鏈表尾部 } else pool->queue_head = newworker; // 插入到隊列鏈表頭 assert(pool->queue_head != NULL); pool->current_wait_queue_num++; // 等待隊列加1 /*空閑的線程= 當前線程池存放的線程 - 當前已經執行任務和已分配任務的線程數目和*/ int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; /*如果沒有空閑線程且池中當前線程數不超過可容納最大線程*/ if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { int CurrentPthreadNum = pool->current_pthread_num; /*新增線程*/ pool->threadid = (pthread_t *)realloc(pool->threadid, (CurrentPthreadNum+1) * sizeof(pthread_t)); pthread_create(&(pool->threadid[CurrentPthreadNum]), NULL, ThreadPoolRoutine, (void *)pool); /*當前線程池中線程總數加1*/ pool->current_pthread_num++; /*分配任務線程數加1*/ pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); /*發送信號給一個處與條件阻塞等待狀態的線程*/ pthread_cond_signal(&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); /*發送信號給一個處與條件阻塞等待狀態的線程*/ pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情況 return 0; } /** * function: ThreadPoolAddWorkUnlimit * description: 向線程池投遞任務 * input param: pthis 線程池指針 * process 回調函數 * arg 回調函數參數 * return Valr: 0 成功 * -1 失敗 */ int ThreadPoolAddWorkUnlimit(void * pthis, void * (* process)(void * arg), void * arg) { // int FreeThreadNum = 0; // int CurrentPthreadNum = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; /*給新任務隊列節點分配內存*/ worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); if(NULL == newworker) return -1; newworker->process = process; // 回調函數 newworker->arg = arg; // 回調函數參數 newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); /*新節點插入任務隊列鏈表操作*/ worker_t * member = pool->queue_head; if(member != NULL) { while(member->next != NULL) member = member->next; member->next = newworker; // 插入隊列鏈表尾部 } else pool->queue_head = newworker; // 插入到頭(也就是第一個節點,之前鏈表沒有節點) assert(pool->queue_head != NULL); pool->current_wait_queue_num++; // 當前等待隊列的的任務數目+1 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; /*只判斷是否沒有空閑線程*/ if(0 == FreeThreadNum) { int CurrentPthreadNum = pool->current_pthread_num; pool->threadid = (pthread_t *)realloc(pool->threadid, (CurrentPthreadNum+1)*sizeof(pthread_t)); pthread_create(&(pool->threadid[CurrentPthreadNum]),NULL, ThreadPoolRoutine, (void *)pool); pool->current_pthread_num++; if(pool->current_pthread_num > pool->max_thread_num) pool->max_thread_num = pool->current_pthread_num; pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); return 0; } /** * function: ThreadPoolGetThreadMaxNum * description: 獲取線程池可容納的最大線程數 * input param: pthis 線程池指針 * return val: 線程池可容納的最大線程數 */ int ThreadPoolGetThreadMaxNum(void * pthis) { int num = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); num = pool->max_thread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /** * function: ThreadPoolGetCurrentThreadNum * description: 獲取線程池存放的線程數 * input param: pthis 線程池指針 * return Val: 線程池存放的線程數 */ int ThreadPoolGetCurrentThreadNum(void * pthis) { int num = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); num = pool->current_pthread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /** * function: ThreadPoolGetCurrentTaskThreadNum * description: 獲取當前正在執行任務和已經分配任務的線程數目和 * input param: pthis 線程池指針 * return Val: 當前正在執行任務和已經分配任務的線程數目和 */ int ThreadPoolGetCurrentTaskThreadNum(void * pthis) { int num = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); num = pool->current_pthread_task_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /** * function: ThreadPoolGetCurrentWaitTaskNum * description: 獲取線程池等待隊列任務數 * input param: pthis 線程池指針 * return Val: 等待隊列任務數 */ int ThreadPoolGetCurrentWaitTaskNum(void * pthis) { int num = 0; CThread_pool_t * pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); num = pool->current_wait_queue_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /** * function: ThreadPoolDestroy * description: 銷毀線程池 * input param: pthis 線程池指針 * return Val: 0 成功 * -1 失敗 */ int ThreadPoolDestroy(void * pthis) { int i; CThread_pool_t * pool = (CThread_pool_t *)pthis; if(pool->shutdown) // 已銷毀 return -1; pool->shutdown = 1; // 銷毀標志置位 /*喚醒所有pthread_cond_wait()等待線程*/ pthread_cond_broadcast(&(pool->queue_ready)); for(i=0; i<pool->current_pthread_num; i++) pthread_join(pool->threadid[i], NULL); // 等待所有線程執行結束 free(pool->threadid); // 釋放 /*銷毀任務隊列鏈表*/ worker_t * head = NULL; while(pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } /*銷毀鎖*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free(pool); pool = NULL; return 0; } /** * function: ThreadPoolRoutine * description: 線程池中運行的線程 * input param: arg 線程池指針 */ void * ThreadPoolRoutine(void * arg) { CThread_pool_t * pool = (CThread_pool_t *)arg; while(1) { /*上鎖,pthread_cond_wait()調用會解鎖*/ pthread_mutex_lock(&(pool->queue_lock)); /*隊列沒有等待任務*/ while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) { /*條件鎖阻塞等待條件信號*/ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } if(pool->shutdown) { pthread_mutex_unlock(&(pool->queue_lock)); pthread_exit(NULL); // 釋放線程 } assert(pool->current_wait_queue_num != 0); assert(pool->queue_head != NULL); pool->current_wait_queue_num--; // 等待任務減1,准備執行任務 worker_t * worker = pool->queue_head; // 去等待隊列任務節點頭 pool->queue_head = worker->next; // 鏈表后移 pthread_mutex_unlock(&(pool->queue_lock)); (* (worker->process))(worker->arg); // 執行回調函數 pthread_mutex_lock(&(pool->queue_lock)); pool->current_pthread_task_num--; // 函數執行結束 free(worker); // 釋放任務結點 worker = NULL; if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { pthread_mutex_unlock(&(pool->queue_lock)); break; // 當線程池中空閑線程超過 free_pthread_num 則將線程釋放回操作系統 } pthread_mutex_unlock(&(pool->queue_lock)); } pool->current_pthread_num--; // 當前線程數減1 pthread_exit(NULL); // 釋放線程 return (void *)NULL; } /** * function: ThreadPoolConstruct * description: 構建線程池 * input param: max_num 線程池可容納的最大線程數 * free_num 線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統 * return Val: 線程池指針 */ CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num) { int i = 0; CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); if(NULL == pool) return NULL; memset(pool, 0, sizeof(CThread_pool_t)); /*初始化互斥鎖*/ pthread_mutex_init(&(pool->queue_lock), NULL); /*初始化條件變量*/ pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_num; // 線程池可容納的最大線程數 pool->current_wait_queue_num = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = free_num; // 線程池允許存在最大空閑線程 pool->threadid = NULL; pool->threadid = (pthread_t *)malloc(max_num*sizeof(pthread_t)); /*該函數指針賦值*/ pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destroy = ThreadPoolDestroy; pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; for(i=0; i<max_num; i++) { pool->current_pthread_num++; // 當前池中的線程數 /*創建線程*/ pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool); usleep(1000); } return pool; } /** * function: ThreadPoolConstructDefault * description: 創建線程池,以默認的方式初始化,未創建線程 * * return Val: 線程池指針 */ CThread_pool_t * ThreadPoolConstructDefault(void) { CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); if(NULL == pool) return NULL; memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = DEFAULT_MAX_THREAD_NUM; // 默認值 pool->current_wait_queue_num = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = DEFAULT_FREE_THREAD_NUM; // 默認值 pool->threadid = NULL; /*該函數指針賦值*/ pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destroy = ThreadPoolDestroy; pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; return pool; }
4.3 測試 main.c 文件
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include <string.h> #include "CThreadPool.h" void * thread_1(void * arg); void * thread_2(void * arg); void * thread_3(void * arg); void DisplayPoolStatus(CThread_pool_t * pPool); int nKillThread = 0; int main() { CThread_pool_t * pThreadPool = NULL; pThreadPool = ThreadPoolConstruct(5, 1); int nNumInput = 5; char LogInput[] = "OK!"; DisplayPoolStatus(pThreadPool); /*可用AddWorkLimit()替換看執行的效果*/ pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_1, (void *)NULL); /* * 沒加延遲發現連續投遞任務時pthread_cond_wait()會收不到信號pthread_cond_signal() !! * 因為AddWorkUnlimit()進去后調用pthread_mutex_lock()把互斥鎖鎖上,導致pthread_cond_wait() * 收不到信號!!也可在AddWorkUnlimit()里面加個延遲,一般情況可能也遇不到這個問題 */ usleep(10); pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_2, (void *)nNumInput); usleep(10); pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_3, (void *)LogInput); usleep(10); DisplayPoolStatus(pThreadPool); nKillThread = 1; usleep(100); /**< 先讓線程退出 */ DisplayPoolStatus(pThreadPool); nKillThread = 2; usleep(100); DisplayPoolStatus(pThreadPool); nKillThread = 3; usleep(100); DisplayPoolStatus(pThreadPool); pThreadPool->Destroy((void*)pThreadPool); return 0; } void * thread_1(void * arg) { printf("Thread 1 is running !\n"); while(nKillThread != 1) usleep(10); return NULL; } void *
thread_2(void * arg) { int nNum = (int)arg; printf("Thread 2 is running !\n"); printf("Get Number %d\n", nNum); while(nKillThread != 2) usleep(10); return NULL; } void *
thread_3(void * arg) { char * pLog = (char *)arg; printf("Thread 3 is running !\n"); printf("Get String %s\n", pLog); while(nKillThread != 3) usleep(10); return NULL; } void
DisplayPoolStatus(CThread_pool_t * pPool) { static int nCount = 1; printf("****************************\n"); printf("nCount = %d\n", nCount++); printf("max_thread_num = %d\n", pPool->GetThreadMaxNum((void *)pPool)); printf("current_pthread_num = %d\n", pPool->GetCurrentThreadNum((void *)pPool)); printf("current_pthread_task_num = %d\n", pPool->GetCurrentTaskThreadNum((void *)pPool)); printf("current_wait_queue_num = %d\n", pPool->GetCurrentWaitTaskNum((void *)pPool)); printf("****************************\n"); }
4.4 Makefile
簡單寫一個makefile
CC = gcc CFLAGS = -g -Wall -o2 LIB = -lpthread RUNE = $(CC) $(CFLAGS) $(object) -o $(exe) $(LIB) RUNO = $(CC) $(CFLAGS) -c $< -o $@ $(LIB) .RHONY:clean object = main.o CThreadPool.o exe = CThreadpool $(exe):$(object) $(RUNE) %.o:%.c CThreadPool.h $(RUNO) %.o:%.c $(RUNO) clean: -rm -rf *.o CThreadpool *~
注意:使用模式規則,能引入用戶自定義變量,為多個文件建立相同的規則,規則中的相關
文件前必須用“%”表明。關於Makefile的一些規則解釋見另一篇
5. 參考
感謝下面博主的貢獻,特別致謝(死去的龍7)博主!!!
天行健,君子以自強不息~ 祝諸位幸福安好!!!Thanks again.
死去的龍7:https://www.cnblogs.com/deadlong7/p/4155663.html
青山小和尚:https://blog.csdn.net/qq_36359022/article/details/78796784
developerWorks:https://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/
6. 后記
無極生太極
太極生兩儀
兩儀生四象
四象生八卦
八卦:qian乾 xun巽 li離 gen艮 dui兌 kan坎 zhen震 kun坤
宇宙從混沌未分的“無極”而來,無極動而生太極,太極分陰陽兩儀,在由
陰陽分化出太陰、太陽、少陰、少陽這四象,四象分化而為八卦, 八卦
代表着世界的八種基本屬性,可以用“天地風山水火雷澤”來概括《說卦》
認為:
乾,鍵也
坤,順也
震,動也
巽,入也
坎,陷也
離,麗也
艮,止也
兌,說也
八卦又分出六十四卦,但六十四卦並不代表事務演化過程的終結。六十四
卦最后兩卦為“既濟” 和 “未濟”,象征事務發展到最后必然有一個結果,但
這個結果作為一個"節點“,以它為開始將展開另一次全新的演變, 所以
“物不可窮也,故受之以未濟終焉",