下面假定已經學習過基本的socket編程(socket, bind, listen, accept, connect, recv, send, close),並且對異步/callback有基本的認識。
基本的socket編程是阻塞/同步的,每個操作除非已經完成或者出錯才會返回,這樣對於每一個請求,要使用一個線程或者單獨的進程去處理,系統資源沒法支撐大量的請求。Posix定義了可以使用異步的select系統調用,但是因為其采用了輪詢的方式來判斷某個fd是否變成active,效率不高o(n)。於是各系統提出了基於異步/callback的系統調用,例如linux的epoll,BSD的kqueue,windows的IOCP。由於在內核層面做了支持,所以可以再o(1)的效率查找到active的fd。基本上,libevent就是對這些高效IO的封裝,提供統一個API,簡化開發。
libevent大概是這樣的:
默認情況下是單線程的(可以配置成多線程),每個線程有且只有一個event_base,對應一個struct event_base結構體(以及附於其上的事件管理器),用來schedule托管給它的一系列event,可以和操作系統的進程管理器類比,當然,更加簡單些。當一個事件發生后,event_base會在合適的時間(不一定是立即)去調用綁定在這個事件上的函數(傳入一些預定義的參數,以及綁定時指定的一個參數),知道這個函數執行完,再返回schedule其他事件。
//創建一個event_base struct event_base *base = event_base_new(); assert(base != NULL);
event_base內部有一個循環,循環阻塞在epoll/kqueue等系統調用上,知道有一個/一些事件發生,然后去處理這些事件。當然,這些事件要被綁定到event_base上。每個事件對應一個struct event,可以是監聽一個fd或者Posix信號量之類的。struct event使用event_new來創建和綁定,使用event_add來啟用。
//創建並綁定一個event struct event *listen_event; //參數:event_base, 監聽的fd,事件類型及屬性,綁定的回調函數,給回調函數的參數 listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, (void*)base); //參數:event,超時時間(struct timeval *類型的,NULL表示無超時設置) event_add(listen_event, NULL);
注:libevent支持的事件及屬性包括(使用bitfield實現,所以用|來讓他們合體)
(a) EV_TIMEOUT:超時
(b) EV_READ: 只要網絡緩沖中還有數據,回調函數就會被觸發
(c) EV_WRITE: 只要塞給網絡緩沖的數據被寫完,回調函數就會被觸發
(d) EV_SIGNAL: POSIX信號量,參考manual吧
(f) EV_ET: Edge-Trigger邊緣觸發,參考EPOLL_ET
然后需要啟動event_base的循環,這樣才能開始處理發生的事件。循環的啟動使用event_base_dispatch,循環將一直持續,知道不再有需要關注的事件,或者遇到event_loopbreak()/event_loopexit()函數。
//啟動事件循環 event_base_dispatch(base);
接下來關注下綁定event的回調函數callback_func:傳遞給它的是一個socket fd, 一個event類型及屬性bit_field,以及傳遞給event_new的最后一個參數(去上面幾行回顧一下,把event_base給傳進來了,實際上更多地是分配一個結構體,把相關的數據都放進去,最后丟給event_new,在這里就能取到了)。其原型是:
typedef void(* event_callback_fn(evutil_socket_t sockfd, short event_type, void *arg))
對於一個服務器而言,上面的流程大概是這樣組合的:
1. listener = socket(), bind(), listen(), 設置nonblocking(Posix系統可使用fcntl設置,windows不需要設置,實際上libevent提供了統一的包裝evutil_make_socket_nnblocking)
2. 創建一個event_base
3. 創建一個event, 將該socket托管給event_base,指定要監聽的事件類型,並綁定上相應的回調函數(及需要給它的參數)。對於listener socket來說,只需要監聽EV_READ|EV_PERSIST
4. 啟動該事件
5. 進入事件循環
6. (異步)當有client發起請求的時候,調用該回調函數,進行處理。
問題:為什么不在listen完馬上調用accept,獲得客戶端連接以后再丟給event_base呢?這個問題不知道YFS就這么干的,不過這個要另起一個線程。
回調函數要做什么事情呢?當然是處理client的請求了。首先要accept,獲得一個可以與client通信的sockfd,然后……調用recv/send嗎?錯!大錯特錯!如果直接調用recv/send的話,這個線程就阻塞在這個地方了,如果這個客戶端非常的陰險(比如一直不發消息,或者網絡不好,老是丟包),libevent就只能等它,沒法處理其他的請求了——所以應該創建一個新的event來托管這個sockfd。
在老版本libevent上的實現,比較羅嗦[如果不想詳細了解的話,看下一部分]。
對於服務器希望先從client獲取數據的情況,大致流程是這樣的:
1. 將這個sockfd設置為nonblocking
2. 創建2個event:
event_read, 綁上sockfd的EV_READ|EV_PERSIST,設置回調函數和參數
event_write, 綁上sockfd的EV_WRITE|EV_PERSIST, 設置回調函數和參數
3. 啟動event_read事件
----
4. (異步)等待event_read事件的發生,調用相應的回調函數。這里麻煩來了:回調函數recv讀入的數據,不能直接用send丟給sockfd了事,因為sockfd是nonblocking的,丟給他的話,不能保證正確(因為異步。。。)。所以需要一個自己管理的緩存用來保存讀入的數據中(在accept以后就創建一個struct,作為第2步回調函數的arg傳進來),在合適的事件(比如遇到換行符)啟用event_write事件(event_add(event_write, NULL)),等待EV_WRITE事件的觸發。
5. (異步)當event_write事件的回調函數被調用時,往sockfd寫入數據,然后刪除event_write事件(event_del(event_write)),等待event_read事件的下一次執行。
一個例子可以參考官方文檔里面的[Example: A low-level ROT13 server with Libevent]
由於需要自己管理緩沖區,且過程晦澀難懂,並且不兼容於Windows的IOCP,所以libevent2開始,提供了bufferevent這個神器,用來提供更加優雅、易用的API。struct bufferevent內建了兩個event(read/write)和對應的緩沖區[struct evbuffer *input, *output],並提供相應的函數用來操作緩沖區(或者直接操作bufferevent)。每當有數據被讀入input的時候,read_cb函數被調用;每當output被輸出完的時候,write_cb被調用;在網絡IO操作出現錯誤的情況(連接中斷、超時、其他錯誤),error_cb被調用。於是上一部分的步驟被簡化為:
1. 設置sockfd為nonblocking
2. 使用bufferevent_socket_new創建一個struct bufferevent *bev,關聯該sockfd,托管給event_base
3. 使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg)將EV_READ/EV_WRITE對應的函數
4. 使用bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)來啟用read/write事件
----
5. (異步)
在read_cb里面從input讀取數據,處理完畢后塞到output里(會被自動寫入到sockfd)
在write_cb里面(需要做什么嗎?對於一個echo server來說,read_cb就足夠了)
在error_cb里面處理遇到的錯誤
可以使用bufferevent_set_timeouts(bev, struct timeval *READ, struct timeval *WRITE)來設置讀寫超時, 在error_cb里面處理超時
read_cb和write_cb的原型是
void read_or_write_callback(struct bufferevent *bev, void *arg)
error_cb的原型是
void error_cb(struct bufferevent *bev, short error, void *arg) //這個是event的標准回調函數原型
可以從bev中用libevent的API提取出event_base、sockfd、input/output等相關數據
void read_cb(struct bufferevent *bev, void *arg) { char line[256]; int n; evutil_socket_t fd = bufferevent_getfd(bev); while (n = bufferevent_read(bev, line, 256), n > 0) bufferevent_write(bev, line, n); } void error_cb(struct bufferevent *bev, short event, void *arg) { bufferevent_free(bev); }
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <assert.h> #include <event2/event.h> #include <event2/bufferevent.h> #define LISTEN_PORT 9999 #define LISTEN_BACKLOG 32 void do_accept(evutil_socket_t listener, short event, void *arg); void read_cb(struct bufferevent *bev, void *arg); void error_cb(struct bufferevent *bev, short event, void *arg); void write_cb(struct bufferevent *bev, void *arg); int main(int argc, char *argv[]) { int ret; evutil_socket_t listener; listener = socket(AF_INET, SOCK_STREAM, 0); assert(listener > 0); evutil_make_listen_socket_reuseable(listener); struct sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(LISTEN_PORT); if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) { perror("bind"); return 1; } if (listen(listener, LISTEN_BACKLOG) < 0) { perror("listen"); return 1; } printf ("Listening...\n"); evutil_make_socket_nonblocking(listener); struct event_base *base = event_base_new(); assert(base != NULL); struct event *listen_event; listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); event_add(listen_event, NULL); event_base_dispatch(base); printf("The End."); return 0; } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = (struct event_base *)arg; evutil_socket_t fd; struct sockaddr_in sin; socklen_t slen = sizeof(sin); fd = accept(listener, (struct sockaddr *)&sin, &slen); if (fd < 0) { perror("accept"); return; } if (fd > FD_SETSIZE) { //這個if是參考了那個ROT13的例子,貌似是官方的疏漏,從select-based例子里抄過來忘了改 perror("fd > FD_SETSIZE\n"); return; } printf("ACCEPT: fd = %u\n", fd); struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, read_cb, NULL, error_cb, arg); bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); } void read_cb(struct bufferevent *bev, void *arg) { #define MAX_LINE 256 char line[MAX_LINE+1]; int n; evutil_socket_t fd = bufferevent_getfd(bev); while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) { line[n] = '\0'; printf("fd=%u, read line: %s\n", fd, line); bufferevent_write(bev, line, n); } } void write_cb(struct bufferevent *bev, void *arg) {} void error_cb(struct bufferevent *bev, short event, void *arg) { evutil_socket_t fd = bufferevent_getfd(bev); printf("fd = %u, ", fd); if (event & BEV_EVENT_TIMEOUT) { printf("Timed out\n"); //if bufferevent_set_timeouts() called } else if (event & BEV_EVENT_EOF) { printf("connection closed\n"); } else if (event & BEV_EVENT_ERROR) { printf("some other error\n"); } bufferevent_free(bev); }