輕量級網絡庫libevent初探


  本文是關於libevent庫第一篇博文,主要由例子來說明如何利用該庫。后續博文再深入研究該庫原理。

libevent庫簡介

  就如libevent官網上所寫的“libevent - an event notification library”,libevent就是一個基於事件通知機制的庫,支持/dev/poll、kqueue、event ports、select、poll和epoll事件機制,也因此它是一個跨操作系統的庫(支持Linux、*BSD、Mac OS X、Solaris、Windows等)。目前應用該庫的有Chromium、Memcached、NTP、tmux等應用。

  libevent 庫實際上沒有更換select()、poll()或其他機制的基礎,而是使用對於每個平台最高效的高性能解決方案,在其實現外加上一個包裝器。

  為了實際處理每個請求,libevent 庫提供一種事件機制,它作為底層網絡后端的包裝器。事件系統讓為連接添加處理函數變得非常簡便,同時降低了底層 I/O 復雜性。這是 libevent 系統的核心。

  libevent 庫的其他組件提供其他功能,包括緩沖的事件系統(用於緩沖發送到客戶端/從客戶端接收的數據)以及 HTTP、DNS 和 RPC 系統的核心實現。

  另外,libevent庫非常輕量級,這讓我們學習它的源碼難度低了不少。關於源碼分析具體可參考:

  Libevent源碼分析

  libevent源碼深度剖析

  如果要生成libevent庫的文檔,可參考博文使用Doxygen生成libevent document(2.0.15)-- CHM格式。 

回顯服務端示例

簡易流程

  創建 libevent 服務器的基本方法是,注冊當發生某一操作(比如接受來自客戶端的連接)時應該執行的函數,然后調用主事件循環event_base_dispatch()。執行過程的控制由 libevent系統處理。注冊事件和將調用的函數之后,事件系統開始自治;在應用程序運行時,可以在事件隊列中添加(注冊)或刪除(取消注冊)事件。事件注冊非常方便,可以通過它添加新事件以處理新打開的連接,從而構建靈活的網絡處理系統。

  例如,可以打開一個監聽套接字,然后注冊一個回調函數,每當需要調用accept()函數以打開新連接時調用這個回調函數,這樣就創建了一個網絡服務器。下邊所示的代碼片段說明了這個基本過程:

 1 int main(int argc, char **argv)
 2 {
 3     /* Declare a socket file descriptor. */
 4     evutil_socket_t listenfd;
 5 
 6     /* Setup listening socket */
 7 
 8     /* Make the listen socket reuseable and non-blocking. */
 9     evutil_make_listen_socket_reuseable(listenfd);
10     evutil_make_socket_nonblocking(listenfd);
11 
12     /* Declare an event_base to host events. */
13     struct event_base *base = event_base_new();
14 
15     /* Register listen event. */
16     struct event *listen_event;
17     listen_event = event_new(base, listenfd, EV_READ | EV_PERSIST, do_accetp, (void *)base);
18     event_add(listen_event, NULL);
19 
20     /* Start the event loop. */
21     event_base_dispatch(base);
22 
23     /* End. */
24     close(listenfd);
25 return 0; 26 }

  下邊詳細介紹上邊程序中用到的libevent中的API:

  1)evutil_socket_t 定義於Util.h頭文件中,用於跨平台表示socket的ID(在Linux下表示的是其文件描述符),如下所示:

/**
 * A type wide enough to hold the output of "socket()" or "accept()".  On
 * Windows, this is an intptr_t; elsewhere, it is an int. */
#ifdef WIN32
#define evutil_socket_t intptr_t
#else
#define evutil_socket_t int
#endif
View Code

  2)evutil_make_listen_socket_reuseable 函數聲明於Util.h,實現於Evutil.c,用於跨平台將socket設置為可重用(實際上是將端口設為可重用,具體可參照博文Linux 套接字編程中的 5 個隱患中的第3個隱患),具體定義如下:

int
evutil_make_listen_socket_reuseable(evutil_socket_t sock)
{
#ifndef WIN32
    int one = 1;
    /* REUSEADDR on Unix means, "don't hang on to this address after the
     * listener is closed."  On Windows, though, it means "don't keep other
     * processes from binding to this address while we're using it. */
    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
        (ev_socklen_t)sizeof(one));
#else
    return 0;
#endif
}
View Code

  同樣,evutil_make_socket_nonblocking函數也聲明於Util.h,實現於Evutil.c,用於跨平台將socket設置為非阻塞,具體定義如下:

int
evutil_make_socket_nonblocking(evutil_socket_t fd)
{
#ifdef WIN32
    {
        u_long nonblocking = 1;
        if (ioctlsocket(fd, FIONBIO, &nonblocking) == SOCKET_ERROR) {
            event_sock_warn(fd, "fcntl(%d, F_GETFL)", (int)fd);
            return -1;
        }
    }
#else
    {
        int flags;
        if ((flags = fcntl(fd, F_GETFL, NULL)) < 0) {
            event_warn("fcntl(%d, F_GETFL)", fd);
            return -1;
        }
        if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
            event_warn("fcntl(%d, F_SETFL)", fd);
            return -1;
        }
    }
#endif
    return 0;
}
View Code

   3)event_base結構體定義在event_internal.h中,它記錄了所有的等待和已激活的事件,並當有事件被激活時通知調用者。默認地,我們用event_base_new函數就可以新建一個event_base對象。event_base_new函數的定義如下:

struct event_base *
event_base_new(void)
{
    struct event_base *base = NULL;
    struct event_config *cfg = event_config_new();
    if (cfg) {
        base = event_base_new_with_config(cfg);
        event_config_free(cfg);
    }
    return base;
}

  也就是說實際上該函數調用了event_base_new_with_config來創建event_base對象,所以我們也可以利用event_config_new和event_base_new_with_config定制event_base對象。

  4)event結構體定義在event_struct.h文件中,主要記錄事件的相關屬性。event_new函數用於創建一個event對象,具體定義如下:

struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
    struct event *ev;
    ev = mm_malloc(sizeof(struct event));
    if (ev == NULL)
        return (NULL);
    if (event_assign(ev, base, fd, events, cb, arg) < 0) {
        mm_free(ev);
        return (NULL);
    }

    return (ev);
}
// Parameters:
    // base            the event base to which the event should be attached.
    // fd              the file descriptor or signal to be monitored, or -1.
    // events          desired events to monitor: bitfield of EV_READ, EV_WRITE, EV_SIGNAL, EV_PERSIST, EV_ET.
    // callback        callback function to be invoked when the event occurs
    // callback_arg    an argument to be passed to the callback function
// Returns:
    // a newly allocated struct event that must later be freed with event_free().
View Code

  在上邊程序中,cb是回調函數,其原型如下:

/**
   A callback function for an event.

   It receives three arguments:

   @param fd An fd or signal
   @param events One or more EV_* flags
   @param arg A user-supplied argument.

   @see event_new()
 */
typedef void (*event_callback_fn)(evutil_socket_t, short, void *);

  5)event_base_dispatch函數開啟事件輪詢(event_base_loop提供同樣功能,不過更為靈活,實際event_base_dispatch只是event_base_loop的特例),定義如下:

int
event_base_dispatch(struct event_base *event_base)
{
    return (event_base_loop(event_base, 0));
}

實際例子 

   一個完整的服務器端的程序如下:

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <errno.h>
  4 #include <assert.h>
  5 #include <unistd.h>
  6 #include <netinet/in.h>
  7 #include <sys/socket.h>
  8 #include <arpa/inet.h>
  9 #include <sys/types.h>
 10 
 11 #include <event2/event.h>
 12 #include <event2/bufferevent.h>
 13 
 14 #define SERV_PORT 9877
 15 #define LISTEN_BACKLOG 32
 16 #define MAX_LINE 1024
 17 
 18 void do_accetp(evutil_socket_t listenfd, short event, void *arg);
 19 void read_cb(struct bufferevent *bev, void *arg);
 20 void error_cb(struct bufferevent *bev, short event, void *arg);
 21 void write_cb(struct bufferevent *bev, void *arg);
 22 
 23 int main(int argc, int **argv)
 24 {
 25     evutil_socket_t listenfd;
 26     if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
 27     {
 28         perror("socket\n");
 29         return 1;
 30     }
 31 
 32     struct sockaddr_in servaddr;
 33     bzero(&servaddr, sizeof(servaddr));
 34     servaddr.sin_family = AF_INET;
 35     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 36     servaddr.sin_port = htons(SERV_PORT);
 37 
 38     if(bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
 39     {
 40         perror("bind\n");
 41         return 1;
 42     }
 43     if(listen(listenfd, LISTEN_BACKLOG) < 0)
 44     {
 45         perror("listen\n");
 46         return 1;
 47     }
 48 
 49     printf("Listening...\n");
 50 
 51     evutil_make_listen_socket_reuseable(listenfd);
 52     evutil_make_socket_nonblocking(listenfd);
 53 
 54     struct event_base *base = event_base_new();
 55     if(base == NULL)
 56     {
 57         perror("event_base\n");
 58         return 1;
 59     }
 60     const char *eventMechanism = event_base_get_method(base);
 61     printf("Event mechanism used is %s\n", eventMechanism);
 62 
 63     struct event *listen_event;
 64     listen_event = event_new(base, listenfd, EV_READ | EV_PERSIST, do_accetp, (void *)base);
 65     event_add(listen_event, NULL);
 66     event_base_dispatch(base);
 67 
 68     if(close(listenfd) < 0)
 69     {
 70         perror("close\n");
 71         return 1;
 72     }
 73     printf("The End\n");
 74     return 0;
 75 }
 76 
 77 void do_accetp(evutil_socket_t listenfd, short event, void *arg)
 78 {
 79     struct event_base *base = (struct event_base *)arg;
 80     evutil_socket_t fd;
 81     struct sockaddr_in cliaddr;
 82     socklen_t clilen;
 83     fd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);
 84     if(fd < 0)
 85     {
 86         perror("accept\n");
 87         return;
 88     }
 89     if(fd > FD_SETSIZE)
 90     {
 91         perror("fd > FD_SETSIZE");
 92         if(close(fd) < 0)
 93         {
 94             perror("close\n");
 95             return;
 96         }
 97         return;
 98     }
 99     
100     printf("Accept: fd = %u\n", fd);
101     
102     struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
103     bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
104     bufferevent_enable(bev, EV_READ | EV_WRITE | EV_PERSIST);
105 }
106     
107 void read_cb(struct bufferevent *bev, void *arg)
108 {
109     char line[MAX_LINE + 1];
110     int n;
111     evutil_socket_t fd = bufferevent_getfd(bev);
112 
113     while((n = bufferevent_read(bev, line, MAX_LINE)) > 0)
114     {
115         line[n] = '\0';
116         printf("fd = %u, read line: %s", fd, line);
117         bufferevent_write(bev, line, n);
118     }
119 }
120 
121 void error_cb(struct bufferevent *bev, short event, void *arg)
122 {
123     evutil_socket_t fd = bufferevent_getfd(bev);
124     printf("fd = %u, ", fd);
125     if(event & BEV_EVENT_TIMEOUT)
126         printf("Time out.\n");  // if bufferevent_set_timeouts() is called
127     else if(event & BEV_EVENT_EOF)
128         printf("Connection closed.\n");
129     else if(event & BEV_EVENT_ERROR)
130         printf("Some other error.\n");
131     bufferevent_free(bev);
132 }
133 
134 void write_cb(struct bufferevent *bev, void *arg)
135 {
136     // leave blank
137 }
View Code

  注意:在Linux下編譯時需要加libevent靜態庫event,即gcc ... -levent。

  上邊程序中用到的bufferevent值得再說明一下。bufferevent由一個底層的傳輸端口(如套接字)、一個讀取緩沖區和一個寫入緩沖區組成。與通常的事件在底層傳輸端口已經就緒,可以讀取或者寫入的時候執行回調不同的是,bufferevent在讀取或者寫入了足夠量的數據之后調用用戶提供的回調。詳細可參考博文libevent參考手冊第六章:bufferevent:概念和入門

  利用bufferevent的簡易流程如下:

1 struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
2 bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
3 bufferevent_enable(bev, EV_READ | EV_WRITE | EV_PERSIST);

   bufferevent_setcb使得我們可以定制我們自己的回調函數,這里我們只用到了讀和錯誤回調函數。最后,我們要調用bufferevent_enable來使得bufferevent啟動。

客戶端示例

  客戶端用到的libevent的API跟服務端的基本一樣。具體程序如下:

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <errno.h>
  4 #include <unistd.h>
  5 #include <string.h>
  6 #include <netinet/in.h>
  7 #include <sys/socket.h>
  8 #include <arpa/inet.h>
  9 #include <sys/types.h>
 10 
 11 #include <event2/event.h>
 12 #include <event2/bufferevent.h>
 13 
 14 #define SERV_PORT 9877
 15 #define MAX_LINE 1024
 16 
 17 void cmd_msg_cb(int fd, short event, void *arg);
 18 void read_cb(struct bufferevent *bev, void *arg);
 19 void error_cb(struct bufferevent *bev, short event, void *arg);
 20 
 21 int main(int argc, char *argv[])
 22 {
 23     if(argc < 2)
 24     {
 25         perror("usage: echocli <IPadress>");
 26         return 1;
 27     }
 28 
 29     evutil_socket_t sockfd;
 30     if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
 31     {
 32         perror("socket\n");
 33         return 1;
 34     }
 35 
 36     struct sockaddr_in servaddr;
 37     bzero(&servaddr, sizeof(servaddr));
 38     servaddr.sin_family = AF_INET;
 39     servaddr.sin_port = htons(SERV_PORT);
 40     if(inet_pton(AF_INET, argv[1], &servaddr.sin_addr) < 1)
 41     {
 42         perror("inet_ntop\n");
 43         return 1;
 44     }
 45     if(connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
 46     {
 47         perror("connect\n");
 48         return 1;
 49     }
 50     evutil_make_socket_nonblocking(sockfd);
 51 
 52     printf("Connect to server sucessfully!\n");
 53 
 54     struct event_base *base = event_base_new();
 55     if(base == NULL)
 56     {
 57         perror("event_base\n");
 58         return 1;
 59     }
 60     const char *eventMechanism = event_base_get_method(base);
 61     printf("Event mechanism used is %s\n", eventMechanism);
 62     printf("sockfd = %d\n", sockfd);
 63 
 64     struct bufferevent *bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
 65     
 66     struct event *ev_cmd;
 67     ev_cmd = event_new(base, STDIN_FILENO, EV_READ | EV_PERSIST, cmd_msg_cb, (void *)bev);
 68     event_add(ev_cmd, NULL);
 69     
 70     bufferevent_setcb(bev, read_cb, NULL, error_cb, (void *)ev_cmd);
 71     bufferevent_enable(bev, EV_READ | EV_PERSIST);
 72     
 73     event_base_dispatch(base);
 74 
 75     printf("The End.");
 76     return 0;
 77 }
 78 
 79 void cmd_msg_cb(int fd, short event, void *arg)
 80 {
 81     char msg[MAX_LINE];
 82     int nread = read(fd, msg, sizeof(msg));
 83     if(nread < 0)
 84     {
 85         perror("stdio read fail\n");
 86         return;
 87     }
 88 
 89     struct bufferevent *bev = (struct bufferevent *)arg;
 90     bufferevent_write(bev, msg, nread);
 91 }
 92 
 93 void read_cb(struct bufferevent *bev, void *arg)
 94 {
 95     char line[MAX_LINE + 1];
 96     int n;
 97     evutil_socket_t fd = bufferevent_getfd(bev);
 98 
 99     while((n = bufferevent_read(bev, line, MAX_LINE)) > 0)
100     {
101         line[n] = '\0';
102         printf("fd = %u, read from server: %s", fd, line);
103     }
104 }
105 
106 void error_cb(struct bufferevent *bev, short event, void *arg)
107 {
108     evutil_socket_t fd = bufferevent_getfd(bev);
109     printf("fd = %u, ", fd);
110     if(event & BEV_EVENT_TIMEOUT)
111         printf("Time out.\n");  // if bufferevent_set_timeouts() is called
112     else if(event & BEV_EVENT_EOF)
113         printf("Connection closed.\n");
114     else if(event & BEV_EVENT_ERROR)
115         printf("Some other error.\n");
116     bufferevent_free(bev);
117 
118     struct event *ev = (struct event *)arg;
119     event_free(ev);
120 }
View Code

參考資料

  libevent入門

  A tiny introduction to asynchronous IO

  使用 libevent 和 libev 提高網絡應用性能

    libevent參考手冊第六章:bufferevent:概念和入門

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM