Linux簡單線程池實現(帶源碼)


  這里給個線程池的實現代碼,里面帶有個應用小例子,方便學習使用,代碼 GCC 編譯可用。參照代碼看下面介紹的線程池原理跟容易接受,百度雲下載鏈接:

  http://pan.baidu.com/s/1i3zMHDV

一.線程池簡介

  為什么使用線程池?

  目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。 傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即 時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態,這筆開銷將是不可忽略的。

  線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足。

二.線程池的結構

2.1. 線程池任務結點

  線程池任務結點用來保存用戶投遞過來的任務,並放入線程池中的線程執行。其結構名為worker_t,定義如下:

復制代碼
1 /*線程池任務結點*/
2 struct worker_t
3 {
4     void *(*process) (void *arg);    /**< 回調函數 */
5     int   paratype;                    /**< 函數類型(預留) */
6     void *arg;                        /**< 回調函數參數 */
7     struct worker_t *next;            /**< 連接下一個任務結點 */
8 };
復制代碼

2.2. 線程池控制器

  線程池控制器用來對線程池進行控制管理,包括任務的投遞、線程池狀態的更新與查詢、線程池的銷毀等。其結構名為CThread_pool_t,定義如下:

復制代碼
 1 /*線程池控制器*/
 2 struct CThread_pool_t
 3 {
 4     pthread_mutex_t queue_lock;    /**< 互斥鎖 */
 5     pthread_cond_t queue_ready;    /**< 條件變量 */
 6 
 7     worker_t *queue_head;        /**< 任務結點鏈表,保存所有投遞的任務 */
 8     int shutdown;            /**< 線程池銷毀標志,1 -> 銷毀 */
 9     pthread_t *threadid;        /**< 線程ID */
10     int max_thread_num;        /**< 線程池可容納的最大線程數 */
11     int current_pthread_num;    /**< 當前線程池存放的線程數 */
12     int current_pthread_task_num;    /**< 當前正在執行任務和已分配任務的線程數目和 */
13     int cur_queue_size;        /**< 當前等待隊列的任務數目 */
14     int free_pthread_num;        /**< 線程池內允許存在的最大空閑線程數 */
15 
16     /*向線程池投遞任務*/
17     int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 
18     /*向線程池投遞任務,無空閑線程則阻塞*/
19     int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 
20     /*獲取線程池可容納的最大線程數*/
21     int (*GetMaxThreadNum) (void *pthis); 
22     /*獲取線程池存放的線程數*/
23     int (*GetCurThreadNum) (void *pthis); 
24     /*獲取當前正在執行任務和已分配任務的線程數目和*/
25     int (*GetCurTaskThreadNum) (void *pthis); 
26     /*獲取線程池等待隊列任務數*/
27     int (*GetCurTaskNum) (void *pthis); 
28     /*銷毀線程池*/
29     int (*Destruct) (void *pthis); 
30 };
復制代碼

2.3. 線程池結構

  線程池的運行結構圖如下:

  根據上圖羅列幾個注意的地方:

  (1)圖中的線程池中的“空閑”和“執行”分別表示空閑線程和執行線程,空閑線程指在正在等待任務的線程,同樣執行線程指正在執行任務的線程,兩者是相互轉換的。當用戶投遞任務過來則用空閑線程來執行該任務,且空閑線程狀態轉變為執行線程;當任務執行完后,執行線程狀態轉變為空閑線程;

  (2)創建線程池時,正常情況會先創建一定數量的線程,所有線程初始為空閑線程,線程阻塞等待用戶投遞任務;

  (3)用戶投遞的任務首先放入等待隊列 queue_head 鏈表中,如果線程池中有空閑線程則放入空閑線程中執行,否則根據條件選擇繼續等待空閑線程或者新建一個線程來執行,新建的線程將放入線程池中;

  (4)執行的任務會從等待隊列中脫離,並在任務執行完后釋放任務結點worker_t;

三.線程池控制

  這里就線程池實現代碼中的部分代碼進行解釋,請參照我上面給的線程池代碼,里面的注釋我標注的還是比較詳細的。

3.1. 線程池創建

復制代碼
 1 CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 
 2 {
 3     ......    //略
 4     for (i = 0; i < max_num; i++) 
 5     {  
 6         pool->current_pthread_num++;    /**< 當前池中的線程數 */
 7         pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);    /**< 創建線程 */
 8         usleep(1000);
 9     }   
10 }
復制代碼

  這里創建了 max_num 個線程 ThreadPoolRoutine( ),即上面提到的空閑線程。

3.2. 投遞任務

復制代碼
 1 static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 
 2 { 
 3     ......    //略
 4     worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
 5     newworker->process     = process;    /**< 回調函數,在線程ThreadPoolRoutine()中執行 */
 6     newworker->arg         = arg;        /**< 回調函數參數 */
 7     newworker->next     = NULL;
 8     
 9     pthread_mutex_lock(&(pool->queue_lock)); 
10     ......    //將任務結點放入等待隊列,略
11     pool->cur_queue_size++;        /**< 等待隊列加1 */
12     
13     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;        
14     if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
15     {/**< 如果沒有空閑線程且池中當前線程數不超過可容納最大線程 */
16         int current_pthread_num = pool->current_pthread_num;
17         pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));     /**< 新增線程 */
18         pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
19         pool->current_pthread_num++;    /**< 當前池中線程總數加1 */    
20         
21         pool->current_pthread_task_num++;    /**< 分配任務的線程數加1 */    
22         pthread_mutex_unlock (&(pool->queue_lock)); 
23         pthread_cond_signal (&(pool->queue_ready));    /**< 發送信號給1個處於條件阻塞等待狀態的線程 */    
24         return 0;
25     }
26     
27     pool->current_pthread_task_num++; 
28     pthread_mutex_unlock(&(pool->queue_lock)); 
29     pthread_cond_signal(&(pool->queue_ready));
30     return 0; 
31 }
復制代碼

  投遞任務時先創建一個任務結點保存回調函數和函數參數,並將任務結點放入等待隊列中。代碼第14行判斷條件為真時,則進行新線程創建,realloc( )會在保存原始內存中的數據不變的基礎上新增1個sizeof (pthread_t)大小的內存。之后更新 current_pthread_num 和 current_pthread_task_num ,並發送信號 pthread_cond_signal(&(pool->queue_ready)) 給一個處於條件阻塞等待狀態的線程,即線程 ThreadPoolRoutine( )中的

pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)) 阻塞等待接收信號。重點講互斥鎖和條件變量:

  pthread_mutex_t queue_lock;   /**< 互斥鎖 */

  pthread_cond_t queue_ready;   /**< 條件變量 */

  這兩個變量是線程池實現里很重要的點,建議網上搜索資料學習,這里簡要介紹先代碼中會用到的相關函數功能:

  放張圖吧,這樣直觀點,線程互斥鎖的一些調用函數一般都有接觸過,這里就不作介紹了。

3.3. 執行線程  

復制代碼
 1 static void * ThreadPoolRoutine (void *arg) 
 2 { 
 3     CThread_pool_t *pool = (CThread_pool_t *)arg;
 4     while (1) 
 5     { 
 6         pthread_mutex_lock (&(pool->queue_lock)); /**< 上鎖, pthread_cond_wait()調用會解鎖*/
 7         
 8         while ((pool->cur_queue_size == 0) && (!pool->shutdown))    /**< 隊列沒有等待任務*/
 9         { 
10             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));     /**< 條件鎖阻塞等待條件信號*/
11         } 
12         if (pool->shutdown) 
13         { 
14             pthread_mutex_unlock (&(pool->queue_lock)); 
15             pthread_exit (NULL);     /**< 釋放線程 */
16         } 
17         
18         .....    //略
19         worker_t *worker     = pool->queue_head;    /**< 取等待隊列任務結點頭*/ 
20         pool->queue_head     = worker->next;     /**< 鏈表后移 */ 
21         
22         pthread_mutex_unlock (&(pool->queue_lock)); 
23         (*(worker->process)) (worker->arg);     /**< 執行回調函數 */ 
24         pthread_mutex_lock (&(pool->queue_lock)); 
25         
26         pool->current_pthread_task_num--;    /**< 函數執行結束 */ 
27         free (worker);     /**< 釋放任務結點 */
28         worker = NULL; 
29         
30         if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
31         {
32             pthread_mutex_unlock (&(pool->queue_lock)); 
33             break;    /**< 當池中空閑線程超過 free_pthread_num則將線程釋放回操作系統 */
34         }
35         pthread_mutex_unlock (&(pool->queue_lock));         
36     } 
37     
38     pool->current_pthread_num--;    /**< 當前池中線程數減1 */
39     pthread_exit (NULL);    /**< 釋放線程*/
40     return (void*)NULL;
41 }
復制代碼

  這個就是用來執行投遞任務的線程,在初始創建線程時所有線程都全部阻塞在pthread_cond_wait( )處,此時的線程狀態就為空閑線程,也就是線程被掛起;

  當收到信號並取得互斥鎖時,表明有任務投遞過來,則獲取等待隊列里的任務結點並開始執行回調函數;

  函數執行結束后回去判斷當前等待隊列是否還有任務,有則接下去執行,否則重新阻塞回到空閑線程狀態;

3.4. 線程銷毀

  線程銷毀主要做的就是銷毀線程和釋放動態內存,自己看代碼就懂了。 

完整代碼:
xxx.c
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <pthread.h> 
#include <assert.h> 
#include <string.h>
#include "lib_thread_pool.h"


/*---------------constants/macro definition---------------------*/

/*---------------global variables definition-----------------------*/

/*---------------functions declaration--------------------------*/
static void * ThreadPoolRoutine (void *arg); 

/*---------------functions definition---------------------------*/

/****************************************************************
* function name         : ThreadPoolAddWorkLimit
* functional description    : 向線程池投遞任務,無空閑線程則阻塞
* input parameter        : pthis    線程池指針
                      process    回調函數
                      arg        回調函數的參數
* output parameter    : 
* return value            : 0 - 成功;-1 - 失敗 
* history                : 
*****************************************************************/
static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
    if (NULL == newworker)
    {
        return -1;
    }
    newworker->process     = process;    /**< 回調函數,在線程ThreadPoolRoutine()中執行 */
    newworker->arg         = arg;        /**< 回調函數參數 */
    newworker->next     = NULL;

    pthread_mutex_lock(&(pool->queue_lock)); 
    
    worker_t *member = pool->queue_head;    /**< 等待隊列任務鏈表 */
    if (member != NULL) 
    { 
        while (member->next != NULL) 
        {
            member = member->next; 
        }
        member->next = newworker;    /**< 放入鏈表尾 */
    } 
    else 
    { 
        pool->queue_head = newworker;    /**< 放入鏈表頭 */
    } 
    
    assert (pool->queue_head != NULL); 
    pool->cur_queue_size++;        /**< 等待隊列加1 */

    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;        
    if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
    {/**< 如果沒有空閑線程且池中當前線程數不超過可容納最大線程 */
        int current_pthread_num = pool->current_pthread_num;
        pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));     /**< 新增線程 */
        pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
        pool->current_pthread_num++;    /**< 當前池中線程總數加1 */    
        
        pool->current_pthread_task_num++;    /**< 分配任務的線程數加1 */    
        pthread_mutex_unlock (&(pool->queue_lock)); 
        pthread_cond_signal (&(pool->queue_ready));    /**< 發送信號給1個處於條件阻塞等待狀態的線程 */    
        return 0;
    }

    pool->current_pthread_task_num++; 
    pthread_mutex_unlock(&(pool->queue_lock)); 
    pthread_cond_signal(&(pool->queue_ready));
//    usleep(10);    //看情況加
    return 0; 
} 

/****************************************************************
* function name         : ThreadPoolAddWorkUnlimit
* functional description    : 向線程池投遞任務
* input parameter        : pthis    線程池指針
                      process    回調函數
                      arg        回調函數的參數
* output parameter    : 
* return value            : 0 - 成功;-1 - 失敗 
* history                : 
*****************************************************************/
static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
    
    if (NULL == newworker)
    {
        return -1;
    }
    newworker->process     = process;    
    newworker->arg         = arg;         
    newworker->next     = NULL;        

    pthread_mutex_lock(&(pool->queue_lock)); 
    
    worker_t *member = pool->queue_head; 
    if (member != NULL)     
    { 
        while (member->next != NULL) 
        {
            member = member->next; 
        }
        member->next = newworker; 
    } 
    else 
    { 
        pool->queue_head = newworker; 
    } 
    
    assert (pool->queue_head != NULL); 
    pool->cur_queue_size++; 

    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    if(0 == FreeThreadNum)    /**< 只判斷是否沒有空閑線程 */
    {
        int current_pthread_num = pool->current_pthread_num;
        pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); 
        pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
        pool->current_pthread_num++;
        if (pool->current_pthread_num > pool->max_thread_num)
        {
            pool->max_thread_num = pool->current_pthread_num;
        }
        
        pool->current_pthread_task_num++;
        pthread_mutex_unlock (&(pool->queue_lock)); 
        pthread_cond_signal (&(pool->queue_ready)); 
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock)); 
    pthread_cond_signal(&(pool->queue_ready));     
//    usleep(10);    //看情況加
    return 0; 
} 

/****************************************************************
* function name         : ThreadPoolGetThreadMaxNum
* functional description    : 獲取線程池可容納的最大線程數
* input parameter        : pthis    線程池指針
* output parameter    : 
* return value            : 線程池可容納的最大線程數
* history                : 
*****************************************************************/
static int ThreadPoolGetThreadMaxNum(void* pthis) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock)); 
    int num = pool->max_thread_num;
    pthread_mutex_unlock(&(pool->queue_lock)); 
    
    return num; 
} 

/****************************************************************
* function name         : ThreadPoolGetCurrentThreadNum
* functional description    : 獲取線程池存放的線程數
* input parameter        : pthis    線程池指針
* output parameter    : 
* return value            : 線程池存放的線程數
* history                : 
*****************************************************************/
static int ThreadPoolGetCurrentThreadNum(void* pthis) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock)); 
    int num = pool->current_pthread_num;
    pthread_mutex_unlock(&(pool->queue_lock)); 
    
    return num; 
} 

/****************************************************************
* function name         : ThreadPoolGetCurrentTaskThreadNum
* functional description    : 獲取當前正在執行任務和已分配任務的線程數目和
* input parameter        : pthis    線程池指針
* output parameter    : 
* return value            : 當前正在執行任務和已分配任務的線程數目和
* history                : 
*****************************************************************/
static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock)); 
    int num = pool->current_pthread_task_num;
    pthread_mutex_unlock(&(pool->queue_lock)); 
    
    return num; 
} 

/****************************************************************
* function name         : ThreadPoolGetCurrentTaskNum
* functional description    : 獲取線程池等待隊列任務數
* input parameter        : pthis    線程池指針
* output parameter    : 
* return value            : 等待隊列任務數
* history                : 
*****************************************************************/
static int ThreadPoolGetCurrentTaskNum(void* pthis) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock)); 
    int num = pool->cur_queue_size;
    pthread_mutex_unlock(&(pool->queue_lock)); 
    
    return num; 
} 

/****************************************************************
* function name         : ThreadPoolDestroy
* functional description    : 銷毀線程池
* input parameter        : pthis    線程池指針
* output parameter    : 
* return value            : 0 - 成功;-1 - 失敗
* history                : 
*****************************************************************/
static int ThreadPoolDestroy (void *pthis) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)pthis;
    
    if (pool->shutdown) /**< 已銷毀 */
    {
        return -1;
    }
    pool->shutdown = 1;    /**< 銷毀標志置位 */
    
    pthread_cond_broadcast (&(pool->queue_ready)); /**< 喚醒所有pthread_cond_wait()等待線程 */
    int i; 
    for (i = 0; i < pool->current_pthread_num; i++) 
    {
        pthread_join (pool->threadid[i], NULL); /**< 等待所有線程執行結束 */
    }
    
    free (pool->threadid);    /**< 釋放 */
    worker_t *head = NULL; 
    
    while (pool->queue_head != NULL) 
    { 
        head = pool->queue_head; 
        pool->queue_head = pool->queue_head->next; 
        free (head);    /**< 釋放 */
    } 
    
    pthread_mutex_destroy(&(pool->queue_lock));    /**< 銷毀 */
    pthread_cond_destroy(&(pool->queue_ready)); /**< 銷毀 */     
    free (pool);    /**< 釋放 */ 
    pool=NULL;     
    return 0; 
} 


/****************************************************************
* function name         : ThreadPoolRoutine
* functional description    : 線程池中運行的線程
* input parameter        : arg    線程池指針
* output parameter    : 
* return value            : NULL
* history                : 
*****************************************************************/
static void * ThreadPoolRoutine (void *arg) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *)arg;
    
    while (1) 
    { 
        pthread_mutex_lock (&(pool->queue_lock)); /**< 上鎖, pthread_cond_wait()調用會解鎖*/
        
        while ((pool->cur_queue_size == 0) && (!pool->shutdown))    /**< 隊列沒有等待任務*/
        { 
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));     /**< 條件鎖阻塞等待條件信號*/
        } 
        if (pool->shutdown) 
        { 
            pthread_mutex_unlock (&(pool->queue_lock)); 
            pthread_exit (NULL);     /**< 釋放線程 */
        } 

        assert (pool->cur_queue_size != 0); 
        assert (pool->queue_head != NULL); 
         
        pool->cur_queue_size--;     /**< 等待任務減1,准備執行任務*/ 
        worker_t *worker     = pool->queue_head;    /**< 取等待隊列任務結點頭*/ 
        pool->queue_head     = worker->next;     /**< 鏈表后移 */ 
        
        pthread_mutex_unlock (&(pool->queue_lock)); 
        (*(worker->process)) (worker->arg);     /**< 執行回調函數 */ 
        pthread_mutex_lock (&(pool->queue_lock)); 
        
        pool->current_pthread_task_num--;    /**< 函數執行結束 */ 
        free (worker);     /**< 釋放任務結點 */
        worker = NULL; 

        if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
        {
            pthread_mutex_unlock (&(pool->queue_lock)); 
            break;    /**< 當池中空閑線程超過 free_pthread_num則將線程釋放回操作系統 */
        }
        pthread_mutex_unlock (&(pool->queue_lock));         
    } 
    
    pool->current_pthread_num--;    /**< 當前池中線程數減1 */
    pthread_exit (NULL);    /**< 釋放線程*/
    return (void*)NULL;
} 

/****************************************************************
* function name         : ThreadPoolConstruct
* functional description    : 創建線程池
* input parameter        : max_num    線程池可容納的最大線程數
                      free_num    線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統
* output parameter    : 
* return value            : 線程池指針
* history                : 
*****************************************************************/
CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); 
    if (NULL == pool)
    {
        return NULL;
    }
    memset(pool, 0, sizeof(CThread_pool_t));
    
    pthread_mutex_init (&(pool->queue_lock), NULL);    /**< 初始化互斥鎖 */
    pthread_cond_init (&(pool->queue_ready), NULL);    /**< 初始化條件變量 */ 

    pool->queue_head                 = NULL; 
    pool->max_thread_num             = max_num;    /**< 線程池可容納的最大線程數 */
    pool->cur_queue_size             = 0; 
    pool->current_pthread_task_num     = 0;
    pool->shutdown                     = 0; 
    pool->current_pthread_num         = 0;
    pool->free_pthread_num             = free_num;    /**< 線程池允許存在的最大空閑線程 */
    pool->threadid                    = NULL;
    pool->threadid                     = (pthread_t *) malloc (max_num * sizeof (pthread_t)); 
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;    /**< 給函數指針賦值 */
    pool->AddWorkLimit                = ThreadPoolAddWorkLimit;
    pool->Destruct                    = ThreadPoolDestroy;
    pool->GetMaxThreadNum            = ThreadPoolGetThreadMaxNum;
    pool->GetCurThreadNum            = ThreadPoolGetCurrentThreadNum;
    pool->GetCurTaskThreadNum        = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurTaskNum                = ThreadPoolGetCurrentTaskNum;
    
    int i = 0; 
    for (i = 0; i < max_num; i++) 
    {  
        pool->current_pthread_num++;    /**< 當前池中的線程數 */
        pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);    /**< 創建線程 */
        usleep(1000);
    } 

    return pool;
} 

/****************************************************************
* function name         : ThreadPoolConstructDefault
* functional description    : 創建線程池,以默認的方式初始化,未創建線程
* input parameter        : 
* output parameter    : 
* return value            : 線程池指針
* history                : 
*****************************************************************/
CThread_pool_t* ThreadPoolConstructDefault(void) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); 
    if (NULL == pool)
    {
        return NULL;
    }
    memset(pool, 0, sizeof(CThread_pool_t));
    
    pthread_mutex_init(&(pool->queue_lock), NULL); 
    pthread_cond_init(&(pool->queue_ready), NULL); 

    pool->queue_head                 = NULL; 
    pool->max_thread_num             = DEFAULT_MAX_THREAD_NUM;    /**< 默認值 */
    pool->cur_queue_size             = 0; 
    pool->current_pthread_task_num     = 0;
    pool->shutdown                     = 0; 
    pool->current_pthread_num         = 0;
    pool->free_pthread_num             = DEFAULT_FREE_THREAD_NUM;    /**< 默認值 */
    pool->threadid                    = NULL;
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit                = ThreadPoolAddWorkLimit;
    pool->Destruct                    = ThreadPoolDestroy;
    pool->GetMaxThreadNum            = ThreadPoolGetThreadMaxNum;
    pool->GetCurThreadNum            = ThreadPoolGetCurrentThreadNum;
    pool->GetCurTaskThreadNum        = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurTaskNum                = ThreadPoolGetCurrentTaskNum;
    
    return pool;
}

xxx.h

/************************************************************************
* module            : 線程池頭文件
* file name        : lib_thread_pool.h
* Author             : 
* version            : V1.0
* DATE            : 
* directory         : 
* description        : 
* related document: 
* 
************************************************************************/
/*-----------------------includes-------------------------------*/
#ifndef __PTHREAD_POOL_H__
#define __PTHREAD_POOL_H__

#include <pthread.h>

/*---------------constants/macro definition---------------------*/
#define DEFAULT_MAX_THREAD_NUM        100
#define DEFAULT_FREE_THREAD_NUM        10    
typedef struct worker_t worker_t;
typedef struct CThread_pool_t CThread_pool_t;

/*---------------global variables definition-----------------------*/
/*線程池任務結點*/
struct worker_t
{
    void *(*process) (void *arg);    /**< 回調函數 */
    int   paratype;                    /**< 函數類型(預留) */
    void *arg;                        /**< 回調函數參數 */
    struct worker_t *next;            /**< 連接下一個任務結點 */
};

/*線程池控制器*/
struct CThread_pool_t
{
    pthread_mutex_t queue_lock;    /**< 互斥鎖 */
    pthread_cond_t queue_ready;    /**< 條件變量 */

    worker_t *queue_head;    /**< 任務結點鏈表,保存所有投遞的任務 */
    int shutdown;            /**< 線程池銷毀標志,1 - 銷毀 */
    pthread_t *threadid;    /**< 線程ID */
    int max_thread_num;        /**< 線程池可容納的最大線程數 */
    int current_pthread_num;    /**< 當前線程池存放的線程數 */
    int current_pthread_task_num;    /**< 當前正在執行任務和已分配任務的線程數目和 */
    int cur_queue_size;        /**< 當前等待隊列的任務數目 */
    int    free_pthread_num;    /**< 線程池內允許存在的最大空閑線程數 */
    
    /****************************************************************
    * function name         : ThreadPoolAddWorkLimit
    * functional description    : 向線程池投遞任務
    * input parameter        : pthis    線程池指針
                          process    回調函數
                          arg        回調函數的參數
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失敗 
    * history                : 
    *****************************************************************/
    int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 

    /****************************************************************
    * function name         : ThreadPoolAddWorkUnlimit
    * functional description    : 向線程池投遞任務,無空閑線程則阻塞
    * input parameter        : pthis    線程池指針
                          process    回調函數
                          arg        回調函數的參數
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失敗 
    * history                : 
    *****************************************************************/
    int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 
    
    /****************************************************************
    * function name         : ThreadPoolGetThreadMaxNum
    * functional description    : 獲取線程池可容納的最大線程數
    * input parameter        : pthis    線程池指針
    * output parameter    : 
    * return value            : 線程池可容納的最大線程數
    * history                : 
    *****************************************************************/
    int (*GetMaxThreadNum) (void *pthis); 

    /****************************************************************
    * function name         : ThreadPoolGetCurrentThreadNum
    * functional description    : 獲取線程池存放的線程數
    * input parameter        : pthis    線程池指針
    * output parameter    : 
    * return value            : 線程池存放的線程數
    * history                : 
    *****************************************************************/    
    int (*GetCurThreadNum) (void *pthis); 
    
    /****************************************************************
    * function name         : ThreadPoolGetCurrentTaskThreadNum
    * functional description    : 獲取當前正在執行任務和已分配任務的線程數目和
    * input parameter        : pthis    線程池指針
    * output parameter    : 
    * return value            : 當前正在執行任務和已分配任務的線程數目和
    * history                : 
    *****************************************************************/
    int (*GetCurTaskThreadNum) (void *pthis); 
    
    /****************************************************************
    * function name         : ThreadPoolGetCurrentTaskNum
    * functional description    : 獲取線程池等待隊列任務數
    * input parameter        : pthis    線程池指針
    * output parameter    : 
    * return value            : 等待隊列任務數
    * history                : 
    *****************************************************************/
    int (*GetCurTaskNum) (void *pthis); 
    
    /****************************************************************
    * function name         : ThreadPoolDestroy
    * functional description    : 銷毀線程池
    * input parameter        : pthis    線程池指針
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失敗
    * history                : 
    *****************************************************************/
    int (*Destruct) (void *pthis); 
};

/*---------------functions declaration--------------------------*/
/****************************************************************
* function name         : ThreadPoolConstruct
* functional description    : 創建線程池
* input parameter        : max_num    線程池可容納的最大線程數
                      free_num    線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統
* output parameter    : 
* return value            : 線程池指針
* history                : 
*****************************************************************/
CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num);

/****************************************************************
* function name         : ThreadPoolConstructDefault
* functional description    : 創建線程池,以默認的方式初始化,未創建線程
* input parameter        : 
* output parameter    : 
* return value            : 線程池指針
* history                : 
*****************************************************************/
CThread_pool_t* ThreadPoolConstructDefault(void);

#endif

main.c

#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <pthread.h> 
#include <assert.h> 
#include <string.h>

#include "lib_thread_pool.h"


static void* thread_1(void* arg);
static void* thread_2(void* arg);
static void* thread_3(void* arg);
static void DisplayPoolStatus(CThread_pool_t* pPool);

int nKillThread = 0;

int main()
{
    CThread_pool_t* pThreadPool = NULL;
    pThreadPool = ThreadPoolConstruct(2, 1);
    int nNumInput = 5;
    char LogInput[] = "OK!";

    DisplayPoolStatus(pThreadPool);
    /*可用AddWorkLimit()替換看執行的效果*/
    pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL);
    /*
    * 沒加延遲發現連續投遞任務時pthread_cond_wait()會收不到信號pthread_cond_signal() !!
    * 因為AddWorkUnlimit()進去后調用pthread_mutex_lock()把互斥鎖鎖上,導致pthread_cond_wait()
    * 收不到信號!!也可在AddWorkUnlimit()里面加個延遲,一般情況可能也遇不到這個問題
    */
    usleep(10);    
    pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)nNumInput);
    usleep(10);
    pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput);
    usleep(10);
    DisplayPoolStatus(pThreadPool);

    nKillThread = 1;
    usleep(100);    /**< 先讓線程退出 */
    DisplayPoolStatus(pThreadPool);
    nKillThread = 2;
    usleep(100);
    DisplayPoolStatus(pThreadPool);
    nKillThread = 3;
    usleep(100);
    DisplayPoolStatus(pThreadPool);

    pThreadPool->Destruct((void*)pThreadPool);
    return 0;
}

static void* thread_1(void* arg)
{
    printf("Thread 1 is running !\n");
    while(nKillThread != 1)
        usleep(10);
    return NULL;
}

static void* thread_2(void* arg)
{
    int nNum = (int)arg;
    
    printf("Thread 2 is running !\n");
    printf("Get Number %d\n", nNum);
    while(nKillThread != 2)
        usleep(10);
    return NULL;
}

static void* thread_3(void* arg)
{
    char *pLog = (char*)arg;
    
    printf("Thread 3 is running !\n");
    printf("Get String %s\n", pLog);
    while(nKillThread != 3)
        usleep(10);
    return NULL;
}

static void DisplayPoolStatus(CThread_pool_t* pPool)
{
    static int nCount = 1;
    
    printf("******************\n");
    printf("nCount = %d\n", nCount++);
    printf("max_thread_num = %d\n", pPool->GetMaxThreadNum((void*)pPool));
    printf("current_pthread_num = %d\n", pPool->GetCurThreadNum((void*)pPool));
    printf("current_pthread_task_num = %d\n", pPool->GetCurTaskThreadNum((void*)pPool));
    printf("cur_queue_size = %d\n", pPool->GetCurTaskNum((void*)pPool));
    printf("******************\n");
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM