最近開始重構定制公司的網站后台服務器,開始關注libevent 以及livev 相關 ,也歡迎相關的同學一起討論。這兩者采用相同的架構和設計思想,很多原理和代碼都可以相互參考和對比理解。
libev 和libevent 比較適合分布式並發系統,關於其和傳統方式的比較,可以參看以下內容:
原文地址: http://www.ibm.com/developerworks/cn/aix/library/au-libev/index.html
許多服務器部署(尤其是 web 服務器部署)面對的最大問題之一是必須能夠處理大量連接。無論是通過構建基於雲的服務來處理網絡通信流,還是把應用程序分布在 IBM Amazon EC 實例上,還是為網站提供高性能組件,都需要能夠處理大量並發連接。
一個好例子是,web 應用程序最近越來越動態了,尤其是使用 AJAX 技術的應用程序。如果要部署的系統允許數千客戶端直接在網頁中更新信息,比如提供事件或問題實時監視的系統,那么提供信息的速度就非常重要了。在網格或雲環境中,可能有來自數千客戶端的持久連接同時打開着,必須能夠處理每個客戶端的請求並做出響應。
在討論 libevent 和 libev 如何處理多個網絡連接之前,我們先簡要回顧一下處理這類連接的傳統解決方案。
處理多個連接有許多不同的傳統方法,但是在處理大量連接時它們往往會產生問題,因為它們使用的內存或 CPU 太多,或者達到了某個操作系統限制。
使用的主要方法如下:
- 循環:早期系統使用簡單的循環選擇解決方案,即循環遍歷打開的網絡連接的列表,判斷是否有要讀取的數據。這種方法既緩慢(尤其是隨着連接數量增加越來越慢),又低效(因為在處理當前連接時其他連接可能正在發送請求並等待響應)。在系統循環遍歷每個連接時,其他連接不得不等待。如果有 100 個連接,其中只有一個有數據,那么仍然必須處理其他 99 個連接,才能輪到真正需要處理的連接。
- poll、epoll 和變體:這是對循環方法的改進,它用一個結構保存要監視的每個連接的數組,當在網絡套接字上發現數據時,通過回調機制調用處理函數。poll 的問題是這個結構會非常大,在列表中添加新的網絡連接時,修改結構會增加負載並影響性能。
- 選擇:
select()
函數調用使用一個靜態結構,它事先被硬編碼為相當小的數量(1024 個連接),因此不適用於非常大的部署。
在各種平台上還有其他實現(比如 Solaris 上的 /dev/poll 或 FreeBSD/NetBSD 上的 kqueue),它們在各自的 OS 上性能可能更好,但是無法移植,也不一定能夠解決處理請求的高層問題。
上面的所有解決方案都用簡單的循環等待並處理請求,然后把請求分派給另一個函數以處理實際的網絡交互。關鍵在於循環和網絡套接字需要大量管理代碼,這樣才能監聽、更新和控制不同的連接和接口。
處理許多連接的另一種方法是,利用現代內核中的多線程支持監聽和處理連接,為每個連接啟動一個新線程。這把責任直接交給操作系統,但是會在 RAM 和 CPU 方面增加相當大的開銷,因為每個線程都需要自己的執行空間。另外,如果每個線程都忙於處理網絡連接,線程之間的上下文切換會很頻繁。最后,許多內核並不適於處理如此大量的活躍線程。
libevent 庫實際上沒有更換 select()
、poll()
或其他機制的基礎。而是使用對於每個平台最高效的高性能解決方案在實現外加上一個包裝器。
為了實際處理每個請求,libevent 庫提供一種事件機制,它作為底層網絡后端的包裝器。事件系統讓為連接添加處理函數變得非常簡便,同時降低了底層 I/O 復雜性。這是 libevent 系統的核心。
libevent 庫的其他組件提供其他功能,包括緩沖的事件系統(用於緩沖發送到客戶端/從客戶端接收的數據)以及 HTTP、DNS 和 RPC 系統的核心實現。
創建 libevent 服務器的基本方法是,注冊當發生某一操作(比如接受來自客戶端的連接)時應該執行的函數,然后調用主事件循環event_dispatch()
。執行過程的控制現在由 libevent 系統處理。注冊事件和將調用的函數之后,事件系統開始自治;在應用程序運行時,可以在事件隊列中添加(注冊)或刪除(取消注冊)事件。事件注冊非常方便,可以通過它添加新事件以處理新打開的連接,從而構建靈活的網絡處理系統。
例如,可以打開一個監聽套接字,然后注冊一個回調函數,每當需要調用 accept()
函數以打開新連接時調用這個回調函數,這樣就創建了一個網絡服務器。清單 1 所示的代碼片段說明基本過程:
系統接口介紹
地址:http://www.dirlt.com/libev.html
Table of Contents
1 libev
主頁http://software.schmorp.de/pkg/libev.html
文檔http://software.schmorp.de/pkg/libev.html
libev所實現的功能就是一個強大的reactor,可能notify事件主要包括下面這些:
- ev_io // IO可讀可寫
- ev_stat // 文件屬性變化
- ev_async // 激活線程
- ev_signal // 信號處理
- ev_timer // 定時器
- ev_periodic // 周期任務
- ev_child // 子進程狀態變化
- ev_fork // 開辟進程
- ev_cleanup // event loop退出觸發事件
- ev_idle // 每次event loop空閑觸發事件
- ev_embed // TODO(zhangyan04):I have no idea.
- ev_prepare // 每次event loop之前事件
- ev_check // 每次event loop之后事件
1.1 About The Code
代碼風格相當嚴謹而且排版也非常工整,並且從域名看出作者是德國人。但是內部使用了大量的宏造成閱讀代碼並不是非常方便。 並且從代碼角度分析,應該是一開始支持有一個默認的event_loop,但是隨着多核產生實際應用中可能會使用到多個event_loop, 猜想作者應該是為了方便的話使用了很多宏進行替換。允許使用多個event_loop的宏是EV_MULTIPLICITY.比如下面這段代碼
void noinline ev_io_start (EV_P_ ev_io *w) { int fd = w->fd; if (expect_false (ev_is_active (w))) return; assert (("libev: ev_io_start called with negative fd", fd >= 0)); assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE)))); EV_FREQUENT_CHECK; ev_start (EV_A_ (W)w, 1); array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); wlist_add (&anfds[fd].head, (WL)w); fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); w->events &= ~EV__IOFDSET; EV_FREQUENT_CHECK; }
初次閱讀這個代碼會覺得非常難懂。
#define anfds ((loop)->anfds)
事實上一個ev_loop里面的字段是相當多的,不過也很正常本身就是一個強大的reactor.但是這造成一個直接后果, 就是對於想要了解ev_loop的全貌比較困難,所以想要徹底地了解libev也比較麻煩,所以我們只能夠從應用層面來嘗試了解它。
1.2 EventLoop
首先我們關注一下reactor本身。在libev下面reactor對象稱為event_loop.event_loop允許動態創建和銷毀,並且允許綁定自定義數據
struct ev_loop * ev_loop_new (unsigned int flags); void ev_loop_destroy (EV_P); void ev_set_userdata (EV_P_ void *data); void *ev_userdata (EV_P);
我們這里主要關注一下flags.這里面主要是選擇使用什么backend來進行poll操作,可以選擇的有:
- EVBACKEND_SELECT
- EVBACKEND_POLL
- EVBACKEND_EPOLL // 通常我們選擇這個
- EVBACKEND_KQUEUE
- EVBACKEND_DEVPOLL
- EVBACKEND_PORT
但是還有三個比較重要選項:
- EVFLAG_NOINOTIFY // 不適用inofity調用來使用ev_stat.這樣可以減少fd使用。
- EVFLAG_SIGNALFD // 使用signalfd來檢測信號是否發生,同樣這樣可以減少fd使用。
大部分時候我們使用EVFLAG_AUTO(0)一般就足夠滿足需求了,從代碼角度來看如果支持epoll的話那么首先會選擇epoll. 因為在watcher的回調函數里面是可以知道當前event_loop的,這樣就可以獲得自定義數據。然后我們看看這個event_loop如何運行和停止的
void ev_run (EV_P_ int flags); void ev_break (EV_P_ int how);
同樣我們這里比較關注flags和how這兩個參數。flags有下面這幾個:
- 0.通常這是我們想要的,每次輪詢在poll都會等待一段時間然后處理pending事件。
- EVRUN_NOWAIT.運行一次,在poll時候不會等待。這樣效果相當於只是處理pending事件。
- EVRUN_ONCE.運行一次,但是在poll時候會等待,然后處理pending事件。
而how有下面這幾個:
- EVBREAK_ONE.只是退出一次ev_run這個調用。通常來說使用這個就可以了。
- EVBREAK_ALL.退出所有的ev_run調用。這種情況存在於ev_run在pengding處理時候會遞歸調用。
在backend/epoll底層每次epoll_wait時候,libev提供了接口回調可以在epoll_wait前后調用
void ev_set_loop_release_cb (loop, void (*release)(EV_P), void (*acquire)(EV_P)) static void epoll_poll (EV_P_ ev_tstamp timeout) { /* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */ /* the default libev max wait time, however. */ EV_RELEASE_CB; eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, epoll_epermcnt ? 0 : ev_timeout_to_ms (timeout)); EV_ACQUIRE_CB; }
在event_loop里面我們還關心一件事情,就是每次event_loop輪詢的時間長短。通常來說這個不會是太大問題,但是在高性能情況下面我們需要設置
void ev_set_io_collect_interval (EV_P_ ev_tstamp interval); void ev_set_timeout_collect_interval (EV_P_ ev_tstamp interval);
在ev_run里面有使用這些參數的代碼比較麻煩。但是大意是這樣,如果我們這是了timeout_interval的話,那么我們每次檢查timeout時間的話必須 在timeout_interval,使用這段時間ev_sleep.但是這個又會影響到io_interval,所以內部做了一些換算,換算的結果作為epoll_wait超時時間。 不過同樣在大部分時候我們不需要關心它,默認時候是0.0,系統會使用最快的響應方式來處理。
1.3 Watcher
然后我們關心一下EventHandler.在libev下面watcher相當於EventHandler這么一個概念,通常里面會綁定fd回調函數以及我們需要關注的事件。 然后一旦觸發事件之后會觸發我們使用的回調函數,回調函數參數通常有reactor,watcher以及觸發的事件。這里不打算重復文檔里面的watcher 相關的內容和對應的API,但是對於某些內容的話可能會提到並且附帶一些注釋。之前我們還是看看通用過程,這里使用TYPE區分不同類型watcher.
typedef void (*)(struct ev_loop *loop, ev_TYPE *watcher, int revents) callback; // callback都是這種類型 ev_init (ev_TYPE *watcher, callback); // 初始化watcher ev_TYPE_set (ev_TYPE *watcher, [args]); // 設置watcher ev_TYPE_init (ev_TYPE *watcher, callback, [args]); // 通常使用這個函數最方便,初始化和設置都在這里 ev_TYPE_start (loop, ev_TYPE *watcher); // 注冊watcher ev_TYPE_stop (loop, ev_TYPE *watcher); // 注銷watcher ev_set_priority (ev_TYPE *watcher, int priority); // 設置優先級 ev_feed_event (loop, ev_TYPE *watcher, int revents); // 這個做跨線程通知非常有用,相當於觸發了某個事件。 bool ev_is_active (ev_TYPE *watcher); // watcher是否active. bool ev_is_pending (ev_TYPE *watcher); // watcher是否pending. int ev_clear_pending (loop, ev_TYPE *watcher); // 清除watcher pending狀態並且返回事件
wacther的狀態有下面這么幾種:
- initialiased.調用init函數初始化
- active.調用start進行注冊
- pending.已經觸發事件但是沒有處理
- inactive.調用stop注銷。這個狀態等同於initialised這個狀態。
其實關於每個watcher具體是怎么實現的沒有太多意思,因為大部分現有代碼都差不多。會在下一節說說內部數據結構是怎么安排的, 了解內部數據結構以及過程之后很多問題就可以避免了,比如"The special problem of disappearing file descriptors"這類問題。
1.4 How it works
1.4.1 ev_run
最主要的還是看看ev_run這個部分代碼。我們不打算仔細閱讀只是看看梗概然后大體分析一下數據結構應該怎么樣的
void ev_run (EV_P_ int flags) { assert (("libev: ev_loop recursion during release detected", loop_done != EVBREAK_RECURSE)); loop_done = EVBREAK_CANCEL; EV_INVOKE_PENDING; /* in case we recurse, ensure ordering stays nice and clean */ do { if (expect_false (loop_done)) break; /* update fd-related kernel structures */ fd_reify (EV_A); /* calculate blocking time */ { ev_tstamp waittime = 0.; ev_tstamp sleeptime = 0.; /* remember old timestamp for io_blocktime calculation */ ev_tstamp prev_mn_now = mn_now; /* update time to cancel out callback processing overhead */ time_update (EV_A_ 1e100); if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt))) { waittime = MAX_BLOCKTIME; if (timercnt) { ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge; if (waittime > to) waittime = to; } /* don't let timeouts decrease the waittime below timeout_blocktime */ if (expect_false (waittime < timeout_blocktime)) waittime = timeout_blocktime; /* extra check because io_blocktime is commonly 0 */ if (expect_false (io_blocktime)) { sleeptime = io_blocktime - (mn_now - prev_mn_now); if (sleeptime > waittime - backend_fudge) sleeptime = waittime - backend_fudge; if (expect_true (sleeptime > 0.)) { ev_sleep (sleeptime); waittime -= sleeptime; } } } assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */ backend_poll (EV_A_ waittime); assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */ /* update ev_rt_now, do magic */ time_update (EV_A_ waittime + sleeptime); } /* queue pending timers and reschedule them */ timers_reify (EV_A); /* relative timers called last */ EV_INVOKE_PENDING; } while (expect_true ( activecnt && !loop_done && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT)) )); if (loop_done == EVBREAK_ONE) loop_done = EVBREAK_CANCEL; }
我們可以總結一下大致步驟,其實和大部分的event loop寫出來差不多。
- 首先觸發那些已經pending的watchers.
- 判斷是否loop_done
- fd_reify.這個后面會單獨說。
- 計算出waittime並且進行必要的sleep.
- backend_poll開始輪詢,並且整理好pending事件
- timers_reify.這個和fd_reify不同
- 調用EV_INVOKE_PENDING來觸發pending的io事件
非常簡單。接下來我們看看fd_reify,backend_poll,timers_reify以及EV_INVOKE_PENDING.
1.4.2 fd_reify
下面是fd_reify代碼片段.可以看出,這個部分就是在修改fd關注的events。
inline_size void fd_reify (EV_P) { int i; for (i = 0; i < fdchangecnt; ++i) { int fd = fdchanges [i]; ANFD *anfd = anfds + fd; ev_io *w; unsigned char o_events = anfd->events; unsigned char o_reify = anfd->reify; anfd->reify = 0; /*if (expect_true (o_reify & EV_ANFD_REIFY)) probably a deoptimisation */ { anfd->events = 0; for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next) anfd->events |= (unsigned char)w->events; if (o_events != anfd->events) o_reify = EV__IOFDSET; /* actually |= */ } if (o_reify & EV__IOFDSET) backend_modify (EV_A_ fd, o_events, anfd->events); } fdchangecnt = 0; }
而這個fdchanges這個是在哪里調用的呢。我們可以看到就是在ev_io_start這個部分。也就是說如果我們想要修改 fd關注事件的話,我們必須顯示地ev_io_stop掉然后修正之后重新ev_io_start.底層調用fd_change的話底層維護 數組fdchanges來保存發生events變動的fd.
void noinline ev_io_start (EV_P_ ev_io *w) { int fd = w->fd; if (expect_false (ev_is_active (w))) return; assert (("libev: ev_io_start called with negative fd", fd >= 0)); assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE)))); EV_FREQUENT_CHECK; ev_start (EV_A_ (W)w, 1); array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); wlist_add (&anfds[fd].head, (WL)w); fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); w->events &= ~EV__IOFDSET; EV_FREQUENT_CHECK; } inline_size void fd_change (EV_P_ int fd, int flags) { unsigned char reify = anfds [fd].reify; anfds [fd].reify |= flags; if (expect_true (!reify)) { ++fdchangecnt; array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2); fdchanges [fdchangecnt - 1] = fd; } }
1.4.3 backend_poll
backend_poll底層支持很多poll實現,我們這里僅僅看ev_epoll.c就可以.代碼在這里面我們不列舉了, 如果某個fd觸發事件的話那么最終會調用fd_event(EV_A_,fd,event)來進行通知。所以我們看看fd_event.
inline_speed void fd_event_nocheck (EV_P_ int fd, int revents) { ANFD *anfd = anfds + fd; ev_io *w; for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next) { int ev = w->events & revents; if (ev) ev_feed_event (EV_A_ (W)w, ev); } } void noinline ev_feed_event (EV_P_ void *w, int revents) { W w_ = (W)w; int pri = ABSPRI (w_); if (expect_false (w_->pending)) pendings [pri][w_->pending - 1].events |= revents; else { w_->pending = ++pendingcnt [pri]; array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2); // set the watcher and revents. pendings [pri][w_->pending - 1].w = w_; pendings [pri][w_->pending - 1].events = revents; } }
可以看到底層是一個ANFD的數組,根據fd進行偏移。如果fd過大的話似乎會影響性能沒有hpserver里面的demuxtable實現方式好。 然后得到這個fd下面所有的watcher,然后在loop->pendings里面記錄所有這些觸發的watcher.
1.4.4 timers_reify
其中HEAP0就是最小堆下標。如果repeat的話說明需要重復發生,那么就會重新調整時間戳,如果不是repeat的話, 那么內部會調用ev_timer_stop這個方法將這個計時器移除。所有的定時任務都通過feed_reverse添加。feed_reverse 內部是維護一個動態數組來保存所有的定時器任務,然后在feed_reverse_done里面遍歷這些任務來觸發這些定時器任務。
inline_size void timers_reify (EV_P) { EV_FREQUENT_CHECK; if (timercnt && ANHE_at (timers [HEAP0]) < mn_now) { do { ev_timer *w = (ev_timer *)ANHE_w (timers [HEAP0]); /*assert (("libev: inactive timer on timer heap detected", ev_is_active (w)));*/ /* first reschedule or stop timer */ if (w->repeat) { ev_at (w) += w->repeat; if (ev_at (w) < mn_now) ev_at (w) = mn_now; assert (("libev: negative ev_timer repeat value found while processing timers", w->repeat > 0.)); ANHE_at_cache (timers [HEAP0]); downheap (timers, timercnt, HEAP0); } else ev_timer_stop (EV_A_ w); /* nonrepeating: stop timer */ EV_FREQUENT_CHECK; feed_reverse (EV_A_ (W)w); } while (timercnt && ANHE_at (timers [HEAP0]) < mn_now); feed_reverse_done (EV_A_ EV_TIMER); } }
1.4.5 EV_INVOKE_PENDING
這個宏最終調用的函數就是下面這個,遍歷所有的pendings事件並且逐一觸發。
void noinline ev_invoke_pending (EV_P) { int pri; for (pri = NUMPRI; pri--; ) while (pendingcnt [pri]) { ANPENDING *p = pendings [pri] + --pendingcnt [pri]; p->w->pending = 0; EV_CB_INVOKE (p->w, p->events); EV_FREQUENT_CHECK; } }
1.5 Example
嘗試編寫一個簡單的帶有超時的echo-server和echo-client就發現其實還有非常多的其他的工作量,比如buffer的管理狀態機實現等。 所以我沒有寫出一個完整的example,只是簡單地寫了假設echo-client連接上server的話就簡單地打印鏈接信息並且關閉。
1.5.1 common.h
#ifndef _COMMON_H_ #define _COMMON_H_ #include <unistd.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <strings.h> #include <cstdlib> #include <cstdio> #include <cstddef> #include <string> namespace common{ #define D(exp,fmt,...) do { \ if(!(exp)){ \ fprintf(stderr,fmt,##__VA_ARGS__); \ abort(); \ } \ }while(0) static void setnonblock(int fd){ fcntl(fd,F_SETFL,fcntl(fd,F_GETFL) | O_NONBLOCK); } static void setreuseaddr(int fd){ int ok=1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&ok,sizeof(ok)); } static void setaddress(const char* ip,int port,struct sockaddr_in* addr){ bzero(addr,sizeof(*addr)); addr->sin_family=AF_INET; inet_pton(AF_INET,ip,&(addr->sin_addr)); addr->sin_port=htons(port); } static std::string address_to_string(struct sockaddr_in* addr){ char ip[128]; inet_ntop(AF_INET,&(addr->sin_addr),ip,sizeof(ip)); char port[32]; snprintf(port,sizeof(port),"%d",ntohs(addr->sin_port)); std::string r; r=r+"("+ip+":"+port+")"; return r; } static int new_tcp_server(int port){ int fd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); D(fd>0,"socket failed(%m)\n"); setnonblock(fd); setreuseaddr(fd); sockaddr_in addr; setaddress("0.0.0.0",port,&addr); bind(fd,(struct sockaddr*)&addr,sizeof(addr)); listen(fd,64); // backlog = 64 return fd; } static int new_tcp_client(const char* ip,int port){ int fd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); setnonblock(fd); sockaddr_in addr; setaddress(ip,port,&addr); connect(fd,(struct sockaddr*)(&addr),sizeof(addr)); return fd; } }; // namespace common #endif // _COMMON_H_
1.5.2 echo-client.cc
#include "ev.h" #include "common.h" static void do_connected(struct ev_loop* reactor,ev_io* w,int events){ close(w->fd); ev_break(reactor,EVBREAK_ALL); } int main(){ struct ev_loop* reactor=ev_loop_new(EVFLAG_AUTO); int fd=common::new_tcp_client("127.0.0.1",34567); ev_io io; ev_io_init(&io,&do_connected,fd,EV_WRITE); ev_io_start(reactor,&io); ev_run(reactor,0); close(fd); ev_loop_destroy(reactor); return 0; }
1.5.3 echo-server.cc
#include "ev.h" #include "common.h" static void do_accept(struct ev_loop* reactor,ev_io* w,int events){ struct sockaddr_in addr; socklen_t addr_size=sizeof(addr); int conn=accept(w->fd,(struct sockaddr*)&addr,&addr_size); std::string r=common::address_to_string(&addr); fprintf(stderr,"accept %s\n",r.c_str()); close(conn); } int main(){ struct ev_loop* reactor=ev_loop_new(EVFLAG_AUTO); int fd=common::new_tcp_server(34567); ev_io w; ev_io_init(&w,do_accept,fd,EV_READ); ev_io_start(reactor,&w); ev_run(reactor,0); close(fd); ev_loop_destroy(reactor); }