libevent粘包分包解決方案:bufferevent + evbuffer


轉自:http://blog.sina.com.cn/s/blog_9f1496990102vshz.html

 

原文:http://www.lvtao.net/c/631.html

Libevent介紹

libevent是一個事件觸發的網絡庫,適用於windows、linux、bsd等多種平台,內部使用select、epoll、kqueue等系統調用管理事件機制。著名分布式緩存軟件memcached也是libevent based,而且libevent在使用上可以做到跨平台,而且根據libevent官方網站上公布的數據統計,似乎也有着非凡的性能。

libevent官方網站 http://libevent.org/

英文文檔 http://www.wangafu.net/~nickm/libevent-book/

中文文檔 http://www.cppblog.com/mysileng/category/20374.html

Libevent是基於事件的網絡庫。說的通俗點,例如我的客戶端連接到服務端屬於一個連接的事件,當這個事件觸發的時候就會去處理。

該文章閱讀過程中,請結合下面的socket例子,可能會更加清晰的理解每一個接口的用法。

event_base

1. 創建event_base

event_base是event(事件,后面會講event)的一個集合。event_base中存放你是監聽是否就緒的event。一般情況下一個線程一個event_base,多個線程的情況下需要開多個event_base。

event_base主要是用來管理和實現事件的監聽循環。

一般情況下直接new一個event_base就可以滿足大部分需求了,如果需要配置參數的,可以參見libevent官網。

創建方法:

struct event_base *event_base_new(void);

銷毀方法:

void event_base_free(struct event_base *base);

重新初始化:

int event_reinit(struct event_base *base);

2. 查看IO模型

IO多路復用模型中,有多種方法可以供我們選擇,但是這些模型是在不同的平台下面的: select  poll  epoll  kqueue  devpoll  evport  win32

當我們創建一個event_base的時候,libevent會自動為我們選擇最快的IO多路復用模型,Linux下一般會用epoll模型。

下面這個方法主要是用來獲取IO模型的名稱。

const char *event_base_get_method(const struct event_base *base);

3. 銷毀event_base

void event_base_free(struct event_base *base);

4. 事件循環 event loop

我們上面說到 event_base是一組event的集合,我們也可以將event事件注冊到這個集合中。當需要事件監聽的時候,我們就需要對這個event_base進行循環。

下面這個函數非常重要,會在內部不斷的循環監聽注冊上來的事件。

int event_base_dispatch(struct event_base *base);

返回值:0 表示成功退出  -1 表示存在錯誤信息。

還可以用這個方法:

#define EVLOOP_ONCE             0x01  #define EVLOOP_NONBLOCK         0x02  #define EVLOOP_NO_EXIT_ON_EMPTY 0x04  
  int event_base_loop(struct event_base *base, int flags);

event_base_loop這個方法會比event_base_dispatch這個方法更加靈活一些。

EVLOOP_ONCE: 阻塞直到有一個活躍的event,然后執行完活躍事件的回調就退出。

EVLOOP_NONBLOCK : 不阻塞,檢查哪個事件准備好,調用優先級最高的那一個,然后退出。

0:如果參數填了0,則只有事件進來的時候才會調用一次事件的回調函數,比較常用

事件循環停止的情況:

1. event_base中沒有事件event

2. 調用event_base_loopbreak(),那么事件循環將停止

3. 調用event_base_loopexit(),那么事件循環將停止

4. 程序錯誤,異常退出

兩個退出的方法:

// 這兩個函數成功返回 0 失敗返回 -1  // 指定在 tv 時間后停止事件循環  // 如果 tv == NULL 那么將無延時的停止事件循環  int event_base_loopexit(struct event_base *base,const struct timeval *tv);  
// 立即停止事件循環(而不是無延時的停止)  int event_base_loopbreak(struct event_base *base);

兩個方法區別:

1. event_base_loopexit(base, NULL) 如果當前正在為多個活躍事件調用回調函數,那么不會立即退出,而是等到所有的活躍事件的回調函數都執行完成后才退出事件循環

2. event_base_loopbreak(base) 如果當前正在為多個活躍事件調用回調函數,那么當前正在調用的回調函數會被執行,然后馬上退出事件循環,而並不處理其他的活躍事件了

5. event_base的例子:

#include <<span>stdio.h>  #include <<span>stdlib.h>  #include <<span>unistd.h>  #include <<span>sys/types.h>   #include <<span>sys/socket.h>          #include <<span>netinet/in.h>          #include <<span>arpa/inet.h>  #include <<span>string.h>  #include <<span>fcntl.h>   #include <<span>event2/event.h>  #include <<span>event2/bufferevent.h>  int main() {  
puts("init a event_base!");  
struct event_base *base; //定義一個event_base  
base = event_base_new(); //初始化一個event_base  
const char *x =  event_base_get_method(base); //查看用了哪個IO多路復用模型,linux一下用epoll  
printf("METHOD:%s\n", x);  
int y = event_base_dispatch(base); //事件循環。因為我們這邊沒有注冊事件,所以會直接退出  
event_base_free(base);  //銷毀libevent  
return 1;  
}

返回:

event 事件

event_base是事件的集合,負責事件的循環,以及集合的銷毀。而event就是event_base中的基本單元:事件。

我們舉一個簡單的例子來理解事件。例如我們的socket來進行網絡開發的時候,都會使用accept這個方法來阻塞監聽是否有客戶端socket連接上來,如果客戶端連接上來,則會創建一個線程用於服務端與客戶端進行數據的交互操作,而服務端會繼續阻塞等待下一個客戶端socket連接上來。客戶端連接到服務端實際就是一種事件。

1. 創建一個事件event

struct event *event_new(struct event_base *base, evutil_socket_t fd,short what, event_callback_fn cb,void *arg);

參數:

1. base:即event_base

2. fd:文件描述符。

3. what:event關心的各種條件。

4. cb:回調函數。

5. arg:用戶自定義的數據,可以傳遞到回調函數中去。

libevent是基於事件的,也就是說只有在事件到來的這種條件下才會觸發當前的事件。例如:

1. fd文件描述符已准備好可寫或者可讀

2. fd馬上就准備好可寫和可讀。

3. 超時的情況 timeout

4. 信號中斷

5. 用戶觸發的事件

what參數 event各種條件:

// 超時  #define EV_TIMEOUT 0x01  // event 相關的文件描述符可以讀了  #define EV_READ 0x02  // event 相關的文件描述符可以寫了  #define EV_WRITE 0x04  // 被用於信號檢測(詳見下文)  #define EV_SIGNAL 0x08  // 用於指定 event 為 persistent 持久類型。當事件執行完畢后,不會被刪除,繼續保持pending等待狀態;  // 如果是非持久類型,則回調函數執行完畢后,事件就會被刪除,想要重新使用這個事件,就必須將這個事件繼續添加event_add   #define EV_PERSIST 0x10  // 用於指定 event 會被邊緣觸發  #define EV_ET 0x20

2. 釋放event_free

真正的釋放event的內存。

void event_free(struct event *event);

event_del 清理event的內存。這個方法並不是真正意義上的釋放內存。

當函數會將事件轉為 非pending和非activing的狀態。

int event_del(struct event *event);

3. 注冊event

該方法將用於向event_base注冊事件。

參數:ev 為事件指針;tv 為時間指針。當tv = NULL的時候則無超時時間。

函數返回:0表示成功 -1 表示失敗。

int event_add(struct event *ev, const struct timeval *tv);

tv時間結構例子:

struct timeval five_seconds = {5, 0};  
event_add(ev1, &five_seconds);

4.event_assign

event_new每次都會在堆上分配內存。有些場景下並不是每次都需要在堆上分配內存的,這個時候我們就可以用到event_assign方法。

已經初始化或者處於 pending 的 event,首先需要調用 event_del() 后再調用 event_assign()。這個時候就可以重用這個event了。

// 此函數用於初始化 event(包括可以初始化棧上和靜態存儲區中的 event)  // event_assign() 和 event_new() 除了 event 參數之外,使用了一樣的參數  // event 參數用於指定一個未初始化的且需要初始化的 event  // 函數成功返回 0 失敗返回 -1  int event_assign(struct event *event, struct event_base *base,evutil_socket_t fd, short what,void (*callback)(evutil_socket_t, short, void *), void *arg);  
       
// 類似上面的函數,此函數被信號 event 使用  event_assign(event, base, signum, EV_SIGNAL|EV_PERSIST, callback, arg)

5. 信號事件

信號事件也可以對信號進行事件的處理。用法和event_new類似。只不過處理的是信號而已

// base --- event_base  // signum --- 信號,例如 SIGHUP  // callback --- 信號出現時調用的回調函數  // arg --- 用戶自定義數據  evsignal_new(base, signum, cb, arg)  
       
//將信號 event 注冊到 event_base  evsignal_add(ev, tv)   
       
// 清理信號 event  evsignal_del(ev)

6. event細節

1. 每一個事件event都需要通過event_new初始化生成。event_new生成的事件是在堆上分配的內存。

2. 當一個事件通過event_add被注冊到event_base上的時候,這個事件處於pending(等待狀態),當只有有事件進來的時候,event才會被激活active狀態,相關的回調函數就會被調用。

3. persistent 如果event_new中的what參數選擇了EV_PERSIST,則是持久的類型。持久的類型調用玩回調函數后,會繼續轉為pending狀態,就會繼續等待事件進來。大部分情況下會選擇持久類型的事件。

4. 而非持久的類型的事件,調用玩一次之后,就會變成初始化的狀態。這個時候需要調用event_add 繼續將事件注冊到event_base上之后才能使用。

Socket實例

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   
  //讀取客戶端  void do_read(evutil_socket_t fd, short event, void *arg) {  
    //繼續等待接收數據    
    char buf[1024];  //數據傳送的緩沖區      
    int len;    
    if ((len = recv(fd, buf, 1024, 0)) > 0)  {    
        buf[len] = '\0';      
        printf("%s\n", buf);      
        if (send(fd, buf, len, 0) < 0) {    //將接受到的數據寫回客戶端  
            perror("write");      
        }  
    }   
}  
  
  
//回調函數,用於監聽連接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //創建一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,並且將base_ev傳遞到do_read回調函數中去  
    struct event *ev;  
    ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    event_add(ev, NULL);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置為IP通信      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--允許連接到所有本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //創建服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //創建event_base 事件的集合,多線程的話 每個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路復用的模型,linux一般為epoll  
    printf("METHOD:%s\n", x);  
  
    //創建一個事件,類型為持久性EV_PERSIST,回調函數為do_accept(主要用於監聽連接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //注冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷毀event_base  
    event_base_free(base_ev);    
    return 1;  
}

說明:

1. 必須設置socket為非阻塞模式,否則就會阻塞在那邊,影響整個程序運行

evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  vutil_make_socket_nonblocking(server_socketfd); //設置無阻塞

2. 我們首選建立的事件主要用於監聽客戶端的連入。當客戶端有socket連接到服務器端的時候,回調函數do_accept就會去執行;當空閑的時候,這個事件就會是一個pending等待狀態,等待有新的連接進來,新的連接進來了之后又會繼續執行。

struct event *ev;  
ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);

3. 在do_accept事件中我們創建了一個新的事件,這個事件的回調函數是do_read。主要用來循環監聽客戶端上傳的數據。do_read這個方法會一直循環執行,接收到客戶端數據就會進行處理。

//創建一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  //持久類型,並且將base_ev傳遞到do_read回調函數中去  struct event *ev;  
ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
event_add(ev, NULL);

Bufferevent

上面的socket例子估計經過測試估計大家就會有很多疑問:

1. do_read方法作為一個事件會一直被循環

2. 當客戶端連接斷開的時候,do_read方法還是在循環,根本不知道客戶端已經斷開socket的連接。

3. 需要解決各種粘包和拆包(相關粘包拆包文章)問題

如果要解決這個問題,我們可能要做大量的工作來維護這些socket的連接狀態,讀取狀態。而Libevent的Bufferevent幫我們解決了這些問題。

Bufferevent主要是用來管理和調度IO事件;而Evbuffer(下面一節會講到)主要用來緩沖網絡IO數據。

Bufferevent目前支持TCP協議,而不知道UDP協議。我們這邊也只講TCP協議下的Bufferevent的使用。

我們先看下下面的接口(然后結合下面改進socket的例子,自己動手去實驗一下):

1. 創建Bufferevent API

//創建一個Bufferevent  struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);

參數:

base:即event_base

fd:文件描述符。如果是socket的方法,則socket需要設置為非阻塞的模式。

options:行為選項,下面是行為選項內容

1. BEV_OPT_CLOSE_ON_FREE :當 bufferevent 被釋放同時關閉底層(socket 被關閉等) 一般用這個選項

2. BEV_OPT_THREADSAFE :為 bufferevent 自動分配鎖,這樣能夠在多線程環境中安全使用

3. BEV_OPT_DEFER_CALLBACKS : 當設置了此標志,bufferevent 會延遲它的所有回調(參考前面說的延時回調)

4. BEV_OPT_UNLOCK_CALLBACKS : 如果 bufferevent 被設置為線程安全的,用戶提供的回調被調用時 bufferevent 的鎖會被持有。如果設置了此選項,Libevent 將在調用你的回調時釋放 bufferevent 的鎖

2. 釋放Bufferevent

void bufferevent_free(struct bufferevent *bev);

如果設置了延時回調BEV_OPT_DEFER_CALLBACKS,則釋放會在延時回調調用了回調函數之后,才會真正釋放。

3. 設置Bufferevent的回調函數和相關設置

前面我們說過了,使用了Bufferevent之后,Libevent會幫我們托管三種事件:1. 讀取事件  2. 寫入事件  3. 處理事件

我們先看一下回調函數結構:

1. 讀取和寫入的回調函數結構,其中 ctx為通用傳遞的參數

typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);

2. 事件回調,即連接斷開、錯誤處理等回調。其中ctx為通用傳遞的參數。

events參數為事件,用戶可以在回調函數中拿到這個事件來進行事務處理的判斷:

1. BEV_EVENT_READING   在 bufferevent 上進行讀取操作時出現了一個事件

2. BEV_EVENT_WRITING  在 bufferevent 上進行寫入操作時出現了一個事件

3. BEV_EVENT_ERROR  進行 bufferevent 操作時出錯

4. BEV_EVENT_TIMEOUT  在 bufferevent 上出現了超時

5. BEV_EVENT_EOF  在 bufferevent 上遇到了文件結束符,連接斷開

6. BEV_EVENT_CONNECTED 在 bufferevent 上請求連接完成了

typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);

3. 在bufferevent上設置回調函數。

bufev:bufferevent_socket_new創建的bufferevent

readcb:讀取事件的回調函數,沒有則可以為NULL

writecb:寫入事件的回調函數,沒有則可以為NULL

eventcb:事件函數的回調函數,沒有則可以為NULL,一般我們可以在這里面判斷連接斷開等。

cbarg:公用傳輸的傳遞

通過這個函數,我們就可以設置我們需要的一些回調函數信息。

void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb,  
bufferevent_data_cb writecb,bufferevent_event_cb eventcb,void *cbarg);

取回回調函數:

void bufferevent_getcb(struct bufferevent *bufev,bufferevent_data_cb *readcb_ptr,bufferevent_data_cb *writecb_ptr,  
bufferevent_event_cb *eventcb_ptr,void **cbarg_ptr)

4. 設置Bufferevent事件的類型

bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);

5. 水位設置。

水位設置可以這么理解,bufferevent相當於一個水位容器,其中參數:

events:EV_READ 則為設置讀取事件;EV_WRITE 則為寫入事件。EV_READ |  EV_WRITE 為設置兩者的水位。

lowmark:最低水位,默認為0。這個參數非常重要,例如lowmark設置為10,則當bufferevent容器中有10個字符的時候才會去調用readcb這個回調函數。

void bufferevent_setwatermark(struct bufferevent *bufev, short events,size_t lowmark, size_t highmark);

6. 下面可以看一個設置和回調函數例子:

//創建一個bufferevent  struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
//設置讀取方法和error時候的方法  bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev);     
//設置類型  bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
//設置水位  bufferevent_setwatermark(bev, EV_READ, 0, 0);  
//讀取事件回調函數  void read_cb(struct bufferevent *bev, void *arg) {  
#define MAX_LINE     256  char line[MAX_LINE+1];  
int n;  
evutil_socket_t fd = bufferevent_getfd(bev);  
while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
line[n] = '\0';  
printf("fd=%u, read line: %s\n", fd, line);  
bufferevent_write(bev, line, n);  
}  
puts("haha");  
}  
//寫入事件回調函數  void write_cb(struct bufferevent *bev, void *arg) {}  
//事件回調  void error_cb(struct bufferevent *bev, short event, void *arg) {  
evutil_socket_t fd = bufferevent_getfd(bev);  
printf("fd = %u, ", fd);  
if (event & BEV_EVENT_TIMEOUT) {  
printf("Timed out\n");  
} else if (event & BEV_EVENT_EOF) {  
printf("connection closed\n");  
} else if (event & BEV_EVENT_ERROR) {  
printf("some other error\n");  
}  
bufferevent_free(bev);  
}

4. 輸入輸出相關函數

1. 獲取buffer:

// 獲取到輸入 buffer  struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);  
// 獲取到輸出 buffer  struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);

2. 寫入和輸出函數,成功返回0,失敗返回-1:

bufev:bufferevent

data:寫入的字符串數據

size:字符長度

//寫入  int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);  
//輸出  size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);

3. 寫入輸出函數2:

bufev:bufferevent

buf:buffer塊  下面會講到evbuffer的使用

int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);  
int bufferevent_read_buffer(struct bufferevent *bufev,struct evbuffer *buf);

使用Bufferevent后的Socket例子:

上面我們已經介紹完了Bufferevent的相關API,可以看下具體例子。

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   
  void read_cb(struct bufferevent *bev, void *arg) {  
    #define MAX_LINE    256  
    char line[MAX_LINE+1];  
    int n;  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
        line[n] = '\0';  
        printf("fd=%u, read line: %s\n", fd, line);  
        bufferevent_write(bev, line, n);  
    }  
    puts("haha");  
}  
void write_cb(struct bufferevent *bev, void *arg) {}  
void error_cb(struct bufferevent *bev, short event, void *arg) {  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    printf("fd = %u, ", fd);  
    if (event & BEV_EVENT_TIMEOUT) {  
        printf("Timed out\n");  
    } else if (event & BEV_EVENT_EOF) {  
        printf("connection closed\n");  
    } else if (event & BEV_EVENT_ERROR) {  
        printf("some other error\n");  
    }  
    bufferevent_free(bev);  
}  
  
//回調函數,用於監聽連接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //創建一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,並且將base_ev傳遞到do_read回調函數中去  
    //struct event *ev;  
    //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    //event_add(ev, NULL);  
  
    //創建一個bufferevent  
    struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
    //設置讀取方法和error時候的方法  
    bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev);    
    //設置類型  
    bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
    //設置水位,每次接受10個字符  
    bufferevent_setwatermark(bev, EV_READ, 0, 10);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置為IP通信      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--允許連接到所有本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //創建服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //創建event_base 事件的集合,多線程的話 每個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路復用的模型,linux一般為epoll  
    printf("METHOD:%s\n", x);  
  
    //創建一個事件,類型為持久性EV_PERSIST,回調函數為do_accept(主要用於監聽連接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //注冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷毀event_base  
    event_base_free(base_ev);    
    return 1;  
}

Evbuffer IO緩沖

上面講了Bufferevent主要用於事件的管理和調度IO。而Evbuffer給我們提供了非常實用的IO緩存工具。

上一個例子中,雖然解決了斷開連接、讀取事件等IO管理的工作,但是也是存在缺陷的。

1. 因為TCP粘包拆包的原因,我們不知道一次接收到的數據是否是完整的。

2. 我們無法根據客戶端傳遞過來的數據來分析客戶端的請求信息。根據上面的問題,我們可能會考慮設計一個緩沖容器,這個容器主要用來不停得接收客戶端傳遞過來的數據信息,並且要等到信息量接收到一定的程度的時候,我們對客戶端的信息進行分析處理,最后才能知道客戶端的請求內容。如果自己做這個緩沖容器,恐怕是需要花費很多的時間,而Libevent已經給我們設計了Evbuffer,我們可以直接使用Evbuffer緩沖容器來滿足我們的業務需求。

evbuffer結構:

struct evbuffer{  
  // 當前有效緩沖區的內存起始地址  
 u_char *buffer;   
  // 整個分配(realloc)用來緩沖的內存起始地址  
  u_char *orig_buffer;   
  // origin_buffer和buffer之間的字節數  
 size_t misalign;   
  // 整個分配用來緩沖的內存字節數  
 size_t totallen;   
  // 當前有效緩沖區的長度(字節數)  
 size_t off;   
  //回到函數,當緩沖區有變化的時候會被調用  
 void (*cb)(struct evbuffer *, size_t, size_t, void *);  
  //回調函數的參數  
 void *cbarg;   
};

libevent的緩沖是一個連續的內存區域,其處理數據的方式(寫數據和讀數據)更像一個隊列操作方式:從后寫入,從前

讀出。evbuffer分別設置相關指針(一個指標)用於指示讀出位置和寫入位置。其大致結構如圖:

orig_buffer指向由realloc分配的連續內存區域,buffer指向有效數據的內存區域,totallen表示orig_buffer指向的內存

區域的大小,misalign表示buffer相對於orig_buffer的偏移,off表示有效數據的長度。

下面是一些基礎的和最常用的API,詳細的API設計,還是請翻看官方網站:

1.  創建和銷毀Evbuffer

struct evbuffer *evbuffer_new(void);  
void evbuffer_free(struct evbuffer *buf);

2. 線程鎖

int evbuffer_enable_locking(struct evbuffer *buf, void *lock);  
void evbuffer_lock(struct evbuffer *buf);  
void evbuffer_unlock(struct evbuffer *buf);

3. 檢查buffer長度,比較常用

size_t evbuffer_get_length(const struct evbuffer *buf);

返回的是buffer中的字節數。

4. 向buffer中添加數據,常用

int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);

這個函數添加data處的datalen字節到buf的末尾,成功時返回0,失敗時返回-1。

int evbuffer_expand(struct evbuffer *buf, size_t datlen);

這個函數修改緩沖區的最后一塊,或者添加一個新的塊,使得緩沖區足以容納datlen字節,而不需要更多的內存分配。

int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t size);  
int evbuffer_prepend_buffer(struct evbuffer *dst, struct evbuffer* src);

除了將數據移動到目標緩沖區前面之外,這兩個函數的行為分別與evbuffer_add()和evbuffer_add_buffer()相同。

使用這些函數時要當心,永遠不要對與bufferevent共享的evbuffer使用。這些函數是2.0.1-alpha版本新添加的。

5. 刪除和移動buffer中的內容

int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);

evbuffer_remove()函數從buf前面復制和移除datlen字節到data處的內存中。如果可用字節少於datlen,函數復制所有字節。失敗時返回-1,否則返回復制了的字節數。

int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);  
int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst,  
    size_t datlen);

evbuffer_add_buffer()將src中的所有數據移動到dst末尾,成功時返回0,失敗時返回-1。

evbuffer_remove_buffer()函數從src中移動datlen字節到dst末尾,盡量少進行復制。如果字節數小於datlen,所有字節被移動。函數返回移動的字節數。

evbuffer_add_buffer()在0.8版本引入;evbuffer_remove_buffer()是2.0.1-alpha版本新增加的。

6. 搜索buffer中的內容,常用

struct evbuffer_ptr {  
ev_ssize_t pos;  
struct {  
  } _internal;  
};  
struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer,  
const char *what, size_t len, const struct evbuffer_ptr *start);  
struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer,  
const char *what, size_t len, const struct evbuffer_ptr *start,  
const struct evbuffer_ptr *end);  
struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer,  
struct evbuffer_ptr *start, size_t *eol_len_out,  
enum evbuffer_eol_style eol_style);

結構evbuffer_ptr中的pos為偏移量,如果為-1則沒查詢到,大於-1,則搜索到了匹配的位置。

1. evbuffer_search()函數在緩沖區中查找含有len個字符的字符串what。函數返回包含字符串位置,或者在沒有找到字符串時包含-1的evbuffer_ptr結構體。如果提供了start參數,則從指定的位置開始搜索;否則,從開始處進行搜索。

2. evbuffer_search_range()函數和evbuffer_search行為相同,只是它只考慮在end之前出現的what。

3. evbuffer_search_eol()函數像evbuffer_readln()一樣檢測行結束,但是不復制行,而是返回指向行結束符的evbuffer_ptr。如果eol_len_out非空,則它被設置為EOL字符串長度。

7. 面向行的讀取

很多互聯網協議都是基於行的。evbuffer_readln()函數從evbuffer前面取出一行,用一個新分配的空字符結束的字符串返回這一行。如果n_read_out不是NULL,則它被設置為返回的字符串的字節數。如果沒有整行供讀取,函數返回空。返回的字符串不包括行結束符。

enum evbuffer_eol_style {  
EVBUFFER_EOL_ANY,  
EVBUFFER_EOL_CRLF,  
EVBUFFER_EOL_CRLF_STRICT,  
EVBUFFER_EOL_LF,  
EVBUFFER_EOL_NUL  
};  
char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,  
    enum evbuffer_eol_style eol_style);

1. EVBUFFER_EOL_LF:行尾是單個換行符(也就是\n,ASCII值是0x0A)

2. EVBUFFER_EOL_CRLF_STRICT:行尾是一個回車符,后隨一個換行符(也就是\r\n,ASCII值是0x0D 0x0A)

3. EVBUFFER_EOL_CRLF:行尾是一個可選的回車,后隨一個換行符(也就是說,可以是\r\n或者\n)。這種格式對於解析基於文本的互聯網協議很有用,因為標准通常要求\r\n的行結束符,而不遵循標准的客戶端有時候只使用\n。

4. EVBUFFER_EOL_ANY:行尾是任意數量、任意次序的回車和換行符。這種格式不是特別有用。它的存在主要是為了向后兼容。

例子:

//readline  char * rline;  
size_t len;  
rline = evbuffer_readln(buf, &len, EVBUFFER_EOL_CRLF);  
puts("Hello");  
if (rline != NULL) {  
    bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果  }

8. 復制數據

ev_ssize_t evbuffer_copyout(struct evbuffer *buf, void *data, size_t datlen);  
ev_ssize_t evbuffer_copyout_from(struct evbuffer *buf,  
     const struct evbuffer_ptr *pos,  
     void *data_out, size_t datlen);

evbuffer_copyout()的行為與evbuffer_remove()相同,但是它不從緩沖區移除任何數據。也就是說,它從buf前面復制datlen字節到data處的內存中。如果可用字節少於datlen,函數會復制所有字節。失敗時返回-1,否則返回復制的字節數。

如果從緩沖區復制數據太慢,可以使用evbuffer_peek()。

使用Evbuffer優化后的例子

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   #include   #define MAX_LINE    256  
  void read_cb(struct bufferevent *bev, void *arg) {  
    struct evbuffer *buf = (struct evbuffer *)arg;  
    char line[MAX_LINE+1];  
    int n;  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
        line[n] = '\0';  
          
        //將讀取到的內容放進緩沖區  
        evbuffer_add(buf, line, n);  
  
        //搜索匹配緩沖區中是否有==,==號來分隔每次客戶端的請求  
        const char *x = "==";  
        struct evbuffer_ptr ptr = evbuffer_search(buf, x, strlen(x), 0);     
        if (ptr.pos != -1) {  
            bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果  
        }  
    }  
}  
void write_cb(struct bufferevent *bev, void *arg) {}  
void error_cb(struct bufferevent *bev, short event, void *arg) {  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    printf("fd = %u, ", fd);  
    if (event & BEV_EVENT_TIMEOUT) {  
        printf("Timed out\n");  
    } else if (event & BEV_EVENT_EOF) {  
        printf("connection closed\n");  
    } else if (event & BEV_EVENT_ERROR) {  
        printf("some other error\n");  
    }  
    //清空緩沖區  
    struct evbuffer *buf = (struct evbuffer *)arg;  
    evbuffer_free(buf);  
    bufferevent_free(bev);  
}  
  
//回調函數,用於監聽連接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //創建一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,並且將base_ev傳遞到do_read回調函數中去  
    //struct event *ev;  
    //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    //event_add(ev, NULL);  
  
    //創建一個evbuffer,用來緩沖客戶端傳遞過來的數據  
    struct evbuffer *buf = evbuffer_new();  
    //創建一個bufferevent  
    struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
    //設置讀取方法和error時候的方法,將buf緩沖區當參數傳遞  
    bufferevent_setcb(bev, read_cb, NULL, error_cb, buf);    
    //設置類型  
    bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
    //設置水位  
    bufferevent_setwatermark(bev, EV_READ, 0, 0);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置為IP通信      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--允許連接到所有本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //創建服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //創建event_base 事件的集合,多線程的話 每個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路復用的模型,linux一般為epoll  
    printf("METHOD:%s\n", x);  
  
    //創建一個事件,類型為持久性EV_PERSIST,回調函數為do_accept(主要用於監聽連接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //注冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷毀event_base  
    event_base_free(base_ev);    
    return 1;  
}

Util工具

Libevent還提供一些工具方法。這些方法可以簡化我們的開發。

1. 時間處理函數

//創建一個事件,類型為持久性EV_PERSIST,回調函數為do_accept(主要用於監聽連接進來的客戶端)  //將base_ev傳遞到do_accept中的arg參數  // 用於加或者減前兩個參數,結果被保存在第三個參數中  #define evutil_timeradd(tvp, uvp, vvp)   #define evutil_timersub(tvp, uvp, vvp)   // 清除 timeval 將其值設置為 0  #define evutil_timerclear(tvp)   // 判斷 timeval 是否為 0,如果是 0 返回 false,否則返回 true  #define evutil_timerisset(tvp)   // 比較兩個 timeval  // 使用的時候這樣用:  // evutil_timercmp(t1, t2, <=) 含義為判斷 t1 <= t2 是否成立  // cmp 為所有的 C 關系操作符  #define evutil_timercmp(tvp, uvp, cmp)  // 獲取當前時間並保存到 tv  // tz 目前無用  int evutil_gettimeofday(struct timeval *tv, struct timezone *tz);

2. Socket API

// 用於關閉一個 socket  int evutil_closesocket(evutil_socket_t s);  
#define EVUTIL_CLOSESOCKET(s) evutil_closesocket(s)  // 返回當前線程的最后一次 socket 操作的錯誤碼  #define EVUTIL_SOCKET_ERROR()  // 改變當前 socket 的錯誤碼  #define EVUTIL_SET_SOCKET_ERROR(errcode)  // 返回特定的 sock 的錯誤碼  #define evutil_socket_geterror(sock)  // 通過 socket 錯誤碼獲取到一個字符串描述  #define evutil_socket_error_to_string(errcode)  // 設置 sock 為非阻塞的 socket  int evutil_make_socket_nonblocking(evutil_socket_t sock);  
// 設置 sock 的地址可重用  int evutil_make_listen_socket_reuseable(evutil_socket_t sock);

3. 字符串

// 它們對應於標准的 snprintf 和 vsnprintf  int evutil_snprintf(char *buf, size_t buflen, const char *format, ...);  
int evutil_vsnprintf(char *buf, size_t buflen, const char *format, va_list ap);

4. 安全的隨機函數

// 此函數將使用隨機的數據填充 n 個字節的 buf  void evutil_secure_rng_get_bytes(void *buf, size_t n);

附:一個客戶端例子

#include     #include         #include         #include        #include     #include     #include     #include         int main() {      
int client_fd; //定義一個客戶端的SOCKET      struct sockaddr_in server_addr; //服務器端        memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零    server_addr.sin_family=AF_INET; //設置為IP通信          server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");//服務器IP地址        server_addr.sin_port = htons(8001); //服務器端口號          client_fd = socket(PF_INET, SOCK_STREAM, 0);      
if (client_fd < 1) {  
puts("client socket error");       
return 0;     
}       
     int ret = connect(client_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));   
if (ret < 0) {        
puts("client connect error!");     
return 0;     
}       
char buf[1024];       
int len = recv(client_fd, buf, 1024, 0); //等待接收服務器端的數據     buf[len] = '\0';    
puts(buf);      
char *x = "Hello World,saodsadoosadosa==sadsad==";    
send(client_fd, x, strlen(x), 0); //發送數據     memset(buf, 0, 1024);  
int len2 = recv(client_fd, buf, 1024, 0); //繼續接收服務端返回的數據   buf[len2] = '\0';   
puts(buf);      
shutdown(client_fd,2); //關閉socket    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM