關於I/O利用,不同的平台上都有相應的策略,比如select、poll、epoll、kqueue、devpoll、evport、win32。為libevent提供一組庫函數,屏蔽了平台的差異性,底層還是調用的epoll、kqueue、devpoll等函數。libevent會使用優化的策略來選擇使用哪個后端方法,以達到效率最高。
創建event_base
struct event_base *event_base_new(void);
event_base中存放你是監聽是否就緒的event。
在子進程中event_base要重新初始化:
int event_reinit(struct event_base *base);
/* ... add some events to the event_base ... */ if (fork()) { /* In parent */ continue_running_parent(base); /*...*/ } else { /* In child */ event_reinit(base); continue_running_child(base); /*...*/ }
銷毀event_base
void event_base_free(struct event_base *base);
配置event_base
如果你想對event_base有更精准的控制,可以對它進行多項配置。
struct event_config *event_config_new(void); struct event_base *event_base_new_with_config(const struct event_config *cfg); void event_config_free(struct event_config *cfg);
我們知道libevent有很多后端方法:select、poll、epoll、kqueue、devpoll、evport、win32等等。
獲取libevent支持哪些后端方法:
const char **event_get_supported_methods(void);
禁用某個后端方法:
int event_config_avoid_method(struct event_config *cfg, const char *method);
獲取當前event_base中支持哪些后端方法:
const char *event_base_get_method(const struct event_base *base);
可以要求后端方法具有某些特征,不具有該特征的后端方法將不被啟用。
enum event_method_feature { EV_FEATURE_ET = 0x01, EV_FEATURE_O1 = 0x02, EV_FEATURE_FDS = 0x04, }; int event_config_require_features(struct event_config *cfg,enum event_method_feature feature);
EV_FEATURE_ET要求后端方法支持Edge Trigger,此時select不會被啟用,而epoll是滿足的。
EV_FEATURE_O1要求增加、刪除一個事件,或者使一個事件變為就緒只需要O(1)的時間。
EV_FEATURE_FDS要求后端方法支持任意的文件描述符,不僅限於sockets。
enum event_method_feature event_base_get_features(const struct event_base *base);
把上面的知識綜合起來,給一段示例代碼:
#include<stdio.h> #include<event2/event.h> void main(){ struct event_base *eb1,*eb2; struct event_config *ec; enum event_method_feature f; int i; const char **support=event_get_supported_methods(); printf("libevent support backend methods:"); for(i=0;support[i]!=NULL;++i){ printf("%s\t",support[i]); } printf("\n"); eb2=event_base_new(); printf("current event_base support method:%s\n",event_base_get_method(eb2)); f=event_base_get_features(eb2); if(f&EV_FEATURE_ET) printf("Edge-Triggered events are supported.\n"); if(f&EV_FEATURE_O1) printf("O(1) event notification is supported.\n"); if(f&EV_FEATURE_FDS) printf("All FD types are supported.\n"); event_base_free(eb2); printf("\n"); ec=event_config_new(); event_config_require_features(ec,EV_FEATURE_ET); event_config_avoid_method(ec,"select"); eb1=event_base_new_with_config(ec); const char* have=event_base_get_method(eb1); printf("current event_base support method:%s\n",have); f=event_base_get_features(eb1); if(f&EV_FEATURE_ET) printf("Edge-Triggered events are supported.\n"); if(f&EV_FEATURE_O1) printf("O(1) event notification is supported.\n"); if(f&EV_FEATURE_FDS) printf("All FD types are supported.\n"); event_config_free(ec); event_base_free(eb1); }
運行輸出:
libevent support backend methods:epoll poll select
current event_base support method:epoll
Edge-Triggered events are supported.
O(1) event notification is supported.
current event_base support method:epoll
Edge-Triggered events are supported.
O(1) event notification is supported.
可以看到在默認情況下創建event_base時,method feature就是EV_FEATURE_ET|EV_FEATURE_O1,這樣在epoll、poll、 select三者當中只有epoll滿足。友情提示epoll不滿足EV_FEATURE_FDS,即epoll只能用於socket。
優先級
默認情況下libevent只支持一個優先級別,你可以設置讓它支持多個級別。
int event_base_priority_init(struct event_base *base, int n_priorities);
優先級編號為[0 , n_priorities-1]。
查看當前event_base支持幾個優先級別:
int event_base_get_npriorities(struct event_base *base);
event
當一些條件滿足時,事件就會觸發:
- fd已准備好讀取或寫入。
- fd馬上就准備好讀取或寫入----Edge-Triggered。
- timeout
- 信號中斷
- 用戶觸發的事件
event觸發后用戶要定義相應的回調函數。一個event需要和一個event_base關聯起來以完成初始化。把一個event添加到event_base后它就變成了pending狀態,一旦事件被觸發event變成了active狀態,並且此時回調函數會被執行。注意只有event從pending狀態變為active狀態時回調函數才會被執行。默認情況下變為active狀態后是變不回pending狀態的。如果event被設置成persistent,回調函數執行后它就又變成了pending狀態。在沒有設置persistent時,你可以先把event從evnt_base中delete,使之從active變為non-pending,然后再add到event_base中,從non-pending變為pending。
int event_add(struct event *ev, const struct timeval *tv);
對一個non-pending的event實行event_add時它就變成了pending狀態,注意參數中要提供timeout。
int event_del(struct event *ev);
對已初始化的event實行event_del使之變為non-pending、non-active狀態。
可以讓libevent支持POSIX風格的信號機制:
struct event* event_new(base, signum, EV_SIGNAL|EV_PERSIST, cb, arg)
當有信息signum到來時就會觸發回調函數cb。
在極少數的情況下你可能需要強制觸發一個事件:
void event_active(struct event *ev, int what, short ncalls);
至此一個event發生的5種基本情況下介紹完了,下面看一個些具體的接口。
#define EV_TIMEOUT 0x01 #define EV_READ 0x02 #define EV_WRITE 0x04 #define EV_SIGNAL 0x08 #define EV_PERSIST 0x10 #define EV_ET 0x20 typedef void (*event_callback_fn)(evutil_socket_t, short, void *); struct event *event_new(struct event_base *base, evutil_socket_t fd,short what, event_callback_fn cb,void *arg); int event_add(struct event *ev, const struct timeval *tv); int event_del(struct event *ev); void event_free(struct event *event);
在event_new()函數中的參數what就是用來指定EV_READ、EV_SIGNAL等的。
下面來做個最簡單的實驗,還是socket的例子,服務端接收兩個客戶端的連接,然后監聽這兩個套接口上的讀事件發生。創建事件時只設置EV_READ標志。
#include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<errno.h> #include<string.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<event2/event.h> #define MYPORT 5000 #define BACKLOG 2 //TCP層接收鏈接池的緩沖隊列大小 #define BUF_SIZE 200 //用於讀寫網絡數據的內存緩沖大小 int fd_A[BACKLOG]; //存放處於連接中的socket描述符 int conn_amount; //目前的TCP連接數量 void ev_callback(evutil_socket_t fd, short tag, void *arg){ printf("read from file descriptor:%d:",fd); char buf[BUF_SIZE]={0}; int n=read(fd,buf,sizeof(buf)); buf[n]='\0'; printf("%s",buf); printf("\n"); } void timeout_callback(evutil_socket_t fd, short tag, void *arg){ printf("fd %d timeout.\n",fd); } void main(){ int sock_fd; int connfd[2]={-1}; struct sockaddr_in server_addr; struct sockaddr_in client_addr; socklen_t sin_size; int yes=1; char buf[BUF_SIZE]; int ret; int i; struct event_base *eb; struct event *ev[2]; if((sock_fd=socket(PF_INET,SOCK_STREAM,0))==-1){ perror("socket"); exit(1); } if(setsockopt(sock_fd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int))==-1){ perror("socket"); exit(1); } server_addr.sin_family=AF_INET; server_addr.sin_port=htons(MYPORT); server_addr.sin_addr.s_addr=htonl(INADDR_ANY); memset(server_addr.sin_zero,'\0',sizeof(server_addr.sin_zero)); if(bind(sock_fd,(struct sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("bind"); exit(1); } if(listen(sock_fd,BACKLOG)==-1){ perror("listen"); exit(1); } printf("listen on port %d\n",MYPORT); sin_size=sizeof(client_addr); for(i=0;i<2;i++){ connfd[i]=accept(sock_fd,(struct sockaddr*)&client_addr,&sin_size); printf("%d connection built.\n",connfd[i]); } eb=event_base_new(); struct timeval tv={1,0}; //1秒 for(i=0;i<2;i++){ ev[i]=event_new(eb, connfd[i],EV_READ,ev_callback,NULL); event_add(ev[i],NULL); } event_base_dispatch(eb); printf("loop exit.\n"); for(i=0;i<2;i++){ close(connfd[i]); event_free(ev[i]); } event_base_free(eb); }
客戶端代碼:
#!/usr/bin/perl use IO::Socket; my $host="127.0.0.1"; my $port=5000; my $socket=IO::Socket::INET->new("$host:$port") or die "create socket error $@"; sleep(5); #建立連接后先休息5秒,再發送數據 my $msg_out="1234567890"; print $socket $msg_out; print "now send over,go to sleep...\n"; while(1){ #循環,每隔5秒發送一次數據 sleep(5); print "5 seconds gone...send another line\n"; print $socket $msg_out; }
服務端輸出:
listen on port 5000
4 connection built.
5 connection built.
read from file descriptor:4:1234567890
read from file descriptor:5:1234567890
loop exit.
client在不停地向server發送數據,這什么server只接收了一次數據event_base就自動退出了?請看下文講的event_base_loop()。
補充一些知識:在關閉套接口時使用close()后則該端口既不能用於讀取數據,也不能用於發送數據。而shutdown可以有更多選擇,它那允許只關閉一個方向(讀或寫)的通信或關閉雙向通信。上述代碼中只要server端close了套接口
server端第83行創建event時使用標記EV_READ|EV_ET,並且我們故意把BUF_SIZE的大小改為5(client每次向server發送10個字節的字符串)。關於ET我們在講epoll的時候已經說過了。看一下運行結果,在意料之中:
listen on port 5000
4 connection built.
5 connection built.
read from file descriptor:4:12345
read from file descriptor:5:12345
loop exit.
下面的試驗還把BUF_SIZE改為200。
把83、84行改為:
ev[i]=event_new(eb, connfd[i],EV_READ|EV_TIMEOUT,timeout_callback,NULL); //EV_TIMEOUT標記可有可無
event_add(ev[i],&tv);
則在客戶端打印出“now send over,go to sleep...”之前(此時client還沒有向server發送數據),服務端就輸出:
listen on port 5000
4 connection built.
5 connection built.
fd 4 timeout.
fd 5 timeout.
loop exit.
為了方便,libevent定義了一些純粹由timeout觸發事件的函數調用。
#define evtimer_new(base, callback, arg) \ event_new((base), -1, 0, (callback), (arg)) #define evtimer_add(ev, tv) \ event_add((ev),(tv)) #define evtimer_del(ev) \ event_del(ev) #define evtimer_pending(ev, what, tv_out) \ event_pending((ev), (what), (tv_out))
想由一個信號來觸發事件可以調用:
#define evsignal_new(base, signum, callback, arg) \ event_new(base, signum, EV_SIGNAL|EV_PERSIST, cb, arg)
比如
struct event *hup_event; struct event_base *base = event_base_new(); /* call sighup_function on a HUP signal */ hup_event = evsignal_new(base, SIGHUP, sighup_function, NULL);
有一些方便的宏定義可以使用:
#define evsignal_add(ev, tv) \ event_add((ev),(tv)) #define evsignal_del(ev) \ event_del(ev) #define evsignal_pending(ev, what, tv_out) \ event_pending((ev), (what), (tv_out))
注意,你只能往一個event_base中安裝信號才有效。
在創建event時,有時候想把event自身作為回調函數的參數傳遞進去,可以用:
void *event_self_cbarg();
例如你想在event觸發到達一定數量后,把后來的event從event_base中刪除。
#include <event2/event.h> static int n_calls = 0; void cb_func(evutil_socket_t fd, short what, void *arg) { struct event *me = arg; printf("cb_func called %d times so far.\n", ++n_calls); if (n_calls > 100) event_del(me); } void run(struct event_base *base) { struct timeval one_sec = { 1, 0 }; struct event *ev; /* We're going to set up a repeating timer to get called called 100 times. */ ev = event_new(base, -1, EV_PERSIST, cb_func, event_self_cbarg()); event_add(ev, &one_sec); event_base_dispatch(base); }
上文已提到可以設置event_base總共有多少個優先級,設置某個event的優先級可以用:
int event_priority_set(struct event *event, int priority);
查看event的狀態
int event_pending(const struct event *ev, short what, struct timeval *tv_out);
如果event處於pending狀態,則返回true;否則返回false。
@param what the requested event type; any of EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL
@param tv_out if this field is not NULL, and the event has a timeout, this field is set to hold the time at which the timeout will expire.
short event_get_events(const struct event *ev);
獲取event的flag,EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL等。
void event_get_assignment(const struct event *event, struct event_base **base_out, evutil_socket_t *fd_out, short *events_out, event_callback_fn *callback_out, void **arg_out);
給一個已經創建好的event重新賦值。
還有一些函數其功能是“顧名思義”的:
#define event_get_signal(ev) /* ... */ evutil_socket_t event_get_fd(const struct event *ev); struct event_base *event_get_base(const struct event *ev); event_callback_fn event_get_callback(const struct event *ev); void *event_get_callback_arg(const struct event *ev);
循環監聽
#define EVLOOP_ONCE 0x01 #define EVLOOP_NONBLOCK 0x02 #define EVLOOP_NO_EXIT_ON_EMPTY 0x04 int event_base_loop(struct event_base *base, int flags); int event_base_dispatch(struct event_base *base);
先說明一下,我在編譯的時候說上面這些宏定義找不到,我grep了安裝頭文件也確實找不到。
dispath相當於使用默認flag的loop,默認的flag是EVLOOP_ONCE。
EVLOOP_ONCE當event_base注冊的事件都被觸發過一次后,循環監聽就退出了。你可以設置EVLOOP_NO_EXIT_ON_EMPTY讓event_base為空后不退出。而EVLOOP_NONBLOCK不會刻意等待事件被觸發,它會立即檢察事件是否被觸發。
退出循環監聽
當所有向event_base中add的event全部被delete時,loop自動退出。你也可以強制讓loop退出。
int event_base_loopexit(struct event_base *base, const struct timeval *timeout); int event_base_loopbreak(struct event_base *base);
使用loopbreak()時立即退出loop。使用loopexit()時,如果timeout到期,則會檢查是否有回調函數正在執行,如果有則繼續把它執行完,如果沒有則退出loop。所以使用loopbreak()和timeout為NULL的loopexit()還是有區別的。
你還可以事后查看loop是以哪種方式退出的:
int event_base_got_exit(struct event_base *base); int event_base_got_break(struct event_base *base);
查看event_base的狀態
event_base的狀態就是event_base中所有event的狀態。你可以把這些信息dump到一個文件里。
void event_base_dump_events(struct event_base *base, FILE *f);
可移植性
libevent是一整套函數調用,它不僅對I/O利用的后端方法進行了屏蔽,而且它對一些基本數據類型(如上文中已經看到的evutil_socket_t)、系統調用(如event_base_gettimeofday_cached)、socket函數進行了重新定義。下面簡單列舉一二。
int evutil_closesocket(evutil_socket_t s); #define EVUTIL_CLOSESOCKET(s) evutil_closesocket(s)
關閉套接口。
int evutil_make_socket_nonblocking(evutil_socket_t sock);
設置套接口非阻塞。