基本的架構是 epoll+線程池。
這篇博文主要從以下幾個方面進行闡述:
(1)reactor模式的一個介紹:(只要是我的理解)
(2)關於線程池的說明。
(3)如何將epoll + 池結合起來實現一個群聊
一. reactor 模式:
從我個人的理解角度,所謂的reactor模式類似於:
場景:銀行, 和三個業務工作人員 ,一個接待,有很多人在等待。
當你進去的時候,銀行的接待會給你一個編號,這就是你第幾個才會被業務工作人員接待。
這個時候,你就進入了等待的狀態。直到輪流到你了,三個業務工作人員中的一個就會幫助你處理你的問題。
類比到計算機就是:作為接待只是給你發了一個編號,她並不關心你要處理什么業務,之后又去給后續進來的人發送編號。她就不在乎什么時候你的業務才會被處理到,至於你的業務被處理的時候,就屬於業務處理人員。當然業務處理人員也不會關心給進來的人發編號,他只關心你當前要處理的業務是什么。
這種模式的優點在於:主線程只監聽當前套接字是否可讀或者可寫,至於你要處理什么事情,就交給工作線程了。
估計現在本來清晰的你,已經被我弄糊塗了,那就用圖說明:
IO模型實現reactor 模式的工作流程:
(1)主線程向epoll內核事件表內注冊socket上的可讀就緒事件。
(2)主線程調用epoll_wait()等待socket上有數據可讀
(3)當socket上有數據可讀,epoll_wait 通知主線程。主線程從socket可讀事件放入請求隊列。
(4)睡眠在請求隊列上的某個可讀工作線程被喚醒,從socket上讀取數據,處理客戶的請求。
然后向 epoll內核事件表里注冊寫的就緒事件
(5)主線程調用epoll_wait()等待數據可寫 。
以上就是關於Reactor模式的一個說明,下面就來看線程池的說明。
(二)線程池:
之所以會出現池這個概念就是:單個任務處理事件比較短,需要處理的任務有比較多。使用線程池可以減少在創建和撤銷線程上花費的時間以及系統資源的開銷。如果不使用線程池有可能創建大量的線程而消耗完系統資源以及過度的切換。
線程池的概念是從一個很簡單的模型開始的,那就是生產者和消費者思想,這個模型我相信很多學過操作系統的人,都知道。主線程就相當於是生產者,而我們自己創建的大量的線程就相當於消費者(工作線程),生產者將自己需要處理的業務放到一個任務隊列中,而工作線程從任務隊列中取出任務,然后又加以處理。我們創建的大量線程通過一個池的東西維護起來,這個池里面包含我們創建的線程,還有那個工作的隊列,互斥鎖,條件變量等等很多的東西。
(三)epoll + 池的結合
#include <arpa/inet.h> #include <unistd.h> #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <pthread.h> #include <fcntl.h> #include <assert.h> #include <errno.h> #include <netinet/in.h> #include "thread_pool.h" #include "thread_pool.c" #define MAX_EVENT_NUMBER 1000 #define SIZE 1024 #define MAX 10 //從主線程向工作線程數據結構 struct fd { int epollfd; int sockfd ; }; //用戶說明 struct user { int sockfd ; //文件描述符 char client_buf [SIZE]; //數據的緩沖區 }; struct user user_client[MAX]; //定義一個全局的客戶數據表 //由於epoll設置的EPOLLONESHOT模式,當出現errno =EAGAIN,就需要重新設置文件描述符(可讀) void reset_oneshot (int epollfd , int fd) { struct epoll_event event ; event.data.fd = fd ; event.events = EPOLLIN|EPOLLET|EPOLLONESHOT ; epoll_ctl (epollfd , EPOLL_CTL_MOD, fd , &event); } //向epoll內核事件表里面添加可寫的事件 int addreadfd (int epollfd , int fd , int oneshot) { struct epoll_event event ; event.data.fd = fd ; event.events |= ~ EPOLLIN ; event.events |= EPOLLOUT ; event.events |= EPOLLET; if (oneshot) { event.events |= EPOLLONESHOT ; //設置EPOLLONESHOT } epoll_ctl (epollfd , EPOLL_CTL_MOD ,fd , &event); } //群聊函數 int groupchat (int epollfd , int sockfd , char *buf) { int i = 0 ; for ( i = 0 ; i < MAX ; i++) { if (user_client[i].sockfd == sockfd) { continue ; } strncpy (user_client[i].client_buf ,buf , strlen (buf)) ; addreadfd (epollfd , user_client[i].sockfd , 1); } } //接受數據的函數,也就是線程的回調函數 int funcation (void *args) { int sockfd = ((struct fd*)args)->sockfd ; int epollfd =((struct fd*)args)->epollfd; char buf[SIZE]; memset (buf , '\0', SIZE); printf ("start new thread to receive data on fd :%d\n", sockfd); //由於我將epoll的工作模式設置為ET模式,所以就要用一個循環來讀取數據,防止數據沒有讀完,而丟失。 while (1) { int ret = recv (sockfd ,buf , SIZE-1 , 0); if (ret == 0) { close (sockfd); break; } else if (ret < 0) { if (errno == EAGAIN) { reset_oneshot (epollfd, sockfd); //重新設置(上面已經解釋了) break; } } else { printf (" read data is %s\n", buf); sleep (5); groupchat (epollfd , sockfd, buf ); } } printf ("end thread receive data on fd : %d\n", sockfd); } //這是重新注冊,將文件描述符從可寫變成可讀 int addagainfd (int epollfd , int fd) { struct epoll_event event; event.data.fd = fd ; event.events |= ~EPOLLOUT ; event.events = EPOLLIN|EPOLLET|EPOLLONESHOT; epoll_ctl (epollfd , EPOLL_CTL_MOD , fd , &event); } //與前面的解釋一樣 int reset_read_oneshot (int epollfd , int sockfd) { struct epoll_event event; event.data.fd = sockfd ; event.events = EPOLLOUT |EPOLLET |EPOLLONESHOT ; epoll_ctl (epollfd, EPOLL_CTL_MOD , sockfd , &event); return 0 ; } //發送讀的數據 int readfun (void *args) { int sockfd = ((struct fd *)args)->sockfd ; int epollfd= ((struct fd*)args)->epollfd ; int ret = send (sockfd, user_client[sockfd].client_buf , strlen (user_client[sockfd].client_buf), 0); //發送數據 if (ret == 0 ) { close (sockfd); printf ("發送數據失敗\n"); return -1 ; } else if (ret == EAGAIN) { reset_read_oneshot (epollfd , sockfd); printf("send later\n"); return -1; } memset (&user_client[sockfd].client_buf , '\0', sizeof (user_client[sockfd].client_buf)); addagainfd (epollfd , sockfd);//重新設置文件描述符 } //套接字設置為非阻塞 int setnoblocking (int fd) { int old_option = fcntl (fd, F_GETFL); int new_option = old_option|O_NONBLOCK; fcntl (fd , F_SETFL , new_option); return old_option ; } int addfd (int epollfd , int fd , int oneshot) { struct epoll_event event; event.data.fd = fd ; event.events = EPOLLIN|EPOLLET ; if (oneshot) { event.events |= EPOLLONESHOT ; } epoll_ctl (epollfd , EPOLL_CTL_ADD ,fd , &event); setnoblocking (fd); return 0 ; } int main(int argc, char *argv[]) { struct sockaddr_in address ; const char *ip = "127.0.0.1"; int port = 8086 ; memset (&address , 0 , sizeof (address)); address.sin_family = AF_INET ; inet_pton (AF_INET ,ip , &address.sin_addr); address.sin_port =htons( port) ; int listenfd = socket (AF_INET, SOCK_STREAM, 0); assert (listen >=0); int reuse = 1; setsockopt (listenfd , SOL_SOCKET , SO_REUSEADDR , &reuse , sizeof (reuse)); //端口重用,因為出現過端口無法綁定的錯誤 int ret = bind (listenfd, (struct sockaddr*)&address , sizeof (address)); assert (ret >=0 ); ret = listen (listenfd , 5); assert (ret >=0); struct epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create (5); //創建內核事件描述符表 assert (epollfd != -1); addfd (epollfd , listenfd, 0); thpool_t *thpool ; //線程池 thpool = thpool_init (5) ; //線程池的一個初始化 while (1) { int ret = epoll_wait (epollfd, events, MAX_EVENT_NUMBER , -1);//等待就緒的文件描述符,這個函數會將就緒的復制到events的結構體數組中。 if (ret < 0) { printf ("poll failure\n"); break ; } int i =0 ; for ( i = 0 ; i < ret ; i++ ) { int sockfd = events[i].data.fd ; if (sockfd == listenfd) { struct sockaddr_in client_address ; socklen_t client_length = sizeof (client_address); int connfd = accept (listenfd , (struct sockaddr*)&client_address,&client_length); user_client[connfd].sockfd = connfd ; memset (&user_client[connfd].client_buf , '\0', sizeof (user_client[connfd].client_buf)); addfd (epollfd , connfd , 1);//將新的套接字加入到內核事件表里面。 } else if (events[i].events & EPOLLIN) { struct fd fds_for_new_worker ; fds_for_new_worker.epollfd = epollfd ; fds_for_new_worker.sockfd = sockfd ; thpool_add_work (thpool, (void*)funcation ,&fds_for_new_worker);//將任務添加到工作隊列中 }else if (events[i].events & EPOLLOUT) { struct fd fds_for_new_worker ; fds_for_new_worker.epollfd = epollfd ; fds_for_new_worker.sockfd = sockfd ; thpool_add_work (thpool, (void*)readfun , &fds_for_new_worker );//將任務添加到工作隊列中 } } } thpool_destory (thpool); close (listenfd); return EXIT_SUCCESS; }
線程池的代碼在這里:
#include <unistd.h> #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <semaphore.h> #include "thread_pool.h" static int thpool_keepalive = 1 ; //線程池保持存活 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER ; //靜態賦值法初始化互斥鎖 thpool_t * thpool_init (int threadsN){ thpool_t *tp_p ; if (!threadsN || threadsN < 1){ threadsN = 1 ; } tp_p = (thpool_t *)malloc (sizeof (thpool_t)) ; if (tp_p == NULL){ fprintf (stderr ,"thpool_init (): could not allocate memory for thread pool\n"); return NULL ; } tp_p->threads = (pthread_t *)malloc (threadsN * sizeof (pthread_t)); if (tp_p->threads == NULL){ fprintf( stderr , "could not allocation memory for thread id\n"); return NULL; } tp_p->threadsN = threadsN ; if (thpool_jobqueue_init (tp_p) == -1){ fprintf (stderr ,"could not allocate memory for job queue\n"); return NULL; } /*初始化信號*/ tp_p->jobqueue->queueSem = (sem_t *)malloc (sizeof (sem_t)); /*定位一個匿名信號量,第二個參數是1表示。這個信號量將在進程內的線程是共享的,第三個參數是信號量的初始值*/ sem_init (tp_p->jobqueue->queueSem, 0 , 0 ); int t ; for (t = 0 ; t < threadsN ; t++){ printf ("Create thread %d in pool\n", t); //第四個參數是傳遞給函數指針的一個參數,這個函數指針就是我們所說的線程指針 if (pthread_create (&(tp_p->threads[t]) , NULL , (void *) thpool_thread_do , (void *)tp_p)){ free (tp_p->threads); free (tp_p->jobqueue->queueSem); free (tp_p->jobqueue); free (tp_p); } } return tp_p ; } /* * 初始化完線程應該處理的事情 * 這里存在兩個信號量, */ void thpool_thread_do (thpool_t *tp_p){ while (thpool_keepalive) { if (sem_wait (tp_p->jobqueue->queueSem)) //如果工作隊列中沒有工作,那么所有的線程都將在這里阻塞,當他調用成功的時候,信號量-1 { fprintf(stderr , "Waiting for semaphore\n"); exit (1); } if (thpool_keepalive) { void *(*func_buff) (void *arg); void *arg_buff; thpool_job_t *job_p; pthread_mutex_lock (&mutex); job_p = thpool_jobqueue_peek (tp_p); func_buff = job_p->function ; arg_buff= job_p->arg ; thpool_jobqueue_removelast (tp_p); pthread_mutex_unlock (&mutex); func_buff (arg_buff); free (job_p); } else { return ; } } return ; } int thpool_add_work (thpool_t *tp_p ,void * (*function_p )(void *), void *arg_p){ thpool_job_t *newjob ; newjob = (thpool_job_t *)malloc (sizeof (thpool_job_t)); if (newjob == NULL) { fprintf (stderr,"couldnot allocate memory for new job\n"); exit (1); } newjob->function = function_p ; newjob->arg = arg_p ; pthread_mutex_lock (&mutex); thpool_jobqueue_add (tp_p ,newjob); pthread_mutex_unlock (&mutex); return 0 ; } void thpool_destory (thpool_t *tp_p){ int t ; thpool_keepalive = 0 ; //讓所有的線程運行的線程都退出循環 for (t = 0 ; t < (tp_p->threadsN) ; t++ ){ //sem_post 會使在這個線程上阻塞的線程,不再阻塞 if (sem_post (tp_p->jobqueue->queueSem) ){ fprintf (stderr,"thpool_destory () : could not bypass sem_wait ()\n"); } } if (sem_destroy (tp_p->jobqueue->queueSem)!= 0){ fprintf (stderr, "thpool_destory () : could not destroy semaphore\n"); } for (t = 0 ; t< (tp_p->threadsN) ; t++) { pthread_join (tp_p->threads[t], NULL); } thpool_jobqueue_empty (tp_p); free (tp_p->threads); free (tp_p->jobqueue->queueSem); free (tp_p->jobqueue); free (tp_p); } int thpool_jobqueue_init (thpool_t *tp_p) { tp_p->jobqueue = (thpool_jobqueue *)malloc (sizeof (thpool_jobqueue)); if (tp_p->jobqueue == NULL) { fprintf (stderr ,"thpool_jobqueue malloc is error\n"); return -1 ; } tp_p->jobqueue->tail = NULL ; tp_p->jobqueue->head = NULL ; tp_p->jobqueue->jobsN = 0 ; return 0 ; } void thpool_jobqueue_add (thpool_t *tp_p , thpool_job_t *newjob_p){ newjob_p->next = NULL ; newjob_p->prev = NULL ; thpool_job_t *oldfirstjob ; oldfirstjob = tp_p->jobqueue->head; switch (tp_p->jobqueue->jobsN) { case 0 : tp_p->jobqueue->tail = newjob_p; tp_p->jobqueue->head = newjob_p; break; default : oldfirstjob->prev= newjob_p ; newjob_p->next = oldfirstjob ; tp_p->jobqueue->head= newjob_p; break; } (tp_p->jobqueue->jobsN)++ ; sem_post (tp_p->jobqueue->queueSem); //原子操作,信號量增加1 ,保證線程安全 int sval ; sem_getvalue (tp_p->jobqueue->queueSem , &sval); //sval表示當前正在阻塞的線程數量 } int thpool_jobqueue_removelast (thpool_t *tp_p){ thpool_job_t *oldlastjob , *tmp; oldlastjob = tp_p->jobqueue->tail ; switch (tp_p->jobqueue->jobsN) { case 0 : return -1 ; break; case 1 : tp_p->jobqueue->head = NULL ; tp_p->jobqueue->tail = NULL ; break; default : tmp = oldlastjob->prev ; tmp->next = NULL ; tp_p->jobqueue->tail = oldlastjob->prev; } (tp_p->jobqueue->jobsN) -- ; int sval ; sem_getvalue (tp_p->jobqueue->queueSem, &sval); return 0 ; } thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p){ return tp_p->jobqueue->tail ; } void thpool_jobqueue_empty (thpool_t *tp_p) { thpool_job_t *curjob; curjob = tp_p->jobqueue->tail ; while (tp_p->jobqueue->jobsN){ tp_p->jobqueue->tail = curjob->prev ; free (curjob); curjob = tp_p->jobqueue->tail ; tp_p->jobqueue->jobsN -- ; } tp_p->jobqueue->tail = NULL ; tp_p->jobqueue->head = NULL ; }
線程池的.h文件
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <pthread.h> #include <semaphore.h> /*Individual job*/ typedef struct thpool_job_t { void (*function)(void* arg); //函數指針 void *arg ; //函數的參數 struct tpool_job_t *next ; //指向下一個任務 struct tpool_job_t *prev ; //指向前一個任務 }thpool_job_t ; /*job queue as doubly linked list*/ typedef struct thpool_jobqueue { thpool_job_t *head ; //隊列的頭指針 thpool_job_t *tail; //對列的尾指針 int jobsN; //隊列中工作的個數 sem_t *queueSem; //原子信號量 }thpool_jobqueue; /*thread pool*/ typedef struct thpool_t { pthread_t *threads ; //線程的ID int threadsN ; //線程的數量 thpool_jobqueue *jobqueue; //工作隊列的指針 }thpool_t; /*線程池中的線程都需要互斥鎖和指向線程池的一個指針*/ typedef struct thread_data{ pthread_mutex_t *mutex_p ; thpool_t *tp_p ; }thread_data; /* * 初始化線程池 * 為線程池, 工作隊列, 申請內存空間,信號等申請內存空間 * @param :將被使用的線程ID * @return :成功返回的線程池結構體,錯誤返回null */ thpool_t *thpool_init (int threadsN); /* * 每個線程要做的事情 * 這是一個無止境循環,當撤銷這線程池的時候,這個循環才會被中斷 *@param: 線程池 *@return:不做任何的事情 */ void thpool_thread_do (thpool_t *tp_p); /* *向工作隊列里面添加任何 *采用來了一個行為和他的參數,添加到線程池的工作對列中去, * 如果你想添加工作函數,需要更多的參數,通過傳遞一個指向結構體的指針,就可以實現一個接口 * ATTENTION:為了不引起警告,你不得不將函數和參數都帶上 * * @param: 添加工作的線程線程池 * @param: 這個工作的處理函數 * @param:函數的參數 * @return : int */ int thpool_t_add_work (thpool_t *tp_p ,void* (*function_p) (void *), void* arg_p ); /* *摧毀線程池 * *這將撤銷這個線程池和釋放所申請的內存空間,當你在調用這個函數的時候,存在有的線程還在運行中,那么 *停止他們現在所做的工作,然后他們被撤銷掉 * @param:你想要撤銷的線程池的指針 */ void thpool_destory (thpool_t *tp_p); /*-----------------------Queue specific---------------------------------*/ /* * 初始化隊列 * @param: 指向線程池的指針 * @return :成功的時候返回是 0 ,分配內存失敗的時候,返回是-1 */ int thpool_jobqueue_init (thpool_t *tp_p); /* *添加任務到隊列 *一個新的工作任務將被添加到隊列,在使用這個函數或者其他向別的類似這樣 *函數 thpool_jobqueue_empty ()之前,這個新的任務要被申請內存空間 * * @param: 指向線程池的指針 * @param:指向一個已經申請內存空間的任務 * @return nothing */ void thpool_jobqueue_add (thpool_t * tp_p , thpool_job_t *newjob_p); /* * 移除對列的最后一個任務 *這個函數將不會被釋放申請的內存空間,所以要保證 * *@param :指向線程池的指針 *@return : 成功返回0 ,如果對列是空的,就返回-1 */ int thpool_jobqueue_removelast (thpool_t *tp_p); /* *對列的最后一個任務 *在隊列里面得到最后一個任務,即使隊列是空的,這個函數依舊可以使用 * *參數:指向線程池結構體的指針 *返回值:得到隊列中最后一個任務的指針,或者在對列是空的情況下,返回是空 */ thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p); /* *移除和撤銷這個隊列中的所有任務 *這個函數將刪除這個隊列中的所有任務,將任務對列恢復到初始化狀態,因此隊列的頭和對列的尾都設置為NULL ,此時隊列中任務= 0 * *參數:指向線程池結構體的指針 * */ void thpool_jobqueue_empty (thpool_t *tp_p); #endif