1 libevent介紹和安裝
介紹
libevent是一個輕量級的基於事件驅動的高性能的開源網絡庫,並且支持多個平台,對多個平台的I/O復用技術進行了封裝,當我們編譯庫的代碼時,編譯的腳本將會根據OS支持的處理事件機制,來編譯相應的代碼,從而在libevent接口上保持一致。
在當前的服務器上,面對的主要問題就是要能處理大量的連接。而通過libevent這個網絡庫,我們就可以調用它的API來很好的解決上面的問題。首先,可以來回顧一下,對這個問題的傳統解決方法。
問題: 如何處理多個客戶端連接
解決方案1:I/O復用技術
這幾種方式都是同步I/O,即當讀寫事件就緒,他們自己需要負責進行讀寫,這個讀寫過程是阻塞的,而異步I/O則不需要自己負責讀寫,只需要通知負責讀寫的程序就可以了。
-
循環
假設當前我服務器有多個網絡連接需要看管,那么我就循環遍歷打開的網絡連接的列表,來判斷是否有要讀取的數據。這種方法的缺點很明顯,那就是 1.速度緩慢(必須遍歷所有的網絡連接) 2.效率低 (處理一個連接時可能發生阻塞,妨礙其他網絡連接的檢查和處理) -
select方式
select對應於內核中的sys_select調用,sys_select首先將第二三四個參數指向的fd_set拷貝到內核,然后對每個被SET的描述符調用進行poll,並記錄在臨時結果中(fdset),如果有事件發生,select會將臨時結果寫到用戶空間並返回;當輪詢一遍后沒有任何事件發生時,如果指定了超時時間,則select會睡眠到超時,睡眠結束后再進行一次輪詢,並將臨時結果寫到用戶空間,然后返回。
select返回后,需要逐一檢查關注的描述符是否被SET(事件是否發生)。(select支持的文件描述符數量太小了,默認是1024)。 -
poll方式
poll與select不同,通過一個pollfd數組向內核傳遞需要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只需要被初始化一次。
poll的實現機制與select類似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,然后對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。
poll返回后,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。 -
epoll方式
epoll通過epoll_create創建一個用於epoll輪詢的描述符,通過epoll_ctl添加/修改/刪除事件,通過epoll_wait檢查事件,epoll_wait的第二個參數用於存放結果。
epoll與select、poll不同,首先,其不用每次調用都向內核拷貝事件描述信息,在第一次調用后,事件信息就會與對應的epoll描述符關聯起來。其次,epoll不是通過輪詢,而是通過在等待的描述符上注冊回調函數,當事件發生時,回調函數負責把發生的事件存儲在就緒事件鏈表中,最后寫到用戶空間。
epoll返回后,該參數指向的緩沖區中即為發生的事件,對緩沖區中每個元素進行處理即可,而不需要像poll、select那樣進行輪詢檢查。
解決方案2:多線程技術或多進程技術
多線程技術和多進程技術也可以處理高並發的數據連接,因為在服務器中可以產生大量的進程和線程和處理我們需要監視的連接。但是,這兩種方式也是有很大的局限性的,比如多進程模型就不適合大量的短連接,因為進程的產生和關閉需要消耗較大的系統性能,同樣,還要進程進程間的通信,在CPU性能不足的情況下不太適合。而多線程技術則不太適合處理長連接,因為當我們建立一個進程時,linux中會消耗8G的棧空間,如果我們的每個連接都杵着不斷開,那么大量連接長連接后,導致的結果就是內存的大量消耗。
解決方案3:常用的上述二者復合使用
上述的兩種方法各具有優缺點,因此,我們可以將上述的方法結合起來,這也是目前使用較多的處理高並發的方法。多進程+I/O復用或者多線程+I/O復用。而在具體的實現上,又可以分為很多的方式。比如多線程+I/O復用技術,我們使用使用一個主線程負責監聽一個端口和接受的描述符是否有讀寫事件產生,如果有,則將事件分發給其他的工作進程去完成,這也是進程池的理念。
在說完上述的高並發的處理方法之后,我們可以來介紹一個libevent的主要特色了。
同樣,lievent也是采用的上述系統提供的select,poll和epoll方法來進行I/O復用,但是針對於多個系統平台上的不同的I/O復用實現方式,libevent進行了重新的封裝,並提供了統一的API接口。libevent在實現上使用了事件驅動這種機制,其本質上是一種Reactor模式。
Reactor模式,是一種事件驅動機制。應用程序需要提供相應的接口並注冊到Reactor上,如果相應的事件發生,Reactor將主動調用應用程序注冊的接口,這些接口又稱為“回調函數”。
在Libevent中也是一樣,向Libevent框架注冊相應的事件和回調函數;當這些事件發生時,Libevent會調用這些回調函數處理相應的事件。
lbevent的事件支持三種,分別是網絡IO、定時器和信號。定時器的數據結構使用最小堆(Min Heap),以提高效率。網絡IO和信號的數據結構采用了雙向鏈表(TAILQ)。
安裝
libevent的安裝很簡單,我是直接從github上clone下一個源碼,然后進行編譯安裝的。
具體的命令是(假設你已經安裝了git):
# git clone https://github.com/nmathewson/Libevent.git
# cd Libevent
# sh autogen.sh
# ./configure && make
# make install
# make verify //驗證安裝
2 Linux下libevent主要API介紹
現在的libevent版本已經到達libevent2了,其增加了多線程的支持,API函數也發生了一些微小的變化。
-
創建事件集
struct event_base *event_base_new(void)
-
創建事件
struct event event_new(struct event_base * ,evutil_socket_t ,short ,event_callback_fn,void)
參數一:事件所在的事件集。
參數二:socket的描述符。
參數三:事件類型,其中EV_READ表示等待讀事件發生,EV_WRITE表示寫事件發生,或者它倆的組合,EV_SIGNAL表示需要等待事件的號碼,如 果不包含上述的標志,就是超時事件或者手動激活的事件。
參數四:事件發生時需要調用的回調函數。
參數五:回調函數的參數值。 -
添加事件和刪除事件
int event_add(struct event * ev,const struct timeval* timeout)
參數一:需要添加的事件
參數二:事件的最大等待事件,如果是NULL的話,就是永久等待int event_del(struct event *)
參數一:需要刪除的事件 -
分配監聽事件
int event_base_dispatch(struct event_base * )
參數一:需要監視的事件集
-
I/O buffer事件
struct bufferevent* bufferevent_socket_new
(struct event_base * base,evutil_socket_t fd,int options)參數一:需要添加到的時間集
參數二:相關的文件描述符
參數三:0或者是相應的BEV_OPT_*可選標志int bufferevent_enable(struct bufferevent * bev,short event)
參數一:需要啟用的bufferevent
參數二:any combination of EV|READ | EV_WRITEint bufferevent_disable(struct bufferevent * bev,short event)
參數說明:同上
size_t bufferevent_read(struct bufferevent *bev,void * data,size_t size)
參數一:讀取的buffer_event事件
參數二:存儲數據的指針
參數三:數據buffer的大小返回值:讀取數據的字節數
int bufferevent_write(struct bufferevent *bev,const void * data,size_t size)
參數一:讀取的buffer_event事件
參數二:存儲數據的指針
參數三:要寫入的數據的大小,字節數
如果你想知道更多的API使用情況,請點擊這里。
3.1 編程實例之聊天室服務器
下面,就基於libevent2編寫一個聊天室服務器。
設計思想:首先創建一個套接字,進而創建一個事件對此端口進行監聽,將所請求的用戶組成一個隊列,並監聽所有的用戶事件,當某個用戶說話了,產生了讀事件,就將該用戶的發言發送給隊列中的其他用戶。
程序分析
需要包含的libevent函數頭:
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
創建一個client結構體,接受連接后存放數據:
struct client {
/* The clients socket. */
int fd;
/* The bufferedevent for this client. */
struct bufferevent *buf_ev;
struct bufferevent *buf_ev;
/*
* This holds the pointers to the next and previous entries in
* the tail queue.
*/
TAILQ_ENTRY(client) entries;
};
先來看下mian函數的處理:
int
main(int argc, char **argv)
{
int listen_fd;
struct sockaddr_in listen_addr;
struct event ev_accept;
int reuseaddr_on;
/* Initialize libevent. */
evbase = event_base_new();
/* Initialize the tailq. */
TAILQ_INIT(&client_tailq_head);
/* Create our listening socket. */
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
err(1, "listen failed");
memset(&listen_addr, 0, sizeof(listen_addr));
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(SERVER_PORT);
if (bind(listen_fd, (struct sockaddr *)&listen_addr,
sizeof(listen_addr)) < 0)
err(1, "bind failed");
if (listen(listen_fd, 5) < 0)
err(1, "listen failed");
reuseaddr_on = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
sizeof(reuseaddr_on));
/* Set the socket to non-blocking, this is essential in event
* based programming with libevent. */
if (setnonblock(listen_fd) < 0)
err(1, "failed to set server socket to non-blocking");
/* We now have a listening socket, we create a read event to
* be notified when a client connects. */
event_assign(&ev_accept, evbase, listen_fd, EV_READ|EV_PERSIST,
on_accept, NULL);
event_add(&ev_accept, NULL);
/* Start the event loop. */
event_base_dispatch(evbase);
return 0;
}
首先,函數初始化了一個用戶隊列tailq,接着創建了一個socket套接字,並將套接字設定為非阻塞模式,接着對一個全局的evbase事件集合,注冊了事件,事件源是listen_fd,回調函數是on_accept,事件發生的情況是EV_READ,而且標志EV_PESIST表明該事件一直存在,而后開啟事件掃描循環event_base_dispatch(evbase)
。
再看一下回調函數on_accpet實現:
void
on_accept(int fd, short ev, void *arg)
{
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
struct client *client;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (setnonblock(client_fd) < 0)
warn("failed to set client socket non-blocking");
/* We've accepted a new client, create a client object. */
client = calloc(1, sizeof(*client));
if (client == NULL)
err(1, "malloc failed");
client->fd = client_fd;
client->buf_ev = bufferevent_socket_new(evbase, client_fd, 0);
bufferevent_setcb(client->buf_ev, buffered_on_read, NULL,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
/* Add the new client to the tailq. */
TAILQ_INSERT_TAIL(&client_tailq_head, client, entries);
printf("Accepted connection from %s\n",
inet_ntoa(client_addr.sin_addr));
}
這個回調函數的作用很顯然,就是接受了一個客戶端的請求,並申請好了一個client
信息,將需要的內容填寫好,在填寫中需要注意的是,又向上述的事件集evbase
中注冊了一個bufferevent
事件client->buf_ev
,並注冊了回調函數buffered_on_read
,buffered_on_error
,這三個函數分別是當接受后的連接發生了讀或者錯誤事件后的執行函數。接着,將用戶的client
結構放入了用戶的隊列tailq
中去。
用戶的buffer可讀后的執行函數:
void
buffered_on_read(struct bufferevent *bev, void *arg)
{
struct client *this_client = arg;
struct client *client;
uint8_t data[8192];
size_t n;
/* Read 8k at a time and send it to all connected clients. */
for (;;) {
n = bufferevent_read(bev, data, sizeof(data));
if (n <= 0) {
/* Done. */
break;
}
/* Send data to all connected clients except for the
* client that sent the data. */
TAILQ_FOREACH(client, &client_tailq_head, entries) {
if (client != this_client) {
bufferevent_write(client->buf_ev, data, n);
}
}
}
}
執行函數的作用很明顯,將libevent管理中的buffer
數據讀取出,存入本地的data
數組內,然后對隊列中的client
進行檢索,如果不是發數據的client
,則將數據寫入該client
的buffer中,發送給該用戶。這里注意的是需要反復讀取buffer
中的數據,防止一個讀取並沒有讀取干凈,直到讀取不到數據為止。
buffer出錯處理函數和上述函數差不多,功能就是出錯后,結束掉保存的client結構,詳細就不說了。
程序源碼: 點擊這里
編譯的時候記得修改Makefile中Libevent文件夾的位置
3.2 編程實例之回顯服務器(純異步IO)
設計思想:所謂回顯服務器就是將客戶端發過來的數據再發回去,這里主要也就是說明libevent的純IO復用實現。實現方法和上面的差不多,甚至可以說更加簡單。
程序和上面的聊天服務器差不多,只是在buffer可讀的事件函數中,不是將用戶的數據發送給其他用戶,而是直接發送給用戶本身。
程序源碼: 點擊這里
3.3 編程實例之回顯服務器(多線程--per connection per thread)
設計思想:上面的方法單純使用libevent的簡單函數來實現服務,但是這里,我們假設我們需要處理的客戶端很少,於是我們可以使用對於每個連接我們分配一個線程這樣的方式來實現對用戶的服務。這種方式簡單有效,一對一服務,就算業務邏輯出現阻塞也不怕。
程序分析
首先定義了一些數據結構,worker數據結構定義的是一個工作者,它包含有一個工作線程,和結束標志,需要獲取的工作隊列,和建立鏈表需要的指針。job數據結構定義的是操作一個job的方法和對象,這回到程序中,實際上就是指的是事件發生后,封裝好的client結構體和處理這個結構體的方法。workqueue數據結構指的是當前的工作隊列中的工作者,以及工作隊列中的待完成的工作,以及互斥鎖和條件變量(因為多個工作進程需要訪問這些資源)。
具體的流程就是,用一個主線程監聽一個套接字,並將套接字接受到的連接accept,並創建一個client數據結構保存該連接的信息,在這個client結構中注冊一個bufferevent事件,注冊到client->evbase上(這時候這是向client中的evbase注冊了一個事件還沒有進行循環這個事件集)。
接着,當監聽到某個client有bufferevent事件發生,主線程就把該client結構體和需要進行的工作方法包裝成一個job結構,然后把這個job扔到workqueue上去,並通知各個工作者。而后,各個工作者開着的線程就被激活了,瘋狂地去workqueue上去搶工作做,某個worker拿到工作后,就可以解包job,根據job的工作說明書(job_function)操作工作對象(client)了。這里,job的工作說明有是循環client中的client->evbase,於是這樣線程就會一直去監視這個連接的狀態,如果有數據就這會調用回調函數進行處理。同時,這個線程也就是阻塞在這里,這對這一個連接負責。
建立workqueue需要的結構體和函數有:
typedef struct worker {
pthread_t thread;
int terminate;
struct workqueue *workqueue;
struct worker *prev;
struct worker *next;
} worker_t;
typedef struct job {
void (*job_function)(struct job *job);
void *user_data;
struct job *prev;
struct job *next;
} job_t;
typedef struct workqueue {
struct worker *workers;
struct job *waiting_jobs;
pthread_mutex_t jobs_mutex;
pthread_cond_t jobs_cond;
} workqueue_t;
int workqueue_init(workqueue_t *workqueue, int numWorkers);
void workqueue_shutdown(workqueue_t *workqueue);
void workqueue_add_job(workqueue_t *workqueue, job_t *job);
主線程的on_accept函數為:
void on_accept(evutil_socket_t fd, short ev, void *arg) {
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
workqueue_t *workqueue = (workqueue_t *)arg;
client_t *client;
job_t *job;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (evutil_make_socket_nonblocking(client_fd) < 0)
{
warn("failed to set client socket to non-blocking");
close(client_fd);
return;
}
/* Create a client object. */
if ((client = malloc(sizeof(*client))) == NULL)
{
warn("failed to allocate memory for client state");
close(client_fd);
return;
}
memset(client, 0, sizeof(*client));
client->fd = client_fd;
/* Add any custom code anywhere from here to the end of this function
* to initialize your application-specific attributes in the client struct.
*/
if ((client->output_buffer = evbuffer_new()) == NULL)
{
warn("client output buffer allocation failed");
closeAndFreeClient(client);
return;
}
if ((client->evbase = event_base_new()) == NULL)
{
warn("client event_base creation failed");
closeAndFreeClient(client);
return;
}
client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, BEV_OPT_CLOSE_ON_FREE);
if ((client->buf_ev) == NULL) {
warn("client bufferevent creation failed");
closeAndFreeClient(client);
return;
}
bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
/* Create a job object and add it to the work queue. */
if ((job = malloc(sizeof(*job))) == NULL) {
warn("failed to allocate memory for job state");
closeAndFreeClient(client);
return;
}
job->job_function = server_job_function;
job->user_data = client;
workqueue_add_job(workqueue, job);
}
job中的工作指南為:
static void server_job_function(struct job *job) {
client_t *client = (client_t *)job->user_data;
//do my job
event_base_dispatch(client->evbase);
closeAndFreeClient(client);
free(job);
}
程序源碼: 點擊這里
3.4 編程實例之回顯服務器(多線程--線程池+異步IO)
設計思想:假設我們的用戶很多,高並發,長連接,那么我們還是來用I/O復用和線程池實現吧,用一個控制線程通過I/O復用負責監聽和分發事件,用一組線程池來進行處理事件,這樣就可以靈活地將控制邏輯和業務邏輯分開了,見下述講解。
程序分析
具體的流程和上面的差不多,用一個主線程監聽一個套接字,並將套接字接受到的連接accept,並創建一個client數據結構保存該連接的信息,在這個client結構中注冊一個bufferevent事件,但是這里,將事件注冊到accept_evbase中,仍然用主線程進行監聽。
而面對監聽后出現的事件,將client和操作client的方法打包成一個job,放到上述的workqueue中去,讓工作進程來完成。這樣的操作和上述的差別在於上述方法將bufferevent注冊到client中的evbase中,用工作線程監聽,而本方法用主線程監聽,工作線程負責處理監聽產生的事件。
這要的差別在於兩個函數 on_accept函數:
void on_accept(evutil_socket_t fd, short ev, void *arg) {
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
client_t *client;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (evutil_make_socket_nonblocking(client_fd) < 0) {
warn("failed to set client socket to non-blocking");
close(client_fd);
return;
}
/* Create a client object. */
if ((client = malloc(sizeof(*client))) == NULL) {
warn("failed to allocate memory for client state");
close(client_fd);
return;
}
memset(client, 0, sizeof(*client));
client->fd = client_fd;
/* Add any custom code anywhere from here to the end of this function
* to initialize your application-specific attributes in the client struct.
*/
if ((client->output_buffer = evbuffer_new()) == NULL) {
warn("client output buffer allocation failed");
closeAndFreeClient(client);
return;
}
//需要注意的是,這里注冊到evbase_accept
client->buf_ev = bufferevent_socket_new(evbase_accept, client_fd,BEV_OPT_CLOSE_ON_FREE);
if ((client->buf_ev) == NULL) {
warn("client bufferevent creation failed");
closeAndFreeClient(client);
return;
}
bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
}
在buffered_on_read中,提交job。
void buffered_on_read(struct bufferevent *bev, void *arg)
{
client_t *client = (client_t *)arg;
job_t *job;
/* Create a job object and add it to the work queue. */
if ((job = malloc(sizeof(*job))) == NULL) {
warn("failed to allocate memory for job state");
closeAndFreeClient(client);
return;
}
job->job_function = server_job_function;
job->user_data = client;
workqueue_add_job(&workqueue, job);
}
在job工作指南server_job_function中就可以做你工作該做的事兒了,根據發來的信息進行數據庫處理,http返回等等。
程序源碼: 點擊這里
4 參考文章
[1] http://www.ibm.com/developerworks/cn/aix/library/au-libev/
[2] http://wenku.baidu.com/link?url=RmSm9M9mc4buqB_j6BGou5GxgyAn14lif18UUsQ8gr7pClAKGJr3civ8-DM0Xrpv4MdVIajykzbg34ZbGjGEizL8fOYE-EOKAATZIV06qwa
[3] http://blog.csdn.net/mafuli007/article/details/7476014
[4] http://blog.csdn.net/sparkliang/article/details/4957667
[5] http://bbs.chinaunix.net/thread-4118501-1-1.html
[6] http://bbs.chinaunix.net/thread-3776236-1-1.html
[7] http://www.zhihu.com/question/20114168
[8] http://www.zhihu.com/question/21516827
[9] http://www.wangafu.net/~nickm/libevent-2.0/doxygen/html/
[10] Libevent中文參考手冊