bufferevent簡單介紹
一般通過libevent進行網絡編程,都是將一個socket的fd與一個event進行綁定,並自行維護一個buffer用於存儲從socket上接收的數據,同時可能也用於待發送數據的緩存。然后通過可讀可寫事件從socket上收取數據寫入緩存並進行相應處理,或者將緩存中的數據通過socket發送。
libevent為這種帶緩存的IO模式提供了一種通用的機制,那就是bufferevent。一個bufferevent包含了一個底層傳輸的fd(通常為socket),一個輸入buffer和一個輸出buffer,並且bufferevent已經幫我們完成了從socket上接收數據寫入輸入buffer,同時從輸出buffer中取出數據通過socket發送,當輸入輸出緩存中的數據達到一定量時調用我們設置的回調函數。這樣使得我們可以更加關注數據的處理。
bufferevent的簡單使用
#include "stdio.h" #include "stdlib.h" #include "string.h" #include "arpa/inet.h" #include "event.h" //讀回調處理 void read_callback(struct bufferevent * pBufEv, void * pArg) { //獲取輸入緩存 struct evbuffer * pInput = bufferevent_get_input(pBufEv); //獲取輸入緩存數據的長度 int nLen = evbuffer_get_length(pInput); //獲取數據的地址 const char * pBody = (const char *)evbuffer_pullup(pInput, nLen); //進行數據處理 //寫到輸出緩存,由bufferevent的可寫事件讀取並通過fd發送 //bufferevent_write(pBufEv, pResponse, nResLen); return ; } //寫回調處理 void write_callback( struct bufferevent * pBufEv, void * pArg ) { return ; } //事件回調處理 void event_callback(struct bufferevent * pBufEv, short sEvent, void * pArg) { //成功連接通知事件 if(BEV_EVENT_CONNECTED == sEvent) { bufferevent_enable(pBufEv, EV_READ); } return ; } int main( void ) { struct event_base * pEventBase = NULL; struct bufferevent * pBufEv = NULL; //創建事件驅動句柄 pEventBase = event_base_new(); //創建socket類型的bufferevent pBufEv = bufferevent_socket_new(pEventBase, -1, 0); //設置回調函數, 及回調函數的參數 bufferevent_setcb(pBufEv, read_callback, write_callback, event_callback, NULL); struct sockaddr_in tSockAddr; memset(&tSockAddr, 0, sizeof(tSockAddr)); tSockAddr.sin_family = AF_INET; tSockAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); tSockAddr.sin_port = htons(50000); //連接服務器 if( bufferevent_socket_connect(pBufEv, (struct sockaddr*)&tSockAddr, sizeof(tSockAddr)) < 0) { return 0; } //開始事件循環 event_base_dispatch(pEventBase); //事件循環結束 資源清理 bufferevent_free(pBufEv); event_base_free(pEventBase); return 0; }
使用細節
tcp連接斷開處理
對於客戶端來說,如果僅有bufferevent這么一個事件,那么當tcp連接斷開時,調用回調函數后會退出事件循環(event_base_loop)。因為bufferevent感知tcp連接斷開后會刪除相關的事件,這個時候事件循環中沒有任何事件,於是退出循環。
在官網的教程中看到可以對event_base設置選項EVLOOP_NO_EXIT_ON_EMPTY保證沒有等待事件時也不會退出事件循環,但是在最新穩定版本中(libevent-2.0.21-stable)沒有該選項設置,在2.1.x-alph中才有該選項。當然我們可以采用增加定時器事件的方式來處理斷鏈后不退出事件循環,甚至進一步實現斷鏈重連的功能。這個定時器事件可以是斷鏈后在回調函數中動態增加,也可以是一開始就增加一個持久的定時器事件,檢測連接狀態並觸發向服務器重連。例如:
int g_nState; //定時器事件回調函數 void handle_timeout(int nSock, short sWhat, void * pArg) { if( 0 == g_nState ) { struct bufferevent * pBufferEvent = (struct bufferevent *)pArg; struct sockaddr_in tSockAddr; memset(&tSockAddr, 0, sizeof(tSockAddr)); tSockAddr.sin_family = AF_INET; tSockAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); tSockAddr.sin_port = htons(50000); bufferevent_socket_connect(pBufferEvent, (struct sockaddr*)&tSockAddr, sizeof(tSockAddr)); } } void event_callback(struct bufferevent * pBufEv, short sEvent, void * pArg) { //成功連接 狀態變更 if( BEV_EVENT_CONNECTED == sEvent ) { bufferevent_enable( pBufEv, EV_READ ); g_nState = 1; } //出現錯誤 if( 0 != (sEvent & (BEV_EVENT_ERROR)) ) { //關閉fd 並更改狀態 int fd = bufferevent_getfd(pBufEv); if( fd > 0 ) { evutil_closesocket(fd); } bufferevent_setfd(pBufEv, -1); g_nState = 0; } } int main( void ) { ... //增加PERSIST的定時器事件 struct event eTimeout; struct timeval tTimeout = {10, 0}; //回調函數的參數為bufferevent event_assign(&eTimeout, pEventBase, -1, EV_PERSIST, handle_timeout, pBufferEvent); evtimer_add(&eTimeout, &tTimeout); ... }
這里需要注意的是:重連之前最好先關閉bufferevent中的fd,或者直接對bufferevent進行釋放並重新創建一個新的bufferevent。如果是直接釋放bufferevent再次新建,那么在創建bufferevent時記得設置BEV_OPT_CLOSE_ON_FREE參數,這樣在釋放bufferevent時會對fd進行關閉,從而不會出現fd泄漏。(不設置該參數,通過bufferevent_setfd傳入fd,釋放bufferevent后自行關閉fd也是一種處理方式)
心跳處理
通常,客戶端與服務端之間都有心跳檢測,以檢測tcp鏈路是否正常。那么通過bufferevent開發的客戶端或者服務端完成心跳檢測功能可以有這么幾種實現方式:
(1)增加定時器事件:前面提到了可以增加持久的定時器事件來檢測狀態並觸發斷鏈重連,當然我們也可以利用這個定時器事件來完成定時發送心跳包的功能。個人覺得這種方式不太好的一點是:需要有一種機制讓定時器事件的回調處理函數獲取bufferevent的句柄,例如作為定時器事件回調函數的參數,這樣才能將心跳包的數據寫入該bufferevent並通過fd發送,這兩種事件攪合在一起會有些混亂。
(2)利用bufferevent的超時機制:bufferevent可以為讀寫設置超時時間,我們可以利用讀超時來完成定時發送心跳包的功能。在事件的回調處理函數中處理BEV_EVENT_TIMEOUT|BEV_EVENT_READING事件,然后將心跳包寫入輸出緩存。這種方式有一點需要注意:bufferevent觸發超時事件后會將對應的可讀/可寫事件刪除,我們在處理完超時事件后需要重新注冊一下對應的事件(bufferevent_enable)。
void event_callback(struct bufferevent * pBufEv, short sEvent, void * pArg) { if( BEV_EVENT_CONNECTED == sEvent ) { bufferevent_enable( pBufEv, EV_READ ); //設置讀超時時間 10s struct timeval tTimeout = {10, 0}; bufferevent_set_timeouts( pBufEv, &tTimeout, NULL); } if(0 != (sEvent & (BEV_EVENT_TIMEOUT|BEV_EVENT_READING)) ) { //發送心跳包 ... //重新注冊可讀事件 bufferevent_enable(pBufEv, EV_READ); } ... return; }
高低水位的使用
默認情況下,bufferevent從fd上接收到任何數據並寫入輸入緩存區時,就會回調交給我們進行處理。而我們的客戶端和服務端通信時都會遵循一定的協議(數據包格式),比如固定長度的包頭,然后從包頭中獲取包體的數據長度,等包體的數據都接收完成后再進行實際處理。在這種情況下,我們可以設置讀的低水位減少回調的次數。bufferevent會等輸入緩存區中的數據長度超過最低水位時,才回調我們的函數進行業務處理。
實現細節
整體概況
從bufferevent的結構體中我們可以看到,bufferevent中包含了讀,寫兩個事件,這兩個事件的回調函數分別為bufferevent_readcb和bufferevent_writecb。bufferevent同時還包含了輸入輸出兩個緩存區,以及讀、寫、事件回調函數的指針,高低水位的設置,事件驅動的句柄等。當觸發可讀可寫事件后,回調bufferevent_readcb或bufferevent_writecb,在這里完成從fd上的數據收發,然后根據收發結果及高低水位的設置等來進行不同的回調處理。
evbuffer與bufferevent
bufferevent采用evbuffer作為輸入輸出緩存。evbuffer像是一個字節隊列,在隊列的末尾寫入數據,在隊列的頭部讀取數據。evbuffer具體實現則是一個鏈表,鏈表中的每個節點都是一塊連續的內存塊,往evbuffer寫數據時(調用evbuffer_add/evbuffer_add_printf等函數),evbuffer內部動態創建鏈表節點,並緊湊的寫入數據(一個節點寫滿后,再寫另外一個節點);從evbuffer中刪除數據時(調用evbuffer_remove/evbuffer_drain),從鏈表頭部節點開始讀取,當一個節點的數據被全部讀取后刪除該節點,如果未讀取完,則用標示記錄數據已讀取(刪除)的位置。對於這種頭部有數據被標示為讀取(刪除)的節點,再次寫入數據時,可能會進行調整,即將數據部分整體往前拷貝移動,然后再繼續寫入數據。
defer callback
在創建bufferevent時,可以設置不同的選項,其中一個是BEV_OPT_DEFER_CALLBACKS,這意味着延遲進行回調。所謂延遲回調,是將該事件延遲等到本次事件循環中所有active事件都處理完成后再進行該事件的處理。在event_base中,有一個active事件隊列,一個defer事件隊列,事件循環時,遍歷active事件隊列並進行相應的處理,當發現某個事件是需要延遲處理時,將該事件放到defer事件隊列中,繼續后續active事件的處理,等active事件隊列中的事件都處理完成后,再處理defer隊列中的事件。
對於bufferevent來說,當fd上有數據可讀時,其實是先進行了一次回調(bufferevent_readcb),這個回調函數中判斷是否需要延遲處理,如果不需要延遲處理則直接回調我們設置的回調函數,如果需要延遲處理,則等libevent處理完其他的active事件后再次調用bufferevent的回調函數,然后在這個回調函數中再調用我們設置的回調函數。
==============================
bufferevent總結到此,如有不正確之處,歡迎吐槽交流。
最后,學習bufferevent時在mailing list中看到這么一句話,個人非常喜歡,與大家分享。
I just don't want to do mistakes/bad design decisions without fully understanding how it works。