[原]淺談幾種服務器端模型——反應堆模式(基於epoll的反應堆)


引言:前面一章簡單介紹了關於epoll 的使用方式,這一章介紹一下一個簡單的反應堆模型,沒有實現超時機制的管理。最主要的是要介紹一下關於異步事件反應堆的設計方式。

反應堆的模型圖在上一張可以看到,但是那個是盜來的一張圖,twisted 的反應堆。今天給不熟悉這個部分的朋友介紹一下基於 epoll 的反應堆,過程類似於libevent.

反應堆可以提供幾個操作:

(0)創建一個反應堆:

mc_event_base_t * mc_base_new(void) ;

返回一個操作句柄.  

(1)為某一個需要監聽的文件描述符加入回調函數,並注冊事件類型。

int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )  ;
	/*
	 * Initialize a event , add callback and event type
	 * if the event exists , this function will change the mode of this event
	 * and fd 
	 */

 這里的 revent 由宏定義為幾種類型:

  

#define MC_EV_READ     0x0001
#define MC_EV_WRITE    0x0002
#define MC_EV_SIGNAL   0x0004
#define MC_EV_TIMEOUT  0x0008
#define MC_EV_LISTEN   0x0010

相應的操作可以使用 | 運算來並幾個需要監聽的事件類型。

事件類型定義如下:

typedef struct mc_event_s
{
	
	  struct mc_event_s   *next	   ;
	 
	 
	  struct mc_event_s   *prev    ;
	
	 unsigned int min_heap_index  ;
	 
	 int ev_fd		;   // file des of event
	 short revent	;   // event type
	 
	 struct timeval  ev_timeval   ; // event timeout time 
	 mc_ev_callback callback ;// callback of this event 
	 void  *args 			      ;
	 int ev_flags 			      ;
	 
	 mc_event_base_t	*base	  ;
}mc_event_t ;

事件結構本身后面解釋。 

(2)將需要監聽的並且已經初始化的事件加入反應堆。

int mc_event_post( mc_event_t *ev , mc_event_base_t * base ) ;
	/*
	 * Post this event to event_base 
	 * struct base has two queue , active queue and added queue
	 * this function will post event to added queue , but not in active queue
	 */

將剛才注冊了事件類型和回調函數的事件加入 base, 即將其看做一個反應堆。

(3)最后提供了一個 dispatch 函數,反應堆開始循環,等待事件的發生。如果對應的 fd 上的事件發生,調用相應的回調函數。由第一步注冊。

int mc_dispatch( mc_event_base_t * base ) ;
    /*
     * start loop 
     * and dispatch event by 
     * mc_event_loop
     */

反應堆支持在循環過程中,通過相應的回調函數再注冊事件,類似於熱加入,熱移除。

實現方式很簡單,就是在第一個事件的回調函數上調用 mc_event_set()然后注冊。再加入 base.

base 的結構如下 :

typedef struct mc_event_base_s
{
	void		 * 	added_list		;
	void	     *	active_list	    ;
	unsigned int 	event_num		;
	unsigned int 	event_active_num;
	
	/*
	 *mc_minheap	    minheap			;
	 */
	int				epoll_fd	    ;  //for epoll only 
	int			    ev_base_stop	;
	int				magic		    ;
	struct timeval	event_time		;	
}mc_event_base_t ;

讓我們來看一個簡單的 demo

/*_____________________test bellow ______________________*/
#define mc_sock_fd	int


#define DEFAULT_NET	AF_INET
#define DEFAULT_DATA_GRAM 	SOCK_STREAM
#define DEFAULT_PORT  		(1115)
#define	DEFAULT_BACKLOG		(200)

/* simple connection */
struct _connection
{
	int fd 			  ;
	mc_event_t  read  ;
	mc_event_t  write ;
	char buf[1024]    ;
	mc_event_base_t * base ;
};
void setreuseaddr( mc_sock_fd fd )
{
	int yes = 1 ;
	setsockopt( fd , SOL_SOCKET , SO_REUSEADDR , &yes , sizeof(int) );
}
int mc_socket()
{
	int retsock = socket(DEFAULT_NET,DEFAULT_DATA_GRAM,0) ;
	if( retsock < 0  )
	{
		/* we should add some debug information here
		fprintf(LOGPATH,"socket error\n");
		*/
		return -1 ;
	}
	return retsock ;
}		

int mc_bind(mc_sock_fd listenfd )
{
	struct sockaddr_in serveraddr ;
	bzero(&serveraddr,sizeof(serveraddr));

	serveraddr.sin_family = AF_INET ;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons(DEFAULT_PORT);
	return bind(listenfd,(struct sockaddr *)&serveraddr , sizeof(serveraddr ));
}

int mc_isten(mc_sock_fd listenfd)
{
	return listen(listenfd,DEFAULT_BACKLOG);
}



void handler_accept( int fd , short revent , void *args )
{
	struct sockaddr_in in_addr ;
	size_t in_len ;
	int s	;
	int done = 0 ;
	struct _connection * lc = (struct _connection *)args ;
	
	in_len = sizeof( in_addr );
	mc_setnonblocking(fd) ;
	while( !done )
	{
		s = accept( fd , (struct sockaddr *)&in_addr , &in_len );
		if( s == -1 )
		{
			if( (errno == EAGAIN )|| (errno == EWOULDBLOCK ) )
			{
				break;
			}
			else
			{
				perror("accept");
				break;
			}
		}
		if( s == 0 )
		{
			fprintf(stderr,"Accept a connection on %d \n",fd );
		}
		done = 1 ;
	}
		mc_setnonblocking(s) ;
		lc->fd = s ;
		mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
		
		
		mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
		mc_event_post( &(lc->write) , lc->base );
		 
		
}
void handler_read( int fd , short revent , void *args )
{
	mc_setnonblocking(fd) ;
	struct _connection * lc ;
	lc  = (struct _connection *)args ;
	read( fd , lc->buf , 1024 );
	mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
}

void handler_write( int fd , short revent , void *args )
{
	mc_setnonblocking(fd) ;
	struct _connection * lc ;
	lc  = (struct _connection *)args ;
	write( fd , lc->buf , 1024 );
	mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
}

void cab( int fd , short revent , void *args )
{
	mc_setnonblocking(fd) ;
	char buf[1024] = "xx00xx00xx00xx00\n";
	write(fd,buf,1024);
}
int main()
{
	mc_event_t mev ;
	mc_event_base_t  *base = mc_base_new() ;
	struct _connection lc ;
	lc.base = base ;
	
	int sockfd = mc_socket() ;
	mc_bind(sockfd);
	mc_isten(sockfd);
	
	mc_event_set( &(lc.read) , MC_EV_READ , sockfd , handler_accept , &lc );
	mc_event_post( &(lc.read) , base );
	mc_dispatch(base);
	return 0;
}

  

首先:封裝的幾個套接口操作沒有考慮錯誤處理,作為簡單的實例。

定義了一個 connection 結構,用於表示每一個到來的連接,這里的 struct _connection 中包含讀寫事件和一個緩沖區,還有指向反應堆的指針和對應注冊的fd

工作過程如下:(集中看  main函數)

(1)創建一個反應堆。

(2)實例化一個 connection

(3)創建套接口,bind,listen 老生常談,這里就不多說了

(4)將這個監聽套接口注冊相應的回調函數,這里我們注冊的是 handler_accept() 函數,回調函數類型都是  void *XXX(  int  , short , void *) ;

       當監聽套接口發生可讀事件時,第一次我們認為是相應的監聽套接口得到了新的連接,所以,第一次調用的時候直接調用注冊了的回調函數 handler_accept().

在handler_accept() 函數中,我們為這個連接的讀寫事件添加了相應的回調函數,並把連接描述符(不是監聽描述符)注冊到這個上。下次這個套接口可讀的時候調用handler_read(),可寫的時候調用handler_write(). 如果需要改變狀態或改變回調函數,只需要一個狀態機或者別的方式來確定需要的回調函數是哪一個,在我們的handler_write() 和 handler_read()中可以改變回調函數,代碼所示。

 

PS:注意一點的是我們的事件是一個實例,不管是在connection結構中或是自己定義,都需要不斷的向操作系統申請空間,如果采用對象池或者connection池的方式,可以減少服務器的負載。

總結:反應堆模式最基本的操作就是:注冊事件(為需要監聽的fd加入回調函數)----->將事件加入反應堆------>開始事件循環------>事件發生,調用回調函數。

異步操作的精髓就是在這里,而不是同步的等待每一個事件。下一章講解這個反應堆的實現,越來越帶感咯.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM