threadpool —— 基於 pthread 實現的簡單線程池(code)


轉載於 : http://blog.csdn.net/jcjc918/article/details/50395528

線程池介紹

線程池可以說是項目中經常會用到的組件,在這里假設讀者都有一定的多線程基礎,如果沒有的話不妨在這里進行了解:POSIX 多線程基礎

線程池是什么?我的簡單理解是有一組預先派生的線程,然后有一個管理員來管理和調度這些線程,你只需不斷把需要完成的任務交給他,他就會調度線程的資源來幫你完成。

那么管理員是怎么做的呢?一種簡單的方式就是,管理員管理一個任務的隊列,如果收到新的任務,就把任務加到隊列尾。每個線程盯着隊列,如果隊列非空,就去隊列頭拿一個任務來處理(每個任務只能被一個線程拿到),處理完了就繼續去隊列取任務。如果沒有任務了,線程就休眠,直到任務隊列不為空。如果這個管理員更聰明一點,他可能會在沒有任務或任務少的時候減少線程的數量,任務處理不過來的時候增加線程的數量,這樣就實現了資源的動態管理。

那么任務是什么呢?以后台服務器為例,每一個用戶的請求就是一個任務,線程不斷的在請求隊列里取出請求,完成后繼續處理下一個請求。

簡單圖示為: 
threadpool

線程池有一個好處就是減少線程創建和銷毀的時間,在任務處理時間比較短的時候這個好處非常顯著,可以提升任務處理的效率。

線程池實現

這里介紹的是線程池的一個簡單實現,在創建的時候預先派生指定數量的線程,然后去任務隊列取添加進來的任務進行處理就好。

作者說之后會添加更多特性,我們作為學習之后就以這個版本為准就好了。

項目主頁:threadpool

數據結構

主要有兩個自定義的數據結構

threadpool_task_t

用於保存一個等待執行的任務。一個任務需要指明:要運行的對應函數及函數的參數。所以這里的 struct 里有函數指針和 void 指針。

typedef struct { void (*function)(void *); void *argument; } threadpool_task_t;
thread_pool_t

一個線程池的結構。因為是 C 語言,所以這里任務隊列是用數組,並維護隊列頭和隊列尾來實現。

struct threadpool_t { pthread_mutex_t lock; /* 互斥鎖 */ pthread_cond_t notify; /* 條件變量 */ pthread_t *threads; /* 線程數組的起始指針 */ threadpool_task_t *queue; /* 任務隊列數組的起始指針 */ int thread_count; /* 線程數量 */ int queue_size; /* 任務隊列長度 */ int head; /* 當前任務隊列頭 */ int tail; /* 當前任務隊列尾 */ int count; /* 當前待運行的任務數 */ int shutdown; /* 線程池當前狀態是否關閉 */ int started; /* 正在運行的線程數 */ };

函數

對外接口
  • threadpool_t *threadpool_create(int thread_count, int queue_size, int flags); 創建線程池,用 thread_count 指定派生線程數,queue_size 指定任務隊列長度,flags 為保留參數,未使用。
  • int threadpool_add(threadpool_t *pool, void (*routine)(void *),void *arg, int flags); 添加需要執行的任務。第二個參數為對應函數指針,第三個為對應函數參數。flags 未使用。
  • int threadpool_destroy(threadpool_t *pool, int flags); 銷毀存在的線程池。flags 可以指定是立刻結束還是平和結束。立刻結束指不管任務隊列是否為空,立刻結束。平和結束指等待任務隊列的任務全部執行完后再結束,在這個過程中不可以添加新的任務。
內部輔助函數
  • static void *threadpool_thread(void *threadpool); 線程池每個線程所執行的函數。
  • int threadpool_free(threadpool_t *pool); 釋放線程池所申請的內存資源。

線程池使用

編譯

參考項目根目錄下的 Makefile, 直接用 make 編譯。

測試用例

項目提供了三個測試用例(見 threadpool/test/),我們可以以此來學習線程池的用法並測試是否正常工作。這里提供其中一個:

#define THREAD 32 #define QUEUE 256 #include <stdio.h> #include <pthread.h> #include <unistd.h> #include <assert.h> #include "threadpool.h" int tasks = 0, done = 0; pthread_mutex_t lock; void dummy_task(void *arg) { usleep(10000); pthread_mutex_lock(&lock); /* 記錄成功完成的任務數 */ done++; pthread_mutex_unlock(&lock); } int main(int argc, char **argv) { threadpool_t *pool; /* 初始化互斥鎖 */ pthread_mutex_init(&lock, NULL); /* 斷言線程池創建成功 */ assert((pool = threadpool_create(THREAD, QUEUE, 0)) != NULL); fprintf(stderr, "Pool started with %d threads and " "queue size of %d\n", THREAD, QUEUE); /* 只要任務隊列還沒滿,就一直添加 */ while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) { pthread_mutex_lock(&lock); tasks++; pthread_mutex_unlock(&lock); } fprintf(stderr, "Added %d tasks\n", tasks); /* 不斷檢查任務數是否完成一半以上,沒有則繼續休眠 */ while((tasks / 2) > done) { usleep(10000); } /* 這時候銷毀線程池,0 代表 immediate_shutdown */ assert(threadpool_destroy(pool, 0) == 0); fprintf(stderr, "Did %d tasks\n", done); return 0; }

 

源碼注釋

源碼注釋一並放在 github, 點我。

threadpool.h

/* * Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef _THREADPOOL_H_ #define _THREADPOOL_H_ #ifdef __cplusplus /* 對於 C++ 編譯器,指定用 C 的語法編譯 */ extern "C" { #endif /** * @file threadpool.h * @brief Threadpool Header File */ /** * Increase this constants at your own risk * Large values might slow down your system */ #define MAX_THREADS 64 #define MAX_QUEUE 65536 /* 簡化變量定義 */ typedef struct threadpool_t threadpool_t; /* 定義錯誤碼 */ typedef enum { threadpool_invalid = -1, threadpool_lock_failure = -2, threadpool_queue_full = -3, threadpool_shutdown = -4, threadpool_thread_failure = -5 } threadpool_error_t; typedef enum { threadpool_graceful = 1 } threadpool_destroy_flags_t; /* 以下是線程池三個對外 API */ /** * @function threadpool_create * @brief Creates a threadpool_t object. * @param thread_count Number of worker threads. * @param queue_size Size of the queue. * @param flags Unused parameter. * @return a newly created thread pool or NULL */ /** * 創建線程池,有 thread_count 個線程,容納 queue_size 個的任務隊列,flags 參數沒有使用 */ threadpool_t *threadpool_create(int thread_count, int queue_size, int flags); /** * @function threadpool_add * @brief add a new task in the queue of a thread pool * @param pool Thread pool to which add the task. * @param function Pointer to the function that will perform the task. * @param argument Argument to be passed to the function. * @param flags Unused parameter. * @return 0 if all goes well, negative values in case of error (@see * threadpool_error_t for codes). */ /** * 添加任務到線程池, pool 為線程池指針,routine 為函數指針, arg 為函數參數, flags 未使用 */ int threadpool_add(threadpool_t *pool, void (*routine)(void *), void *arg, int flags); /** * @function threadpool_destroy * @brief Stops and destroys a thread pool. * @param pool Thread pool to destroy. * @param flags Flags for shutdown * * Known values for flags are 0 (default) and threadpool_graceful in * which case the thread pool doesn't accept any new tasks but * processes all pending tasks before shutdown. */ /** * 銷毀線程池,flags 可以用來指定關閉的方式 */ int threadpool_destroy(threadpool_t *pool, int flags); #ifdef __cplusplus } #endif #endif /* _THREADPOOL_H_ */

threadpool.c

/* * Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /** * @file threadpool.c * @brief Threadpool implementation file */ #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "threadpool.h" /** * 線程池關閉的方式 */ typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. */ /** * 線程池一個任務的定義 */ typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; /** * @struct threadpool * @brief The threadpool struct * * @var notify Condition variable to notify worker threads. * @var threads Array containing worker threads ID. * @var thread_count Number of threads * @var queue Array containing the task queue. * @var queue_size Size of the task queue. * @var head Index of the first element. * @var tail Index of the next element. * @var count Number of pending tasks * @var shutdown Flag indicating if the pool is shutting down * @var started Number of started threads */ /** * 線程池的結構定義 * @var lock 用於內部工作的互斥鎖 * @var notify 線程間通知的條件變量 * @var threads 線程數組,這里用指針來表示,數組名 = 首元素指針 * @var thread_count 線程數量 * @var queue 存儲任務的數組,即任務隊列 * @var queue_size 任務隊列大小 * @var head 任務隊列中首個任務位置(注:任務隊列中所有任務都是未開始運行的) * @var tail 任務隊列中最后一個任務的下一個位置(注:隊列以數組存儲,head 和 tail 指示隊列位置) * @var count 任務隊列里的任務數量,即等待運行的任務數 * @var shutdown 表示線程池是否關閉 * @var started 開始的線程數 */ struct threadpool_t { pthread_mutex_t lock; pthread_cond_t notify; pthread_t *threads; threadpool_task_t *queue; int thread_count; int queue_size; int head; int tail; int count; int shutdown; int started; }; /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ /** * 線程池里每個線程在跑的函數 * 聲明 static 應該只為了使函數只在本文件內有效 */ static void *threadpool_thread(void *threadpool); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } threadpool_t *pool; int i; /* 申請內存創建內存池對象 */ if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } /* Initialize */ pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ /* 申請線程數組和任務隊列所需的內存 */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ /* 初始化互斥鎖和條件變量 */ if((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } /* Start worker threads */ /* 創建指定數量的線程開始運行 */ for(i = 0; i < thread_count; i++) { if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count++; pool->started++; } return pool; err: if(pool) { threadpool_free(pool); } return NULL; } int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; if(pool == NULL || function == NULL) { return threadpool_invalid; } /* 必須先取得互斥鎖所有權 */ if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } /* 計算下一個可以存儲 task 的位置 */ next = pool->tail + 1; next = (next == pool->queue_size) ? 0 : next; do { /* Are we full ? */ /* 檢查是否任務隊列滿 */ if(pool->count == pool->queue_size) { err = threadpool_queue_full; break; } /* Are we shutting down ? */ /* 檢查當前線程池狀態是否關閉 */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ /* 在 tail 的位置放置函數指針和參數,添加到任務隊列 */ pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; /* 更新 tail 和 count */ pool->tail = next; pool->count += 1; /* pthread_cond_broadcast */ /* * 發出 signal,表示有 task 被添加進來了 * 如果由因為任務隊列空阻塞的線程,此時會有一個被喚醒 * 如果沒有則什么都不做 */ if(pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } /* * 這里用的是 do { ... } while(0) 結構 * 保證過程最多被執行一次,但在中間方便因為異常而跳出執行塊 */ } while(0); /* 釋放互斥鎖資源 */ if(pthread_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0; if(pool == NULL) { return threadpool_invalid; } /* 取得互斥鎖資源 */ if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ /* 判斷是否已在其他地方關閉 */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* 獲取指定的關閉方式 */ pool->shutdown = (flags & threadpool_graceful) ? graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ /* 喚醒所有因條件變量阻塞的線程,並釋放互斥鎖 */ if((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ /* 等待所有線程結束 */ for(i = 0; i < pool->thread_count; i++) { if(pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } /* 同樣是 do{...} while(0) 結構*/ } while(0); /* Only if everything went well do we deallocate the pool */ if(!err) { /* 釋放內存資源 */ threadpool_free(pool); } return err; } int threadpool_free(threadpool_t *pool) { if(pool == NULL || pool->started > 0) { return -1; } /* Did we manage to allocate ? */ /* 釋放線程 任務隊列 互斥鎖 條件變量 線程池所占內存資源 */ if(pool->threads) { free(pool->threads); free(pool->queue); /* Because we allocate pool->threads after initializing the mutex and condition variable, we're sure they're initialized. Let's lock the mutex just in case. */ pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; } static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; for(;;) { /* Lock must be taken to wait on conditional variable */ /* 取得互斥鎖資源 */ pthread_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ /* 用 while 是為了在喚醒時重新檢查條件 */ while((pool->count == 0) && (!pool->shutdown)) { /* 任務隊列為空,且線程池沒有關閉時阻塞在這里 */ pthread_cond_wait(&(pool->notify), &(pool->lock)); } /* 關閉的處理 */ if((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } /* Grab our task */ /* 取得任務隊列的第一個任務 */ task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; /* 更新 head 和 count */ pool->head += 1; pool->head = (pool->head == pool->queue_size) ? 0 : pool->head; pool->count -= 1; /* Unlock */ /* 釋放互斥鎖 */ pthread_mutex_unlock(&(pool->lock)); /* Get to work */ /* 開始運行任務 */ (*(task.function))(task.argument); /* 這里一個任務運行結束 */ } /* 線程將結束,更新運行線程數 */ pool->started--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM