處理大並發之五 使用libevent利器bufferevent


轉自:http://blog.csdn.net/feitianxuxue/article/details/9386843

 

處理大並發之五 使用libevent利器bufferevent

         首先來翻譯一段文章

         你可能注意到隨着我們代碼變得越來越高效,程序也變得更加復雜。當我們產生一個進程的時候,我們沒有必要為每一個鏈接管理一個buffer,我們只需要每個處理獨立棧分配緩沖區就可以了。在讀和寫的時候,我們不必明確的跟蹤每一個socket,這在我們的代碼里是一個暗示,我們沒有必要定義一個結構體去跟蹤每一個操作什么時候完成,我們只需要使用循環棧變量就可以了。

         此外,如果你在windows網絡編程方面有着豐富的經驗,當你在使用上一篇博客中的例子時,你可能認識到libevent可能達不到最理想的性能。在windows上,你做的快速異步IO不是用的select,它使用的IOCP API。和其他的快速網絡API不同,當你的程序執行完成,sock准備完成,IOCP不會通知你的程序,取而代之的是,程序告訴windows網絡棧開啟一個網絡操作,並且當操作執行完成時,IOCP會告訴程序。

         幸運的是,libevent2 的bufferevents接口解決了上面的這些沖突,它使得程序更加容易寫,並且為windows和unix提供了有效的接口。

分析:

libevent的bufferevent在event的基礎上自己維護了一個buffer,這樣的話,就不需要再自己管理一個buffer了,上一篇博客是自己維護一個buffer,維護過程復雜,且過程難以理解,既然libevent自己提供了bufferevent這個神器,且有API,何必自己維護呢?

先看看struct bufferevent這個結構體

 

[cpp]  view plain  copy
 
  1. struct bufferevent {  
  2.                struct event_base *ev_base;  
  3.         const struct bufferevent_ops *be_ops;  
  4.         struct event ev_read;  
  5.         struct event ev_write;  
  6.         struct evbuffer *input;  
  7.         struct evbuffer *output;  
  8.         ……  
  9.         bufferevent_data_cb readcb;  
  10.         bufferevent_data_cb writecb;  
  11.         bufferevent_event_cb errorcb;  
  12.         ……  
  13. }  

 

可以看出struct bufferevent內置了兩個event(讀/寫)和對應的緩沖區。當有數據被讀入(input)的時候,readcb被調用,當output被輸出完成的時候,writecb被調用,當網絡I/O出現錯誤,如鏈接中斷,超時或其他錯誤時,errorcb被調用。

使用bufferevent的過程:

1. 設置sock為非阻塞的

[cpp]  view plain  copy
 
  1. eg:  evutil_make_socket_nonblocking(fd);  

 

2. 使用bufferevent_socket_new創建一個structbufferevent *bev,關聯該sockfd,托管給event_base

函數原型為:

 

[cpp]  view plain  copy
 
  1. struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,  int options)  
  2. eg:  struct bufferevent *bev;  
  3. bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  

 

3. 設置讀寫對應的回調函數

函數原型為:

 

[cpp]  view plain  copy
 
  1. void bufferevent_setcb(struct bufferevent *bufev,  
  2.     bufferevent_data_cb readcb, bufferevent_data_cb writecb,  
  3.     bufferevent_event_cb eventcb, void *cbarg)  
  4. eg.  bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);  

 

4. 啟用讀寫事件,其實是調用了event_add將相應讀寫事件加入事件監聽隊列poll。正如文檔所說,如果相應事件不置為true,bufferevent是不會讀寫數據的

函數原型:

 

[cpp]  view plain  copy
 
  1. int bufferevent_enable(struct bufferevent *bufev, short event)  
  2. eg.  bufferevent_enable(bev, EV_READ|EV_WRITE);  

 

5. 進入bufferevent_setcb回調函數:

在readcb里面從input中讀取數據,處理完畢后填充到output中;

writecb對於服務端程序,只需要readcb就可以了,可以置為NULL;

errorcb用於處理一些錯誤信息。

 

針對這些使用過程進入源碼進行分析:

1. bufferevent_socket_new

(1)在bufferevent_init_common中調用evbuffer_new()初始化input和output

(2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。

(3)在evbuffer_add_cb中給output添加了一個callback bufferevent_socket_outbuf_cb

2. bufferevent_setcb

該函數的作用主要是賦值,把該函數后面的參數,賦值給第一個參數struct bufferevent *bufev定義的變量

3. bufferevent_enable

調用event_add將讀寫事件加入到事件監聽隊列中。

 

對bufferevent常用的幾個函數進行分析:

 

[cpp]  view plain  copy
 
  1. char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);  

 

 

含義:Read a single line from an evbuffer.

返回值:讀到的一行內容

 

[cpp]  view plain  copy
 
  1. int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);  

 

 

含義:將數據添加到evbuffer的結尾

返回值:成功返回0,失敗返回-1

 

[cpp]  view plain  copy
 
  1. int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);  

 

 

含義:從evbuffer讀取數據到data

返回值:成功返回0,失敗返回-1

 

[cpp]  view plain  copy
 
  1. size_t evbuffer_get_length(const structevbuffer *buf);  

 

 

含義:返回evbuffer中存儲的字節長度

 

暫時先分析到這里,下面是代碼,客戶端發送消息:HTTP/1.0, Client 0 send Message:

Request: Hello Server! over,服務端一條消息收完成后,會回復:Response ok! Hello Client!

服務端從bufferevent中取出消息是按行取的。代碼可能有不完善的地方,由於才疏學淺,研究時間短(3天),希望高手提出寶貴意見。

libevent_eventbuffer_server.c

[cpp]  view plain  copy
 
  1. #include <netinet/in.h>  
  2. #include <sys/socket.h>  
  3. #include <fcntl.h>  
  4.   
  5. #include <event2/event.h>  
  6. #include <event2/buffer.h>  
  7. #include <event2/bufferevent.h>  
  8.   
  9. #include <assert.h>  
  10. #include <unistd.h>  
  11. #include <string.h>  
  12. #include <stdlib.h>  
  13. #include <stdio.h>  
  14. #include <errno.h>  
  15.   
  16. void do_read(evutil_socket_t fd, short events, void *arg);  
  17.   
  18. //struct bufferevent內建了兩個event(read/write)和對應的緩沖區(struct evbuffer *input, *output),並提供相應的函數用來操作>  
  19. 緩沖區(或者直接操作bufferevent)  
  20. //接收到數據后,判斷是不一樣一條消息的結束,結束標志為"over"字符串  
  21. void readcb(struct bufferevent *bev, void *ctx)  
  22. {  
  23.     printf("called readcb!\n");  
  24.     struct evbuffer *input, *output;  
  25.     char *request_line;  
  26.     size_t len;  
  27.     input = bufferevent_get_input(bev);//其實就是取出bufferevent中的input  
  28.     output = bufferevent_get_output(bev);//其實就是取出bufferevent中的output  
  29.   
  30.     size_t input_len = evbuffer_get_length(input);  
  31.     printf("input_len: %d\n", input_len);  
  32.     size_t output_len = evbuffer_get_length(output);  
  33.     printf("output_len: %d\n", output_len);  
  34.   
  35.     while(1)  
  36.     {  
  37.         request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//從evbuffer前面取出一行,用一個新分配的空字符結束  
  38. 的字符串返回這一行,EVBUFFER_EOL_CRLF表示行尾是一個可選的回車,后隨一個換行符  
  39.         if(NULL == request_line)  
  40.         {  
  41.             printf("The first line has not arrived yet.\n");  
  42.             free(request_line);//之所以要進行free是因為 line = mm_malloc(n_to_copy+1)),在這里進行了malloc  
  43.             break;  
  44.         }  
  45.         else  
  46. <span style="white-space: pre;">    </span>{  
  47.             printf("Get one line date: %s\n", request_line);  
  48.             if(strstr(request_line, "over") != NULL)//用於判斷是不是一條消息的結束  
  49.             {  
  50.                 char *response = "Response ok! Hello Client!\r\n";  
  51.                 evbuffer_add(output, response, strlen(response));//Adds data to an event buffer  
  52.                 printf("服務端接收一條數據完成,回復客戶端一條消息: %s\n", response);  
  53.                 free(request_line);  
  54.                 break;  
  55.             }  
  56.         }  
  57.         free(request_line);  
  58.     }  
  59.   
  60.     size_t input_len1 = evbuffer_get_length(input);  
  61.     printf("input_len1: %d\n", input_len1);  
  62.     size_t output_len1 = evbuffer_get_length(output);  
  63.     printf("output_len1: %d\n\n", output_len1);  
  64. }  
  65.   
  66. void errorcb(struct bufferevent *bev, short error, void *ctx)  
  67. {  
  68.     if (error & BEV_EVENT_EOF)  
  69.     {  
  70.         /* connection has been closed, do any clean up here */  
  71.         printf("connection closed\n");  
  72.     }  
  73.     else if (error & BEV_EVENT_ERROR)  
  74.     {  
  75.         /* check errno to see what error occurred */  
  76.         printf("some other error\n");  
  77.     }  
  78.     else if (error & BEV_EVENT_TIMEOUT)  
  79.     {  
  80.         /* must be a timeout event handle, handle it */  
  81.         printf("Timed out\n");  
  82.     }  
  83.     bufferevent_free(bev);  
  84. }  
  85.   
  86. void do_accept(evutil_socket_t listener, short event, void *arg)  
  87. {  
  88.     struct event_base *base = arg;  
  89.     struct sockaddr_storage ss;  
  90.     socklen_t slen = sizeof(ss);  
  91.     int fd = accept(listener, (struct sockaddr*)&ss, &slen);  
  92.     if (fd < 0)  
  93.     {  
  94.         perror("accept");  
  95.     }  
  96.     else if (fd > FD_SETSIZE)  
  97.     {  
  98.         close(fd);  
  99.     }  
  100.     else  
  101.     {  
  102.         struct bufferevent *bev;  
  103.         evutil_make_socket_nonblocking(fd);  
  104.   
  105.         //使用bufferevent_socket_new創建一個struct bufferevent *bev,關聯該sockfd,托管給event_base  
  106.         ////BEV_OPT_CLOSE_ON_FREE表示釋放bufferevent時關閉底層傳輸端口。這將關閉底層套接字,釋放底層bufferevent等。  
  107.         bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  
  108.   
  109.         //設置讀寫對應的回調函數  
  110.         bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);  
  111. //      bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);  
  112.   
  113.         //啟用讀寫事件,其實是調用了event_add將相應讀寫事件加入事件監聽隊列poll。正如文檔所說,如果相應事件不置為true,buf  
  114. ferevent是不會讀寫數據的  
  115.         bufferevent_enable(bev, EV_READ|EV_WRITE);  
  116.     }  
  117. }  
  118.   
  119. void run(void)  
  120. {  
  121.     evutil_socket_t listener;  
  122.     struct sockaddr_in sin;  
  123.     struct event_base *base;  
  124.     struct event *listener_event;  
  125.   
  126.     base = event_base_new();  
  127.     if (!base)  
  128.         return; /*XXXerr*/  
  129.   
  130.     sin.sin_family = AF_INET;  
  131.     sin.sin_addr.s_addr = 0;  
  132.     sin.sin_port = htons(8000);  
  133.   
  134.     listener = socket(AF_INET, SOCK_STREAM, 0);  
  135.     evutil_make_socket_nonblocking(listener);  
  136.   
  137. #ifndef WIN32  
  138.     {  
  139.         int one = 1;  
  140.         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));  
  141.     }  
  142. #endif  
  143.   
  144.     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)  
  145.     {  
  146.         perror("bind");  
  147.         return;  
  148.     }  
  149. if (listen(listener, 16)<0)  
  150.     {  
  151.         perror("listen");  
  152.         return;  
  153.     }  
  154.   
  155.     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);  
  156.     /*XXX check it */  
  157.     event_add(listener_event, NULL);  
  158.   
  159.     event_base_dispatch(base);  
  160. }  
  161.   
  162. int main(int argc, char **argv)  
  163. {  
  164.     setvbuf(stdout, NULL, _IONBF, 0);  
  165.   
  166.     run();  
  167.     return 0;  
  168. }  

編譯:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent

運行:

服務端:

 

客戶端:

 

今晚博客暫時寫完了,時間比較倉促,錯誤估計不會少,關於bufferevent很多API都還不是很熟悉,還有libevent 添加事件event_add是非線程安全的,如果使用多線程,需要保證event_add不能出現在多個線程中,以后有時間慢慢研究。

體會:關於源碼,還需要好好研究,其實今晚挺郁悶的,弄了半天,沒有什么進展,現在自己還有很多疑問,主要是自己太急了,不過3天時間做到基本了解,自己還算滿意,下一步有時間多研究下吧。晚安,北京

如是轉載,請指明原出處:http://blog.csdn.net/feitianxuxue,謝謝合作!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM