linux C 線程池(物不可窮也~)


Linux 多線程編程之 線程池 的原理和一個簡單的C實現,提高對多線程編

程的認知,同步處理等操作,以及如何在實際項目中高效的利用多線程開

發。

 

1.  線程池介紹

為什么需要線程池???

目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務
器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,
但處理時間卻相對較短。

傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創
建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就
是是“即時創建,即時銷毀”的策略。盡管與創建進程相比,創建線程的時
間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執
行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態,
這筆開銷將是不可忽略的。

線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對
多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,
因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的
延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過
適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,
就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從
而可以防止資源不足。

 

2. 線程池結構

2.1 線程池任務結點結構

線程池任務結點用來保存用戶投遞過來的的任務,並放入線程池中的線程來執行,任務結構如下:

// 線程池任務結點
struct
worker_t { void * (* process)(void * arg); /*回調函數*/ int paratype; /*函數類型(預留)*/ void * arg; /*回調函數參數*/ struct worker_t * next; /*鏈接下一個任務節點*/ };

2.2 線程池控制器

線程池控制器用來對線程池進行控制管理,描述當前線程池的最基本信息,包括任務的投遞,線

程池狀態的更新與查詢,線程池的銷毀等,其結構如下:

/*線程控制器*/
struct CThread_pool_t {
    pthread_mutex_t queue_lock;     /*互斥鎖*/
    pthread_cond_t  queue_ready;    /*條件變量*/
    
    worker_t * queue_head;          /*任務節點鏈表 保存所有投遞的任務*/
    int shutdown;                   /*線程池銷毀標志 1-銷毀*/
    pthread_t * threadid;           /*線程ID*/
    
    int max_thread_num;             /*線程池可容納最大線程數*/
    int current_pthread_num;        /*當前線程池存放的線程*/
    int current_pthread_task_num;   /*當前已經執行任務和已分配任務的線程數目和*/
    int current_wait_queue_num;     /*當前等待隊列的的任務數目*/
    int free_pthread_num;           /*線程池允許最大的空閑線程數/*/
    
    /**
     *  function:       ThreadPoolAddWorkUnlimit
     *  description:    向線程池投遞任務
     *  input param:    pthis   線程池指針
     *                  process 回調函數
     *                  arg     回調函數參數
     *  return Valr:    0       成功
     *                  -1      失敗
     */     
    int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolAddWorkLimit
     *  description:    向線程池投遞任務,無空閑線程則阻塞
     *  input param:    pthis   線程池指針
     *                  process 回調函數
     *                  arg     回調函數參數
     *  return Val:     0       成功
     *                  -1      失敗
     */     
    int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolGetThreadMaxNum
     *  description:    獲取線程池可容納的最大線程數
     *  input param:    pthis   線程池指針
     */     
    int (* GetThreadMaxNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentThreadNum
     *  description:    獲取線程池存放的線程數
     *  input param:    pthis   線程池指針
     *  return Val:     線程池存放的線程數
     */     
    int (* GetCurrentThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentTaskThreadNum
     *  description:    獲取當前正在執行任務和已經分配任務的線程數目和
     *  input param:    pthis   線程池指針
     *  return Val:     當前正在執行任務和已經分配任務的線程數目和
     */     
    int (* GetCurrentTaskThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentWaitTaskNum
     *  description:    獲取線程池等待隊列任務數
     *  input param:    pthis   線程池指針
     *  return Val:     等待隊列任務數
     */     
    int (* GetCurrentWaitTaskNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolDestroy
     *  description:    銷毀線程池
     *  input param:    pthis   線程池指針
     *  return Val:     0       成功
     *                  -1      失敗
     */     
    int (* Destroy)(void * pthis);    
};

 

2.3 線程池運行結構

解釋:

1) 圖中的線程池中的"空閑"和"執行"分別表示空閑線程和執行線程,空閑線程指在正在等待任務的線程,

 同樣執行線程指正在執行任務的線程,  兩者是相互轉換的。當用戶投遞任務過來則用空閑線程來執行

 該任務,且空閑線程狀態轉換為執行線程;當任務執行完后,執行線程狀態轉變為空閑線程。

2) 創建線程池時,正常情況會創建一定數量的線程,  所有線程初始化為空閑線程,線程阻塞等待用戶

 投遞任務。

3) 用戶投遞的任務首先放入等待隊列queue_head 鏈表中, 如果線程池中有空閑線程則放入空閑線程中

 執行,否則根據條件選擇繼續等待空閑線程或者新建一個線程來執行,新建的線程將放入線程池中。

4) 執行的任務會從等待隊列中脫離,並在任務執行完后釋放任務結點worker_t 

 

3. 線程池控制 / 部分函數解釋

3.1 線程池創建

 創建 max_num 個線程 ThreadPoolRoutine,即空閑線程

/**
 *  function:       ThreadPoolConstruct
 *  description:    構建線程池
 *  input param:    max_num   線程池可容納的最大線程數
 *                  free_num  線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統
 *  return Val:     線程池指針                 
 */     
CThread_pool_t * 
ThreadPoolConstruct(int max_num, int free_num)
{
    int i = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    if(NULL == pool)
        return NULL;
    
    memset(pool, 0, sizeof(CThread_pool_t));
    
    /*初始化互斥鎖*/
    pthread_mutex_init(&(pool->queue_lock), NULL);
    /*初始化條件變量*/
    pthread_cond_init(&(pool->queue_ready), NULL);
    
    pool->queue_head                = NULL;
    pool->max_thread_num            = max_num; // 線程池可容納的最大線程數
    pool->current_wait_queue_num    = 0;
    pool->current_pthread_task_num  = 0;
    pool->shutdown                  = 0;
    pool->current_pthread_num       = 0;
    pool->free_pthread_num          = free_num; // 線程池允許存在最大空閑線程
    pool->threadid                  = NULL;
    pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    /*該函數指針賦值*/
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    pool->Destroy                   = ThreadPoolDestroy;
    pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    
    for(i=0; i<max_num; i++) {
        pool->current_pthread_num++;    // 當前池中的線程數
        /*創建線程*/
        pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
        usleep(1000);        
    }
    
    return pool;
}

 

3.2 投遞任務

/**
 *  function:       ThreadPoolAddWorkLimit
 *  description:    向線程池投遞任務,無空閑線程則阻塞
 *  input param:    pthis   線程池指針
 *                  process 回調函數
 *                  arg     回調函數參數
 *  return Val:     0       成功
 *                  -1      失敗
 */     
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{ 
    // int FreeThreadNum = 0;
    // int CurrentPthreadNum = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    /*為添加的任務隊列節點分配內存*/
    worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
    if(NULL == newworker) 
        return -1;
    
    newworker->process  = process;  // 回調函數,在線程ThreadPoolRoutine()中執行
    newworker->arg      = arg;      // 回調函數參數
    newworker->next     = NULL;      
    
    pthread_mutex_lock(&(pool->queue_lock));
    
    /*插入新任務隊列節點*/
    worker_t * member = pool->queue_head;   // 指向任務隊列鏈表整體
    if(member != NULL) {
        while(member->next != NULL) // 隊列中有節點
            member = member->next;  // member指針往后移動
            
        member->next = newworker;   // 插入到隊列鏈表尾部
    } else 
        pool->queue_head = newworker; // 插入到隊列鏈表頭
    
    assert(pool->queue_head != NULL);
    pool->current_wait_queue_num++; // 等待隊列加1
    
    /*空閑的線程= 當前線程池存放的線程 - 當前已經執行任務和已分配任務的線程數目和*/
    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    /*如果沒有空閑線程且池中當前線程數不超過可容納最大線程*/
    if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {  //-> 條件為真進行新線程創建
        int CurrentPthreadNum = pool->current_pthread_num;
        
        /*新增線程*/
        pool->threadid = (pthread_t *)realloc(pool->threadid, 
                                        (CurrentPthreadNum+1) * sizeof(pthread_t));
                                        
        pthread_create(&(pool->threadid[CurrentPthreadNum]),
                                              NULL, ThreadPoolRoutine, (void *)pool);
        /*當前線程池中線程總數加1*/                                   
        pool->current_pthread_num++;
        
        /*分配任務線程數加1*/
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock));
        
        /*發送信號給一個處與條件阻塞等待狀態的線程*/
        pthread_cond_signal(&(pool->queue_ready));
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    /*發送信號給一個處與條件阻塞等待狀態的線程*/
    pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);  //看情況  
    return 0;
}

 投遞任務時先創建一個任務結點保存回調函數和函數參數,並將任務結點放入等待隊列中,在代碼中

 注釋"//->條件為真創建新線程",realloc() 會在保存原始內存中的數據不變的基礎上新增1個sizeof(pthread_t)

 大小內存。之后更新current_pthread_num,和current_pthread_task_num;並發送信號

 pthread_cond_signal(&(pool->queue_read)),給一個處於條件阻塞等待狀態的線程,即線程ThreadPoolRoutin()

 中的pthread_cond_wait(&(pool->queue_read), &(pool->queue_lock))阻塞等待接收信號,重點講互

 斥鎖和添加變量:

  pthread_mutex_t  queue_lock;   /**< 互斥鎖*/

  pthread_cond_t    queue_ready;   /**< 條件變量*/ 

 這兩個變量時線程池實現中很重要的點,這里簡要介紹代碼中會用到的相關函數功能;

 

3.3 執行線程

/**
 *  function:       ThreadPoolRoutine
 *  description:    線程池中執行的線程
 *  input param:    arg  線程池指針
 */     
void * 
ThreadPoolRoutine(void * arg)
{
    CThread_pool_t * pool = (CThread_pool_t *)arg;
    
    while(1) {
        /*上鎖,pthread_cond_wait()調用會解鎖*/
        pthread_mutex_lock(&(pool->queue_lock));
        
        /*隊列沒有等待任務*/
        while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
            /*條件鎖阻塞等待條件信號*/
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
        }
        
        if(pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            pthread_exit(NULL);         // 釋放線程
        }
        
        assert(pool->current_wait_queue_num != 0);
        assert(pool->queue_head != NULL);
        
        pool->current_wait_queue_num--; // 等待任務減1,准備執行任務
        worker_t * worker = pool->queue_head;   // 去等待隊列任務節點頭
        pool->queue_head = worker->next;        // 鏈表后移     
        pthread_mutex_unlock(&(pool->queue_lock));
        
        (* (worker->process))(worker->arg);      // 執行回調函數
        
        pthread_mutex_lock(&(pool->queue_lock));
        pool->current_pthread_task_num--;       // 函數執行結束
        free(worker);   // 釋放任務結點
        worker = NULL;
        
        if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
            pthread_mutex_unlock(&(pool->queue_lock));
            break;  // 當線程池中空閑線程超過 free_pthread_num 則將線程釋放回操作系統
        }
        pthread_mutex_unlock(&(pool->queue_lock));    
    }
    
    pool->current_pthread_num--;    // 當前線程數減1
    pthread_exit(NULL);             // 釋放線程
    
    return (void *)NULL;
}

 這個就是用來執行任務的線程,在初始化創建線程時所有線程都全部阻塞在pthread_cond_wait()處

 此時的線程就為空閑線程,也就是線程被掛起,當收到信號並取得互斥鎖時,     表明任務投遞過來

 則獲取等待隊列里的任務結點並執行回調函數;  函數執行結束后回去判斷當前等待隊列是否還有任

 務,有則接下去執行,否則重新阻塞回到空閑線程狀態。

 

4. 完整代碼實現

4.1 CThreadPool.h 文件

/**
 *  線程池頭文件
 *
 **/

#ifndef _CTHREADPOOL_H_
#define _CTHREADPOOL_H_

#include <pthread.h>

/*線程池可容納最大線程數*/
#define DEFAULT_MAX_THREAD_NUM      100

/*線程池允許最大的空閑線程,超過則將線程釋放回操作系統*/
#define DEFAULT_FREE_THREAD_NUM     10

typedef struct worker_t         worker_t;
typedef struct CThread_pool_t   CThread_pool_t;

/*線程池任務節點*/
struct worker_t {
    void * (* process)(void * arg); /*回調函數*/
    int    paratype;                /*函數類型(預留)*/
    void * arg;                     /*回調函數參數*/
    struct worker_t * next;         /*鏈接下一個任務節點*/
};

/*線程控制器*/
struct CThread_pool_t {
    pthread_mutex_t queue_lock;     /*互斥鎖*/
    pthread_cond_t  queue_ready;    /*條件變量*/
    
    worker_t * queue_head;          /*任務節點鏈表 保存所有投遞的任務*/
    int shutdown;                   /*線程池銷毀標志 1-銷毀*/
    pthread_t * threadid;           /*線程ID*/
    
    int max_thread_num;             /*線程池可容納最大線程數*/
    int current_pthread_num;        /*當前線程池存放的線程*/
    int current_pthread_task_num;   /*當前已經執行任務和已分配任務的線程數目和*/
    int current_wait_queue_num;     /*當前等待隊列的的任務數目*/
    int free_pthread_num;           /*線程池允許最大的空閑線程數/*/
    
    /**
     *  function:       ThreadPoolAddWorkUnlimit
     *  description:    向線程池投遞任務
     *  input param:    pthis   線程池指針
     *                  process 回調函數
     *                  arg     回調函數參數
     *  return Valr:    0       成功
     *                  -1      失敗
     */     
    int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolAddWorkLimit
     *  description:    向線程池投遞任務,無空閑線程則阻塞
     *  input param:    pthis   線程池指針
     *                  process 回調函數
     *                  arg     回調函數參數
     *  return Val:     0       成功
     *                  -1      失敗
     */     
    int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolGetThreadMaxNum
     *  description:    獲取線程池可容納的最大線程數
     *  input param:    pthis   線程池指針
     */     
    int (* GetThreadMaxNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentThreadNum
     *  description:    獲取線程池存放的線程數
     *  input param:    pthis   線程池指針
     *  return Val:     線程池存放的線程數
     */     
    int (* GetCurrentThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentTaskThreadNum
     *  description:    獲取當前正在執行任務和已經分配任務的線程數目和
     *  input param:    pthis   線程池指針
     *  return Val:     當前正在執行任務和已經分配任務的線程數目和
     */     
    int (* GetCurrentTaskThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentWaitTaskNum
     *  description:    獲取線程池等待隊列任務數
     *  input param:    pthis   線程池指針
     *  return Val:     等待隊列任務數
     */     
    int (* GetCurrentWaitTaskNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolDestroy
     *  description:    銷毀線程池
     *  input param:    pthis   線程池指針
     *  return Val:     0       成功
     *                  -1      失敗
     */     
    int (* Destroy)(void * pthis);    
};

/**
 *  function:       ThreadPoolConstruct
 *  description:    構建線程池
 *  input param:    max_num   線程池可容納的最大線程數
 *                  free_num  線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統
 *  return Val:     線程池指針                 
 */     
CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num);

/**
 *  function:       ThreadPoolConstructDefault
 *  description:    創建線程池,以默認的方式初始化,未創建線程
 *
 *  return Val:     線程池指針                 
 */     
CThread_pool_t * ThreadPoolConstructDefault(void);

#endif  // _CTHREADPOOL_H_

 

4.2 CThreadPool.c 文件

/**
 *  線程池實現
 *
 **/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>

#include "CThreadPool.h"

void * ThreadPoolRoutine(void * arg); 

/**
 *  function:       ThreadPoolAddWorkLimit
 *  description:    向線程池投遞任務,無空閑線程則阻塞
 *  input param:    pthis   線程池指針
 *                  process 回調函數
 *                  arg     回調函數參數
 *  return Val:     0       成功
 *                  -1      失敗
 */     
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{ 
    // int FreeThreadNum = 0;
    // int CurrentPthreadNum = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    /*為添加的任務隊列節點分配內存*/
    worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
    if(NULL == newworker) 
        return -1;
    
    newworker->process  = process;  // 回調函數,在線程ThreadPoolRoutine()中執行
    newworker->arg      = arg;      // 回調函數參數
    newworker->next     = NULL;      
    
    pthread_mutex_lock(&(pool->queue_lock));
    
    /*插入新任務隊列節點*/
    worker_t * member = pool->queue_head;   // 指向任務隊列鏈表整體
    if(member != NULL) {
        while(member->next != NULL) // 隊列中有節點
            member = member->next;  // member指針往后移動
            
        member->next = newworker;   // 插入到隊列鏈表尾部
    } else 
        pool->queue_head = newworker; // 插入到隊列鏈表頭
    
    assert(pool->queue_head != NULL);
    pool->current_wait_queue_num++; // 等待隊列加1
    
    /*空閑的線程= 當前線程池存放的線程 - 當前已經執行任務和已分配任務的線程數目和*/
    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    /*如果沒有空閑線程且池中當前線程數不超過可容納最大線程*/
    if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {
        int CurrentPthreadNum = pool->current_pthread_num;
        
        /*新增線程*/
        pool->threadid = (pthread_t *)realloc(pool->threadid, 
                                        (CurrentPthreadNum+1) * sizeof(pthread_t));
                                        
        pthread_create(&(pool->threadid[CurrentPthreadNum]),
                                              NULL, ThreadPoolRoutine, (void *)pool);
        /*當前線程池中線程總數加1*/                                   
        pool->current_pthread_num++;
        
        /*分配任務線程數加1*/
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock));
        
        /*發送信號給一個處與條件阻塞等待狀態的線程*/
        pthread_cond_signal(&(pool->queue_ready));
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    /*發送信號給一個處與條件阻塞等待狀態的線程*/
    pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);  //看情況  
    return 0;
}

/**
 *  function:       ThreadPoolAddWorkUnlimit
 *  description:    向線程池投遞任務
 *  input param:    pthis   線程池指針
 *                  process 回調函數
 *                  arg     回調函數參數
 *  return Valr:    0       成功
 *                  -1      失敗
 */
int
ThreadPoolAddWorkUnlimit(void * pthis, void * (* process)(void * arg), void * arg)
{
    // int FreeThreadNum = 0;
    // int CurrentPthreadNum = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    /*給新任務隊列節點分配內存*/
    worker_t * newworker = (worker_t *)malloc(sizeof(worker_t));
    if(NULL == newworker)
        return -1;
    
    newworker->process  = process;  // 回調函數
    newworker->arg      = arg;      // 回調函數參數
    newworker->next     = NULL;
    
    pthread_mutex_lock(&(pool->queue_lock));
    
    /*新節點插入任務隊列鏈表操作*/
    worker_t * member = pool->queue_head;
    if(member != NULL) {
        while(member->next != NULL)
            member = member->next;
        
        member->next = newworker;       // 插入隊列鏈表尾部
    } else 
        pool->queue_head = newworker;   // 插入到頭(也就是第一個節點,之前鏈表沒有節點)
    
    assert(pool->queue_head != NULL);
    pool->current_wait_queue_num++;     // 當前等待隊列的的任務數目+1
    
    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    /*只判斷是否沒有空閑線程*/
    if(0 == FreeThreadNum) {
        int CurrentPthreadNum = pool->current_pthread_num;
        pool->threadid = (pthread_t *)realloc(pool->threadid,
                                           (CurrentPthreadNum+1)*sizeof(pthread_t));
        pthread_create(&(pool->threadid[CurrentPthreadNum]),NULL,
                                        ThreadPoolRoutine, (void *)pool);
        pool->current_pthread_num++;
        if(pool->current_pthread_num > pool->max_thread_num)
            pool->max_thread_num = pool->current_pthread_num;
        
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock));
        pthread_cond_signal(&(pool->queue_ready));
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock));
    pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);    
    return 0;   
}

/**
 *  function:       ThreadPoolGetThreadMaxNum
 *  description:    獲取線程池可容納的最大線程數
 *  input param:    pthis   線程池指針
 *  return val:     線程池可容納的最大線程數
 */     
int
ThreadPoolGetThreadMaxNum(void * pthis)
{
    int num = 0;   
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock));
    num = pool->max_thread_num;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    return num;
}

/**
 *  function:       ThreadPoolGetCurrentThreadNum
 *  description:    獲取線程池存放的線程數
 *  input param:    pthis   線程池指針
 *  return Val:     線程池存放的線程數
 */     
int 
ThreadPoolGetCurrentThreadNum(void * pthis)
{
    int num = 0;
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock));
    num = pool->current_pthread_num;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    return num;       
}

/**
 *  function:       ThreadPoolGetCurrentTaskThreadNum
 *  description:    獲取當前正在執行任務和已經分配任務的線程數目和
 *  input param:    pthis   線程池指針
 *  return Val:     當前正在執行任務和已經分配任務的線程數目和
 */   
int
ThreadPoolGetCurrentTaskThreadNum(void * pthis)
{
    int num = 0;
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock));
    num = pool->current_pthread_task_num;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    return num;   
}

/**
 *  function:       ThreadPoolGetCurrentWaitTaskNum
 *  description:    獲取線程池等待隊列任務數
 *  input param:    pthis   線程池指針
 *  return Val:     等待隊列任務數
 */     
int
ThreadPoolGetCurrentWaitTaskNum(void * pthis)
{
    int num = 0;
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    pthread_mutex_lock(&(pool->queue_lock));
    num = pool->current_wait_queue_num;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    return num;   
}

/**
 *  function:       ThreadPoolDestroy
 *  description:    銷毀線程池
 *  input param:    pthis   線程池指針
 *  return Val:     0       成功
 *                  -1      失敗
 */     
int
ThreadPoolDestroy(void * pthis)
{
    int i;
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    if(pool->shutdown)      // 已銷毀
        return -1;
        
    pool->shutdown = 1;     // 銷毀標志置位
    
    /*喚醒所有pthread_cond_wait()等待線程*/
    pthread_cond_broadcast(&(pool->queue_ready));
    for(i=0; i<pool->current_pthread_num; i++)
        pthread_join(pool->threadid[i], NULL);  // 等待所有線程執行結束
    
    free(pool->threadid);   // 釋放
       
    /*銷毀任務隊列鏈表*/
    worker_t * head = NULL;
    while(pool->queue_head != NULL) {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);    
    }
    
    /*銷毀鎖*/
    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
    
    free(pool);
    pool = NULL;
    
    return 0;
}

/**
 *  function:       ThreadPoolRoutine
 *  description:    線程池中運行的線程
 *  input param:    arg  線程池指針
 */     
void * 
ThreadPoolRoutine(void * arg)
{
    CThread_pool_t * pool = (CThread_pool_t *)arg;
    
    while(1) {
        /*上鎖,pthread_cond_wait()調用會解鎖*/
        pthread_mutex_lock(&(pool->queue_lock));
        
        /*隊列沒有等待任務*/
        while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
            /*條件鎖阻塞等待條件信號*/
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
        }
        
        if(pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            pthread_exit(NULL);         // 釋放線程
        }
        
        assert(pool->current_wait_queue_num != 0);
        assert(pool->queue_head != NULL);
        
        pool->current_wait_queue_num--; // 等待任務減1,准備執行任務
        worker_t * worker = pool->queue_head;   // 去等待隊列任務節點頭
        pool->queue_head = worker->next;        // 鏈表后移     
        pthread_mutex_unlock(&(pool->queue_lock));
        
        (* (worker->process))(worker->arg);      // 執行回調函數
        
        pthread_mutex_lock(&(pool->queue_lock));
        pool->current_pthread_task_num--;       // 函數執行結束
        free(worker);   // 釋放任務結點
        worker = NULL;
        
        if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
            pthread_mutex_unlock(&(pool->queue_lock));
            break;  // 當線程池中空閑線程超過 free_pthread_num 則將線程釋放回操作系統
        }
        pthread_mutex_unlock(&(pool->queue_lock));    
    }
    
    pool->current_pthread_num--;    // 當前線程數減1
    pthread_exit(NULL);             // 釋放線程
    
    return (void *)NULL;
}

/**
 *  function:       ThreadPoolConstruct
 *  description:    構建線程池
 *  input param:    max_num   線程池可容納的最大線程數
 *                  free_num  線程池允許存在的最大空閑線程,超過則將線程釋放回操作系統
 *  return Val:     線程池指針                 
 */     
CThread_pool_t * 
ThreadPoolConstruct(int max_num, int free_num)
{
    int i = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    if(NULL == pool)
        return NULL;
    
    memset(pool, 0, sizeof(CThread_pool_t));
    
    /*初始化互斥鎖*/
    pthread_mutex_init(&(pool->queue_lock), NULL);
    /*初始化條件變量*/
    pthread_cond_init(&(pool->queue_ready), NULL);
    
    pool->queue_head                = NULL;
    pool->max_thread_num            = max_num; // 線程池可容納的最大線程數
    pool->current_wait_queue_num    = 0;
    pool->current_pthread_task_num  = 0;
    pool->shutdown                  = 0;
    pool->current_pthread_num       = 0;
    pool->free_pthread_num          = free_num; // 線程池允許存在最大空閑線程
    pool->threadid                  = NULL;
    pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    /*該函數指針賦值*/
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    pool->Destroy                   = ThreadPoolDestroy;
    pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    
    for(i=0; i<max_num; i++) {
        pool->current_pthread_num++;    // 當前池中的線程數
        /*創建線程*/
        pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
        usleep(1000);        
    }
    
    return pool;
}

/**
 *  function:       ThreadPoolConstructDefault
 *  description:    創建線程池,以默認的方式初始化,未創建線程
 *
 *  return Val:     線程池指針                 
 */     
CThread_pool_t * 
ThreadPoolConstructDefault(void)
{
    CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    if(NULL == pool)
        return NULL;
    
    memset(pool, 0, sizeof(CThread_pool_t));
    
    pthread_mutex_init(&(pool->queue_lock), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);
    
    pool->queue_head                = NULL;
    pool->max_thread_num            = DEFAULT_MAX_THREAD_NUM; // 默認值
    pool->current_wait_queue_num    = 0;
    pool->current_pthread_task_num  = 0;
    pool->shutdown                  = 0;
    pool->current_pthread_num       = 0;
    pool->free_pthread_num          = DEFAULT_FREE_THREAD_NUM; // 默認值
    pool->threadid                  = NULL;
    /*該函數指針賦值*/
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    pool->Destroy                   = ThreadPoolDestroy;
    pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    
    return pool;
}

 

4.3 測試 main.c 文件

#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <pthread.h> 
#include <assert.h> 
#include <string.h>

#include "CThreadPool.h"


void * thread_1(void * arg);
void * thread_2(void * arg);
void * thread_3(void * arg);
void DisplayPoolStatus(CThread_pool_t * pPool);

int nKillThread = 0;

int main()
{
    CThread_pool_t * pThreadPool = NULL;
    
    pThreadPool = ThreadPoolConstruct(5, 1);
    int nNumInput = 5;
    char LogInput[] = "OK!";

    DisplayPoolStatus(pThreadPool);
    /*可用AddWorkLimit()替換看執行的效果*/
    pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_1, (void *)NULL);
    /*
     * 沒加延遲發現連續投遞任務時pthread_cond_wait()會收不到信號pthread_cond_signal() !!
     * 因為AddWorkUnlimit()進去后調用pthread_mutex_lock()把互斥鎖鎖上,導致pthread_cond_wait()
     * 收不到信號!!也可在AddWorkUnlimit()里面加個延遲,一般情況可能也遇不到這個問題
     */
    usleep(10);    
    pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_2, (void *)nNumInput);
    usleep(10);
    pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_3, (void *)LogInput);
    usleep(10);
    DisplayPoolStatus(pThreadPool);

    nKillThread = 1;
    usleep(100);    /**< 先讓線程退出 */
    DisplayPoolStatus(pThreadPool);
    nKillThread = 2;
    usleep(100);
    DisplayPoolStatus(pThreadPool);
    nKillThread = 3;
    usleep(100);
    DisplayPoolStatus(pThreadPool);

    pThreadPool->Destroy((void*)pThreadPool);
    return 0;
}

void * 
thread_1(void * arg)
{
    printf("Thread 1 is running !\n");
    while(nKillThread != 1)
        usleep(10);
    return NULL;
}

void * 
thread_2(void * arg) { int nNum = (int)arg; printf("Thread 2 is running !\n"); printf("Get Number %d\n", nNum); while(nKillThread != 2) usleep(10); return NULL; } void *
thread_3(void * arg) { char * pLog = (char *)arg; printf("Thread 3 is running !\n"); printf("Get String %s\n", pLog); while(nKillThread != 3) usleep(10); return NULL; } void
DisplayPoolStatus(CThread_pool_t * pPool) { static int nCount = 1; printf("****************************\n"); printf("nCount = %d\n", nCount++); printf("max_thread_num = %d\n", pPool->GetThreadMaxNum((void *)pPool)); printf("current_pthread_num = %d\n", pPool->GetCurrentThreadNum((void *)pPool)); printf("current_pthread_task_num = %d\n", pPool->GetCurrentTaskThreadNum((void *)pPool)); printf("current_wait_queue_num = %d\n", pPool->GetCurrentWaitTaskNum((void *)pPool)); printf("****************************\n"); }

 

4.4 Makefile

簡單寫一個makefile

CC = gcc
CFLAGS = -g -Wall -o2
LIB = -lpthread

RUNE = $(CC) $(CFLAGS) $(object) -o $(exe) $(LIB)
RUNO = $(CC) $(CFLAGS) -c $< -o $@ $(LIB)

.RHONY:clean


object = main.o CThreadPool.o
exe = CThreadpool

$(exe):$(object)
    $(RUNE)

%.o:%.c CThreadPool.h
    $(RUNO)
%.o:%.c
    $(RUNO)


clean:
    -rm -rf *.o CThreadpool *~

 注意:使用模式規則,能引入用戶自定義變量,為多個文件建立相同的規則,規則中的相關

 文件前必須用“%”表明。關於Makefile的一些規則解釋見另一篇

 

5. 參考

感謝下面博主的貢獻,特別致謝(死去的龍7)博主!!!

天行健,君子以自強不息~ 祝諸位幸福安好!!!Thanks again.
死去的龍7:https://www.cnblogs.com/deadlong7/p/4155663.html
青山小和尚:https://blog.csdn.net/qq_36359022/article/details/78796784
de
veloperWorks:https://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/

 

6. 后記

 

無極生太極

太極生兩儀

兩儀生四象

四象生八卦

 

八卦:qian乾  xun巽  li離  gen艮  dui兌  kan坎  zhen震   kun坤

宇宙從混沌未分的“無極”而來,無極動而生太極,太極分陰陽兩儀,在由

陰陽分化出太陰、太陽、少陰、少陽這四象,四象分化而為八卦,   八卦

代表着世界的八種基本屬性,可以用“天地風山水火雷澤”來概括《說卦》

認為:

  乾,鍵也

  坤,順也

  震,動也

  巽,入也

  坎,陷也

  離,麗也

  艮,止也

  兌,說也

八卦又分出六十四卦,但六十四卦並不代表事務演化過程的終結。六十四

卦最后兩卦為“既濟” 和 “未濟”,象征事務發展到最后必然有一個結果,但

這個結果作為一個"節點“,以它為開始將展開另一次全新的演變,     所以

物不可窮也,故受之以未濟終焉",


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM