引言:上篇文章說到了多進程並發式的服務端模型,如上一篇文章所述,進程的頻繁創建會導致服務器不堪負載,那這一篇博客主要講述的是線程模型和線程池的方式來提高服務端的負載能力。同時比較一下不同的模型的好處與壞處。
(如果不加以說明,我們都是考慮開發是基於GNU/Linux的)在Linux下創建一個線程的方式很簡單,pthread_create() 函數來創建線程,其中的一個參數的回調函數,也就是線程本身的執行體函數。
void *thread_entry( void * args );
這里不過多的強調怎樣利用線程等來創建執行體以及其他的系統調用怎樣使用的。
那么,在服務端的線程使用方式一般為三種種:
(1)按需生成(來一個連接生成一個線程)
(2)線程池(預先生成很多線程)
(3)Leader follower(LF)
主要講解第一種和第二種,第三種暫時手上沒有實例代碼,最近也沒寫、
第一種方式的范式大概是這樣:
回調函數:
void *thread_entry( void *args )
{
int fd = *(int *)args ;
do_handler_fd( fd );
}
程序主體:
for(;;){
fd = accept();
pthread_create(...,thread_entry,&fd);
}
這里所展示的只是一個最簡單的方式,但是可以代表多線程的服務器端模型。
大體服務端分為主線程和工作線程,主線程負責accept()連接,而工作線程負責處理業務邏輯和流的讀取等。這樣,即使在工作線程阻塞的情況下,也只是阻塞在線程范圍內,關於這部分內容,可以參考《C++網絡編程》第一卷的第五章。在應用層和內核之間的線程比例為1:1的操作系統線程機制中,一個線程在內核中會有一個內核線程實例,那么就是說,如果這個線程阻塞,不會引起在同一個進程里面的線程也阻塞。現在大多是的操作系統采用的都是 1:1的模型,但是這個比傳統的N:1模型更消耗資源。 N:1模型就是,在應用層級別的多個線程在操作系統中只有一個實例,可以看做一個組,一旦一個線程阻塞,這個工作組的其他線程都會阻塞。
故上述代碼的 do_handler_fd( fd ) 里面的系統調用如果阻塞,不會引起整個進程阻塞,線程的阻塞只是在線程范圍內。所以,主線程可以一直等待客戶連接,而把工作處理過程放到線程中去。
這個是傳統的線程方式,這種方式也會帶來一些問題:
(1)工作開銷過大,線程的頻繁創建的銷毀也是一個很消耗資源的過程,雖然較進程小很多。
(2)對於臨界資源的訪問需要控制加鎖等操作,加大了程序設計的復雜性。
(3)一個線程的崩潰會導致整個進程的崩潰,比如調用了exit() 函數等,雖然阻塞操作只阻塞一個線程,但是其他一些系統調用的失敗或崩潰將導致服務器整個down機。后果不堪設想。
但是在很多地方也提到了,多線程的方式適合IO密集型的程序,比如大文件傳輸等,這樣可以在用戶看來所有的操作都是並行的。
下面來說說線程池的方式,它改進了上述的問題的第一個,頻繁的創建線程。
線程池的基本思想就是預先創建一部分線程,然后等到任務來的時候,通過條件變量或者其他的機制來喚醒一個工作線程。
下面詳細的講述一下前段時間寫的一個簡單的線程池方案。
線程池有一個任務隊列,即由任務對象組成的一組隊列。
我們為這個任務隊列提供兩個接口:
void mc_thread_pool_add_task(void *task , size_t tasksize )
解釋一下這個接口的含義和參數, task 是一個指向任務實例的指針,tasksize 一般取 sizeof( instance_task ) 為的是在加入任務隊列的時候隊列的一些其他操作。為了簡單化,這里沒有提供任務優先級的考慮。
void *mc_thread_pool_get_task()
這個函數用來取得一個指向任務實例的指針,然后可以操作這個任務。
一般情況下,由主線程調用第一個函數,而工作線程調用第二個函數。
我們來看看線程池的結構:
typedef struct _thread_pool_t
{
pthread_mutex_t queue_lock ;
pthread_cond_t task_cond ;
list_t * tasks // treat it as queue thread_task_t type
pthread_t * pthreads ;
int isdestoried;
int workersnum ;
char ready ;
thread_task_handler thread_pool_task_handler;
}thread_pool_t;
/*
* this structure is a global control block of threads poll
* as you can see , queue_lock and task_cond is define to protecte access of this whole poll
* and task_cond is used to signal to threads that the task queue is ready
* tasks is a queue of tasks , each task should posted to this queue and threads
* in this pool can get it , we defined this task as void * to use wildly
* isdestoried is a boolean flag as his/her name
* workersnum is the total number of threads
* ready is a flag also and used to judge if the tasks queue is ready
* thread_pool_task_handler is a function point which points to the task handler you defined
*/
在線程池的結構中,我們定義了兩個變量, queue_lock 和 task_cond
一個是鎖,用來控制線程對於 task 任務隊列的訪問,另一個 task_cond 用來喚醒工作線程。
說說基本原理:工作線程默認情況下是阻塞在 pthread_cond_wait() 系統調用下的,如果有任務到來,我們可用使用 pthread_cond_singal() 來喚醒一個處於阻塞狀態的線程,這樣這個線程就可以執行 mc_thread_pool_get_task() 來取得一個任務,並調用相應的回調函數。
tasks就是上面所說的任務隊列,pthreads是一個pthread_t 的數組,也就是用來標示線程id 的數組。每一次創建線程的時候都會返回線程id,所以我們需要記錄。
ready 是一個flag , 標示是否任務隊列可用。thread_task_handler 是一個函數指針,定義是這樣的:
typedef void ( *thread_task_handler )( void * args ) ;
結構體里的 thread_pool_task_handler 就是在初始化的時候設置的線程的執行體。
下面看看初始化函數:
void mc_thread_pool_ini( mc_thread_pool_t * par_tp , int workersnum ,thread_task_handler par_handler )
{
int err ;
//par_tp = ( thread_pool_t *)malloc( sizeof(thread_pool_t) );
if( par_tp == NULL )
{
fprintf( stderr , "thread_pool_t malloc\n");
return ;
}
par_tp->workersnum = workersnum ;
pthread_mutex_init( &par_tp->queue_lock ,NULL );
pthread_cond_init(&par_tp->task_cond , NULL );
/*
par_tp->queue_lock = PTHREAD_MUTEX_INITIALIZER ;
par_tp->task_cond = PTHREAD_COND_INITIALIZER ;
*/
par_tp->tasks = mc_listcreate() ;
if( par_tp->tasks == NULL )
{
fprintf( stderr , "listcreate() error\n");
//free( par_tp ) ;
return ;
}
par_tp->pthreads = ( pthread_t *)malloc( sizeof( pthread_t )*workersnum );
if( par_tp->pthreads == NULL )
{
fprintf( stderr , "pthreads malloc\n");
//free( par_tp );
mc_freelist( par_tp->tasks ) ;
return NULL ;
}
int i = 0 ;
for( ; i < workersnum ; i++ )
{
fprintf(stderr,"start to create threads\n");
err = pthread_create(&(par_tp->pthreads[i]),NULL,mc_thread_entry,NULL) ;
if( err == -1 )
{
fprintf( stderr , "pthread_create error\n");
//free( par_tp );
mc_freelist( par_tp->tasks ) ;
free(par_tp->pthreads) ;
}
}
par_tp->thread_pool_task_handler = par_handler ;
par_tp->ready = 0 ;
fprintf(stderr,"successed to create threads\n");
}
在初始化函數中,我們傳遞了一個函數執行體的入口點,也就是函數指針給線程池,當我們有任務的時候,一個線程被喚醒,執行相應的回調函數。
其他需要注意的地方是使用 for循環來創建很多的線程,並利用數組方式記錄了線程的id 。
創建線程時候的回調函數並不是我們的參數傳遞的回調函數地址。因為在創建線程好線程的時候,我們需要一個阻塞操作,使得線程處於睡眠狀態,不然函數執行完畢后線程就退出了。所以,創建線程時候的回調函數是這樣的:
static void *mc_thread_entry( void *args )
{
void * task ;
for(;;)
{
pthread_mutex_lock( &mc_global_threads_pool.queue_lock ) ;
fprintf(stderr, " locked to wait task\n");
while( mc_global_threads_pool.ready == 0 )
{
pthread_cond_wait( &mc_global_threads_pool.task_cond , &mc_global_threads_pool.queue_lock ) ;
}
task = mc_thread_pool_get_task() ;
fprintf(stderr, "get a task and ready to unlock \n");
pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ) ;
mc_global_threads_pool.thread_pool_task_handler( task ) ;
}
}
需要注意的一點是,我們要用兩個變量來判斷一個隊列是否就緒,ready 和條件變量本身。
判斷條件是 while() 而不是 if,這樣可以使得線程在沒有工作任務的時候,也就是工作隊列為空的時候阻塞在 pthread_cond_wait 上,關於pthread_cond_wait 的工作機制可以參考IBM developerworks上的很多好文章。
pthread_cond_wait 在發現沒有任務的時候,條件不成立的時候,是會有一個默認的操作的,就是釋放鎖,第二個參數的鎖,使得其他線程可以得到condition 的競爭權利。所以我們在函數體內 pthread_cond_wait 的調用上下有一個加鎖和釋放鎖的操作。
在函數內部有一個 mc_global_threads_pool.thread_pool_task_handler( task ) 這個操作就是線程內部得到了任務后調用回調函數過程。
將任務隊列加入的函數實例如下:
void mc_thread_pool_add_task(void *task , size_t tasksize )
{
pthread_mutex_lock( &mc_global_threads_pool.queue_lock );
fprintf( stderr ,"thread locked and append to list\n");
mc_list_append( mc_global_threads_pool.tasks , task , tasksize ) ;
pthread_mutex_unlock( &mc_global_threads_pool.queue_lock );
fprintf( stderr ,"thread unlocked and successed append to list\n");
mc_global_threads_pool.ready = 1 ;
if( mc_global_threads_pool.ready == 1 )
{
fprintf( stderr ,"signal to threads\n");
pthread_cond_signal( &mc_global_threads_pool.task_cond ) ;
}
}
這里使用了 ready 來判斷是有任務,如果有,使用 pthread_cond_signal 來喚醒一個等待的線程。
取得一個隊列的任務方式很簡單,直接返回隊列的第一個任務:
void *mc_thread_pool_get_task()
{
void * ret_task ;
ret_task = mc_getnode_del( mc_global_threads_pool.tasks , 0 );
if( ret_task == NULL )
{
fprintf(stderr,"get node_del error\n");
}
fprintf( stderr ," got a task\n");
mc_global_threads_pool.ready = 0 ;
if( ret_task == NULL )
{
fprintf(stderr, "getnode_del error\n");
return NULL ;
}
else
return ret_task ;
}
主體框架是這樣的:
定義一個自己的task結構體比如:
typedef struct _thread_task_t
{
int task_num ;
}mc_thread_task_t ;
定義自己的回調函數:
void my_thread_task_handler( void * task )
{
fprintf(stderr,"task->tasknum %d\n",((mc_thread_task_t *)task)->task_num );
/*
* if the task is a event we can like this demo:
* (event_t *)task->handler( (event_t *)task );
* so in event_t structure there should be a callback called handler
*/
}
函數主體就是這樣:
int main()
{
mc_thread_task_t ltask;
ltask.task_num = 1 ;
fprintf(stderr,"begin to ini pool\n");
mc_thread_pool_ini( &mc_global_threads_pool , 20 , my_thread_task_handler );
mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) );
int i = 0 ;
for(;i < 10000; i++)
{
ltask.task_num = i ;
mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) );
sleep(1);
}
return 0;
}
線程池初始化的時候所傳入的結構體就是自己定義的 task 的回調函數。
上述所說的是線程池一個方案。回到我們的服務端模型上來看。
我們的服務端的改寫方式可以換成這樣:
定義只的一個任務結構,比如說,我們定義為:
struct task
{
int fd ;
}
void *task_handler( void *task )
{
int fd = *(int *)task ;
do_handler_fd( fd );
}
好了,我們的服務器主體框架可以是這樣:
mc_thread_pool_ini( &mc_global_threads_pool , N , task_handler ); // 第二個參數為線程池工作線程數
for(;;)
{
fd = accept();
struct task * newtask = ( struct task *)malloc( sizeof(struct task) );
newtask->fd = fd ;
mc_thread_pool_add_task( &newtask,sizeof(struct task*) ); //將newtask 指針加入隊列,而不是實例,可以減少隊列的存儲空間
}
總結:
線程池的方案能夠減少線程創建時候帶來的開銷,但是對於臨界資源的訪問控制等變得更加的復雜,考慮的因素更多。這里沒有完整的貼出線程池的代碼。上述模型在平常使用的過程中適合並發連接數目不大的情況,IO密集型。對於CPU 密集型的服務端,線程池返回會加大資源消耗。下一篇文章我們來看看反應堆模型,異步事件驅動,非阻塞IO,並貼出一個簡單的 epoll 的反應堆。
