[原]淺談幾種服務器端模型——反應堆的設計


引言:持續更新了一段時間的博客,今天把最后一點部分加上,一個簡單的反應堆的實現,基於epoll,工作過程上一篇博文已經有所介紹。

需要再次提到的就是關於反應堆的使用方式:

注冊事件(為需要監聽的fd加入回調函數)----->將事件加入反應堆------>開始事件循環------>事件發生,調用回調函數

 

第一次加入的描述符可以為監聽描述符,即由 socket() 函數創建,當這個描述符有事件發生,意味着有新的連接的到來,調用回調函數handler_accept() 其中這個函數里面涉及到調用 accept()系統調用和為這個新連接分配實例,然后設置這個連接的回調函數,即 handler_read() handler_write() 等,設置完后相應的連接描述符如果有事件發生,即可以調用相應的不同種事件的回調函數。這個是我們的總體思路,如果需要多進程方式,可以創建多進程,然后每個進程不同的反應堆,但是需要注意的是,如果父子進程共享監聽描述符,會引起進程組的驚群現象,就是說,每一個進程都可能會嘗試 accept() 這個新的連接,那么這種方式設計的時候需要為 accept() 加鎖,具體方式可以看Nginx關於這方面的設計,不僅實現了 accept() 鎖,還達到了進程間的負債均衡。

關於連接對象池,可以下次寫一篇博文介紹一下對象池的設計方式。

 

言歸正傳,說說反應堆的設計方式,如果需要方便的話,可以使用libevent..memcached的網絡模塊就是基於這個,相信大家也知道。

從上上篇關於 epoll 的介紹中就可以看到,事件是整個系統設計的核心,我們的整個反應堆都是圍繞一個叫做事件的東西來做相應的處理。

那么,作為應用層的事件,可以這么說來,就是那個地方發生了某件事情,而這個事情是在我們的規定范圍的,然后,我們需要知道這個事件給予了我們什么樣的權利,比如說,我們可以讀,可以寫,可以操作等等.

(下面不加說明,都是以 epoll 為例 )

操作系統為我們提供的接口也是類似的操作

創建一個 epoll -----> 注冊相應的事件 epoll_ctl() -----> 進入事件循環監聽事件(可能超時返回 )epoll_wait() ----->返回事件 ( event_list[i].event && EPOLLIN )等

我們需要做的事情就是,為這個流程的事件做一個封裝,並能夠有效的管理整個事件,而不是離散的處理。

看看 event的封裝:

typedef struct mc_event_s
{
	
	  struct mc_event_s   *next	   ;
	 
	 
	  struct mc_event_s   *prev    ;
	
	 unsigned int min_heap_index  ;
	 
	 int ev_fd		;   // file des of event
	 short revent	;   // event type
	 
	 struct timeval  ev_timeval   ; // event timeout time 
	 mc_ev_callback callback ;// callback of this event 
	 void  *args 			      ;
	 int ev_flags 			      ;
	 
	 mc_event_base_t	*base	  ;
}mc_event_t ;

兩個指針分別指向事件的前部和后部,標准的雙向隊列方式,沒什么可說的。

min_heap_index 是作為超時管理的最小堆的下標,目前還沒有最這方面的設計,可以先忽略。

ev_fd 是作為這個事件的描述符本身, callback 是注冊在事件上的回調函數, ev_flags 是事件的狀態,由宏定義為:

#define MC_EV_INITD	   0x0001
#define MC_EV_ADDED    0x0002
#define MC_EV_ACTIVE   0x0004
#define MC_EV_DELED	   0x0008	

狀態分為 是否初始化,是否加入了隊列,是否加入了激活的隊列,已刪除。

這里注意的是事件分為兩個隊列,一個是已加入的,另一個是激活的。我們在處理的時候會將accept()返回的時間加入到已加入的隊列,當有事件發生,將這個事件加入到激活的事件隊列中,然后依次輪訓處理每一個激活的事件。

整個反應堆需要一個控制塊,也就是反應堆的實例,結構是像這樣:

typedef struct mc_event_base_s
{
	void		 * 	added_list		;
	void	     *	active_list	    ;
	unsigned int 	event_num		;
	unsigned int 	event_active_num;
	
	/*
	 *mc_minheap	    minheap			;
	 */
	int				epoll_fd	    ;  //for epoll only 
	int			    ev_base_stop	;
	int				magic		    ;
	struct timeval	event_time		;	
}mc_event_base_t ;

可以看到,反應堆中維護了兩個隊列, added_list 和 active_list 為的是能夠有效控制所有的事件。

event_num是事件個數, event_active_num 是已激活的事件個數

epoll_fd 是由epoll_create()創建的句柄,這里沒有加入宏定義來區分是否操作系統有 epoll

可以這樣:

#if (HAVE_EPOLL)

    int   epoll_fd ;

#endif

不同的IO多路復用方式不同,操作句柄也不一樣。

ev_base_stop是用來判斷是否停止的標志位, magic 被定義為一個宏:

#define MC_BASE_MAGIC  0x1989

用來判斷整個反應堆是否初始化。

 

為事件的封裝提供了幾種操作,初始化,加入隊列,刪除事件,改變事件類型,循環監聽事件。  

然后將struct mc_event_ops 中的函數指針與實際的操作分開,類似於HOOK 方式,這樣做的目的是為不同的底層IO多路復用提供了統一的接口,如 select(),epoll(),kqueue()等。

typedef  struct mc_event_ops 
{
	void * (*init)( mc_event_base_t *  )  ;
	int	   (*add)( void * , mc_event_t * );
	int	   (*del)( void * , mc_event_t * );
	int	   (*mod)( void * , mc_event_t * );
	int	   (*dispatch)( void * , mc_event_base_t * ,struct timeval ) ;
}mc_event_opt    ;
	/*
	 * Functions point of events option
	 * there points will point to a instance of function
	 * and other module call there function by ops instance 
	 */




extern mc_event_opt mc_event_op_val ;



#define mc_event_ini  mc_event_op_val.init 
#define mc_event_add  mc_event_op_val.add 
#define mc_event_del  mc_event_op_val.del 
#define mc_event_mod  mc_event_op_val.mod
#define mc_event_loop mc_event_op_val.dispatch

  

看看實際的操作事件方式,我們為這幾個操作添加了epoll 的鈎子,具體是下面這樣:

對應mc_event_ops的第一個鈎子

void *mc_epoll_init( mc_event_base_t *meb )
{
	if( meb->magic != MC_BASE_MAGIC )
	{
		fprintf(stderr,"In function mc_epoll_init %d , %s ",__LINE__,__FILE__);
		return NULL ;
	}
	meb->epoll_fd = epoll_create( MC_EVENT_MAX ) ;
	return meb ;
}

方式很簡單,調用 epoll_create() 創建一個 epoll 實例,並把這個實例的句柄賦給反應堆。

第二個加入,對應mc_event_ops的第二個鈎子

int mc_epoll_add( void * arg ,mc_event_t *ev )
{
	if( ev->base->magic != MC_BASE_MAGIC ) 
	{
		fprintf(stderr,"In function mc_epoll_add %d , %s ",__LINE__,__FILE__);
		return -1 ;
	}
	mc_event_base_t *base = ev->base ;
	int epoll_fd = base->epoll_fd ;
	int err ;
	
	
	struct epoll_event epoll_ev  ;	
	epoll_ev.data.ptr = ev ;
	epoll_ev.events = EPOLLIN|EPOLLET ;
	
	if( !( ev->ev_flags & MC_EV_ADDED ) )
	{
		err = epoll_ctl( epoll_fd , EPOLL_CTL_ADD , ev->ev_fd , &epoll_ev ); 
		if( err != 0 )
		{
			perror("epoll_ctl");
			fprintf(stderr, "In function mc_epoll_add the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
			return -1;
		}
		ev->ev_flags |=	MC_EV_ADDED ;
	}
	return 0;
}

具體來說是調用了 epoll_ctl 並設置宏為 EPOLL_CTL_ADD然后設置事件類型

第三個:

int mc_epoll_del( void * arg ,mc_event_t *ev )
{
	if( ev->base->magic != MC_BASE_MAGIC ) 
	{
		fprintf(stderr,"In function mc_epoll_add %d , %s ",__LINE__,__FILE__);
		return -1 ;
	}
	mc_event_base_t *base = ev->base ;
	int epoll_fd = base->epoll_fd ;
	int err ;
	
	if( !(ev->ev_flags & MC_EV_INITD) )
	{
		return -1 ;
	}
	err = epoll_ctl( epoll_fd , EPOLL_CTL_DEL , ev->ev_fd , NULL ); 
	if( err != 0 )
	{
		fprintf(stderr, "In function mc_epoll_del the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1;
	}
	ev->ev_flags = 0x0000 ; 
	
	return 0;
}

第四個:

int mc_epoll_mod(void * arg ,mc_event_t *ev )
{
	if( ev->base->magic != MC_BASE_MAGIC ) 
	{
		fprintf(stderr,"In function mc_epoll_mod %d , %s ",__LINE__,__FILE__);
		return -1 ;
	}
	mc_event_base_t *base = ev->base ;
	int epoll_fd = base->epoll_fd ;
	int err ;
	unsigned int mode ;
	if( !(ev->ev_flags & MC_EV_INITD) )
	{
		return -1 ;
	}
	
	struct epoll_event epoll_ev  ;		
	epoll_ev.data.ptr = ev ;
	if( arg == NULL )
	{
		return 0;
	}
	mode = *(unsigned int *)arg ;
	epoll_ev.events = mode ;
	err = epoll_ctl( epoll_fd , EPOLL_CTL_MOD , ev->ev_fd , &epoll_ev ); 
	if( err != 0 )
	{
		fprintf(stderr, "In function mc_epoll_del the epoll_ctl error in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1;
	}
	return 0;
}

第五個鈎子:

int	 mc_epoll_loop( void * args , mc_event_base_t *base , struct timeval ev_time )
{
	if( base == NULL )
	{
		fprintf(stderr,"base == NULL  in mc_epoll_loop in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1 ;
	}
	
	
	if( base->magic != MC_BASE_MAGIC ) 
	{
		fprintf(stderr,"In function mc_epoll_mod %d , %s ",__LINE__,__FILE__);
		return -1 ;
	}
	
	int nfds ; 
	
	/* we pass args as nevents in this function */
	struct epoll_event *nevents = ( struct epoll_event * )args ;
	
	struct epoll_event epoll_ev  ;	
	nfds = epoll_wait(  base->epoll_fd , nevents , MC_EVENT_MAX , 1) ;
	if( nfds <= -1 )
	{
		fprintf(stderr,"epoll wait function in mc_epoll_loop in file:%s,line:%d\n",__FILE__,__LINE__);
		return nfds ;
	}
	return nfds ;
}

實現方式具體可以看相應的代碼。文章有點長了,很多代碼就不貼出來了。

然后看看最后的一開始提到的幾個操作:

 

注冊事件(為需要監聽的fd加入回調函數)----->將事件加入反應堆------>開始事件循環------>事件發生,調用回調函數

mc_event_base_t * mc_base_new(void)
{
	mc_event_base_t * base = (mc_event_base_t *)malloc( sizeof(mc_event_base_t) );
	if( base == NULL )
	{
		fprintf(stderr,"Init the base moudle in mc_base_new error in file:%s,line:%d\n",__FILE__,__LINE__);
		return NULL ;
	}
	
	/* init the base lists */
	base->added_list = NULL  ;
	base->active_list = NULL ;
	
	base->magic = MC_BASE_MAGIC ;
		
	base->event_num = 0 ; 
	base->event_active_num = 0 ;
	
	base->ev_base_stop = MC_BASE_STOP  ;
	
	base->magic = MC_BASE_MAGIC ;
	
	gettimeofday(&base->event_time,NULL);
	
	mc_event_ini( base ) ;
	return base ;
	
}

int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )
{
	if( ev == NULL )
	{
		fprintf(stderr, " mc_event_set error , ev == NULL or other segment error in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1 ;
	}
	#if (HAVE_EPOLL)	
	unsigned int epoll_flag ;
	#endif
	int err ;
	memset(ev,0,sizeof(mc_event_t));
	ev->revent = revent ;
	ev->ev_fd     = fd 	;
	ev->callback = callback ;
	ev->next = NULL ;
	ev->prev = NULL ;
	if( args == NULL )
		ev->args = NULL  ;
	else
		ev->args = args  ;	
	
	/* This job post to mc_event_post 
	 *if( revent & MC_EV_LISTEN )
	 *{
	 *	err = mc_event_add(NULL , ev );
	 *	if( err != 0 )
	 *		fprintf(stderr,"mc_event_add in mc_event_set \n");
	 *}
	 */
	 
	/* event should post to base */
	if( ev->base == NULL )
		return 0;
		
	#if (HAVE_EPOLL)	
	
	
		if( revent & MC_EV_READ )
		{
			epoll_flag = EPOLLIN|EPOLLET ;
		
			err = mc_event_mod( (void *)&epoll_flag , ev ) ;
			if( err != 0 )
				fprintf(stderr,"mc_event_mod (MC_EVENT_READ ) in mc_event_set in file:%s,line:%d\n",__FILE__,__LINE__);
		}
	
		if( revent & MC_EV_WRITE )
		{
			epoll_flag = EPOLLOUT|EPOLLET ;
			err = mc_event_mod( (void *)&epoll_flag ,ev );
			if( err != 0 )
				fprintf(stderr,"mc_event_mod (MC_EVENT_WRITE) in mc_event_set in file:%s,line:%d\n",__FILE__,__LINE__);
		}
	
		ev->ev_flags |= MC_EV_INITD	;
		#endif
	
		return 0 ;
}

int mc_event_post( mc_event_t *ev , mc_event_base_t * base )
{
	if( ev == NULL || base == NULL )
	{
		fprintf(stderr," In function mc_event_post , the args error , please check your arguments in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1;
	}
	if( base->magic != MC_BASE_MAGIC )
	{
		fprintf(stderr,"The mc_event_base_t * points base non inited in mc_event_post in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1;
	}
	int err ;
	ev->base = base ;
	add_event_to_queue(ev,(mc_event_t **)&(base->added_list));
	base->event_num++;
	
	err = mc_event_add( NULL , ev );
	if( err == -1 )
	{
		fprintf(stderr,"In function mc_event_add error in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1;
	}
}



int mc_dispatch( mc_event_base_t * base )
{
	if( base == NULL )
	{
		fprintf(stderr, "base == NULL in function mc_dispatch in file:%s,line:%d\n",__FILE__,__LINE__);
		return -1 ;
	}
	if( base->magic != MC_BASE_MAGIC ) 
	{
		fprintf(stderr,"In function mc_disptahch noinitlized line:%d , in file:%s ",__LINE__,__FILE__);
		return -1 ;
	}
	
	struct epoll_event *nevents = ( struct epoll_event *)malloc( sizeof( struct epoll_event ) );
	int done = 0 ;
	
	int nevent ;
	int i ;
	mc_event_t *levent ;
	mc_event_t *retevent ;
	while( !done )
	{
		nevent = mc_event_loop( nevents , base , base->event_time) ;
		
		if( nevent == -1 )
		{
			fprintf(stderr,"No event check , return in file:%s line:%d \n",__FILE__,__LINE__);
			goto err1;
		}
		for( i = 0 ; i < nevent ; i++ ) 
		{
			if( nevents[i].events & EPOLLERR || nevents[i].events & EPOLLHUP )
			{
				levent = nevents[i].data.ptr ;
				if( !(levent->ev_flags & MC_EV_INITD) )
					continue ;
				if( (levent->ev_flags & MC_EV_ACTIVE) || (levent->ev_flags & MC_EV_ADDED ) )
					del_event_from_queue( levent ); 
			}
			if( nevents[i].events & EPOLLIN )
			{
				levent = nevents[i].data.ptr ;
				levent->revent = MC_EV_READ  ;
				add_event_to_queue( levent , (mc_event_t **)&(base->active_list) );	
				levent->ev_flags |=	MC_EV_ACTIVE ;	
				base->event_active_num++;	
			}
			else if(nevents[i].events & EPOLLOUT)
			{
				levent = nevents[i].data.ptr ;
				levent->revent = MC_EV_WRITE ;
				add_event_to_queue( levent , (mc_event_t **)&(base->active_list) );	
				levent->ev_flags |=	MC_EV_ACTIVE ;	
				base->event_active_num++;
			}
			else
			{
				fprintf(stderr,"Unknow err in file:%s,line:%d\n",__FILE__,__LINE__);
				goto err1;
			}
		}
		
		retevent  = (mc_event_t *)(base->active_list) ;
		for(i = 0 ;i < nevent ; i++ )
		{	
			fprintf(stderr," %d event(s)\n",nevent);
			if( retevent == NULL )
				break ;
		
			retevent = get_event_and_del( (mc_event_t *)(base->active_list) ) ;
			/* If we want to reuse this event we should set event again */
			retevent->ev_flags = retevent->ev_flags&(~MC_EV_ACTIVE);
			base->event_active_num-- ;
			if( retevent == NULL )
				fprintf(stderr,"event is NULL file:%s,line:%d\n",__FILE__,__LINE__);	
	
			retevent->callback(retevent->ev_fd,retevent->revent,retevent->args) ;
		}
		
	}
		return  0 ;
	err1:
		return -1 ;
}

在 dispatch函數中,我們的每一次 epoll 返回后都會輪訓 active 事件列表,然后調用事件相應的回調函數。

 

附上一開頭所說的一些宏定義:

#define HAVE_EPOLL     1

#define MC_EV_READ	   0x0001
#define MC_EV_WRITE    0x0002
#define MC_EV_SIGNAL   0x0004
#define MC_EV_TIMEOUT  0x0008
#define MC_EV_LISTEN   0x0010

/* the ev_flags value  in mc_event_s */
#define MC_EV_INITD	   0x0001
#define MC_EV_ADDED    0x0002
#define MC_EV_ACTIVE   0x0004
#define MC_EV_DELED	   0x0008	
   

#define MC_BASE_STOP   0x0000
#define MC_BASE_ACTIVE 0x0001

#define MC_BASE_MAGIC  0x1989


#define MC_EVENT_MAX   10240

  

總結:這篇文章不是那么的完善,涉及的內容太多,希望大家諒解,后續會有更新。

貼出了反應堆的設計方式和簡單的思路,僅供參考。系列文章服務器端的模型就到這里。后續可能會寫關於在這個模型上不同的知名的軟件的設計方式,比如說 libevent的,apache ,Nginx 等。  

 

  

  

  

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM