目錄
開啟事件主循環
執行事件主循環
校對時間
阻塞/非阻塞
處理激活隊列中的event
事件主循環的退出
event_base_loopexit
event_base_loopbreak
開啟事件主循環
在libevent中,事件主循環的作用就是執行一個循環,在循環中監聽事件以及超時的事件並且將這些激活的事件進行處理。libevent提供了對用戶開放了兩種執行事件主循環的函數:
int event_base_dispatch(struct event_base *);
int event_base_loop(struct event_base *, int);
在event_base_dispatch函數中,實際上調用的是event_base_loop(event_base, 0);也就是如果使用event_base_dispatch函數執行事件主循環,那么會將event_base_loop的第二個參數設置為0去調用它,下面來看看event_base_loop函數的定義:
執行事件主循環
int
event_base_loop(struct event_base *base, int flags)
{
//設置為EVLOOP_NONBLOCK,那么event_loop只會處理當前已經激活的event,處理結束后就會退出event_loop
//設置為EVLOOP_ONCE,那么event_loop就會等待到第一個事件超時,處理在這段時間內激活的event,直到所有激活的事件都處理完就退出event_loop
//設置為其他值,那么就會一直循環監聽,直到被動退出
const struct eventop *evsel = base->evsel; //獲取使用的后端
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock); //event_base初始化后默認分配的是遞歸鎖
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1;
clear_time_cache(base); //清空緩存的超時
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID(); //存放執行base主循環的線程id
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
timeout_correct(base, &tv); //用來檢查用戶是否回調了系統時間,如果改變了就往回調整系統時間
tv_p = &tv; //回調后的時間
//如果當前沒有事件激活,那么后面的dispatch就應該阻塞,如果不阻塞那么就會多次調用dispatch;
//而對於flags來說,如果把flags設置為EVLOOP_NONBLOCK,那么說明調用者希望后面的dispatch是非阻塞的
//因此如果當前沒有事件激活的,那么flags就不應該設置為EVLOOP_NONBLOCK;
//除此之外的其他情況則會將dispatch設置為非阻塞。
//dispatch是否阻塞取決於傳入的超時參數,超時參數描述了dispatch阻塞的時長,如果為0那么dispatch就立即返回,如果為-1就是一直阻塞,直到相應事件發生。
//因此這里如果需要阻塞,那么就設置dispatch的阻塞時長為從現在開始到第一個超時的event所需的時間
//否則則設置阻塞時長為0,相當於非阻塞。
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p); //計算min_heap中最先超時的event還有多長時間就要超時
} else {//如果有事件激活或者是event_loop為非阻塞,就不用等待
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv); //清空超時
}
/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {//如果base中的所有event都處理完了就退出,如果base中存在永久事件,那么event_loop是不會主動退出的
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
/* update last old time */
gettime(base, &base->event_tv); //更新系統時間
clear_time_cache(base); //清空緩存的時間
//根據前面,如果希望dispatch是非阻塞的,那么這里的tv_p就是0,該函數就會立即返回;
//如果希望dispatch是阻塞的,那么這里的tv_p就是距離第一個超時event的剩余時長
//這是為了處理在所有事件都沒有超時的情況下可能發生的感興趣事件
//如果在這段時間內沒有感興趣的事件發生,實際上這里的dispatch也並沒有對事件做任何處理
res = evsel->dispatch(base, tv_p);
//返回后,接下來就是處理超時的事件了
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
update_time_cache(base);
timeout_process(base); //將base的min_heap中所有超時的事件以超時激活類型添加到激活隊列中
if (N_ACTIVE_CALLBACKS(base)) { //如果激活隊列中有事件
int n = event_process_active(base); //執行激活隊列中的event相應的回調函數,返回的n是成功執行的非內部事件數目
if ((flags & EVLOOP_ONCE) //如果設置了EVLOOP_ONCE,並且所有激活的事件都處理完了,那么就退出event_loop
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)//如果設置了EVLOOP_NONBLOCK那么也會退出event_loop循環
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
//循環結束
done:
clear_time_cache(base);
base->running_loop = 0; //表示base的循環結束
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
校對時間
在循環中,首先會調用timeout_correct來校對時間,之所以校對,是因為用戶是可以改變系統時間的,比如說把現在的10點改成了8點,那么當前的系統時間就會變小,由於定時器中存放的時間都是按照之前設置的系統時間,因此這些超時event會更晚發生,所以在每次循環的開始應該先進行校對時間。timeout_correct函數定義如下:
static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
/* Caller must hold th_base_lock. */
struct event **pev;
unsigned int size;
struct timeval off;
int i;
if (use_monotonic) //如果平台支持use_monotonic,那么系統時間就不需要回調了,直接使用系統啟動時間
return;
/* Check if time is running backwards */
gettime(base, tv); //tv就是系統時間,也是進行回調糾正的時間
if (evutil_timercmp(tv, &base->event_tv, >=)) { //如果此時獲取的系統時間tv大於之前保存的時間event_tv,那么說明沒有進行回調
base->event_tv = *tv;
return;
}
//執行到這里說明剛剛獲得的tv小於保存的base->event_tv,說明用戶往回修改了系統時間
event_debug(("%s: time is running backwards, corrected",
__func__));
evutil_timersub(&base->event_tv, tv, &off); //計算二者之間的差值,保存到off中,反映了用戶調小了多少時間
/*
* We can modify the key element of the node without destroying
* the minheap property, because we change every element.
*/
pev = base->timeheap.p;
size = base->timeheap.n;
for (; size-- > 0; ++pev) {
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
for (i=0; i<base->n_common_timeouts; ++i) {
struct event *ev;
struct common_timeout_list *ctl =
base->common_timeout_queues[i];
TAILQ_FOREACH(ev, &ctl->events,
ev_timeout_pos.ev_next_with_common_timeout) {
struct timeval *ev_tv = &ev->ev_timeout;
ev_tv->tv_usec &= MICROSECONDS_MASK;
evutil_timersub(ev_tv, &off, ev_tv);
ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC |
(i<<COMMON_TIMEOUT_IDX_SHIFT);
}
}
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
}
校對時間的思路其實很簡單:event_base中的event_tv會緩存上一次循環中存儲的校對后的時間,Libevent判斷當前是否需要校對時間就是將當前的系統時間tv與event_tv進行比較,在正常情況下tv必定是不小於event_tv的,如果不是這樣,那么就說明用戶往回調整了時間,此時就計算二者之間的差值,這個差值就可以近似認為是“用戶往回調整的時長”,為了讓定時器中的事件都盡可能准時的發生,那么就需要讓這些事件的超時時間都往回調,如果還設置了common_timeout,那么也同樣進行調整,這樣就完成了校對時間,最后再把校對后的tv存儲到event_tv中即可。
阻塞/非阻塞
event_base_loop的第二個參數flags給出了兩種定義:
/** Block until we have an active event, then exit once all active events
* have had their callbacks run. */
#define EVLOOP_ONCE 0x01
/** Do not block: see which events are ready now, run the callbacks
* of the highest-priority ones, then exit. */
#define EVLOOP_NONBLOCK 0x02
從英文注釋中也能看出,EVLOOP_ONCE是在事件主循環中執行當前激活的event,如果當前沒有事件激活,那么就會一直阻塞直到有事件激活,當所有激活的event都執行結束后就退出event_base_loop;而EVLOOP_NONBLOCK則是非阻塞的,直接去處理當前激活的event,如果當前有event激活,那么就直接處理后退出,如果沒有event激活,那么也會退出;如果設置為以上兩種flag之外的值,比如說event_base_dispatch中傳入event_base_loop的flags為0,那么event_base_loop就會一直執行下去,直到base的event_gotterm或event_break被置位。
那么EVLOOP_ONCE是如何實現阻塞,而EVLOOP_NONBLOCK是如何實現非阻塞的呢?
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p); //計算min_heap中最先超時的event還有多長時間就要超時
} else {//如果有事件激活或者是event_loop為非阻塞,就不用等待
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv); //清空超時
}
在event_base_loop的這段程序中,用到了timeout_next函數,這個函數定義如下:
static int
timeout_next(struct event_base *base, struct timeval **tv_p) //tv_p中保存了最先超時的event的剩余時間
{
/* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
ev = min_heap_top(&base->timeheap); //獲取最先超時的event
if (ev == NULL) { //說明timeheap中沒有event,也就是沒有事件激活
/* if no time-based events are active wait for I/O */
*tv_p = NULL;
goto out;
}
if (gettime(base, &now) == -1) { //獲取時間到now中
res = -1;
goto out;
}
//比較最先超時的event的超時時間與獲取到的時間,
//如果最先超時的時間都已經小於等於當前獲取到的時間,說明此時可能已經超過了設置的時間,就清空tv,並且立即返回
if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
evutil_timerclear(tv);
goto out;
}
//到這里說明最先超時的event還沒有超時
evutil_timersub(&ev->ev_timeout, &now, tv); //計算剩余的超時時間,保存到tv中
EVUTIL_ASSERT(tv->tv_sec >= 0);
EVUTIL_ASSERT(tv->tv_usec >= 0);
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
return (res);
}
可以知道,該函數用來計算當前距離min_heap中最先超時的那個event超時還剩多長時間。如果最先超時的那個event的超時時間不小於當前的系統時間,那么就會清空傳入的tv的超時值然后返回,否則就計算超時時間與系統時間的差值保存到tv的超時值中。
回到前面,N_ACTIVE_CALLBACKS(base)是一個宏定義,表示base中已激活的event數量。當base中沒有激活的event,並且沒有設置EVLOOP_NONBLOCK時,tv中保存的就是還有多長時間min_heap中第一個event會超時。否則tv中保存的超時值就是0。
接下來就繼續執行了evsel->dispatch(base, tv_p); 語句,直接調用base后端方法中綁定的dispatch函數,可以參考epoll模型中該函數的作用。如果base所綁定的eventop為epoll,那么顯然在dispatch中就會調用epoll_wait。而這里的tv_p參數也會作為超時參數傳給epoll_wait。如果base中沒有激活的event並且沒有設置EVLOOP_NONBLOCK,說明需要按照阻塞的方式去執行dispatch,此時tv_p中保存的是還有多長時間第一個event超時,就讓dispatch阻塞tv_p這么長的時間去監聽所有已添加但未激活的event。除了這種情況,如果base中存在已激活的event,那么自然就應該馬上到激活隊列中去處理這些event,因此傳遞給dispatch的tv_p為0讓其立刻返回;如果設置了EVLOOP_NONBLOCK,其含義是只處理當前已經激活的,不需要阻塞去監聽其他event,因此這里依然會設置tv_p為0讓dispatch立刻返回。值得一提的是,這里的dispatch函數的調用相當於是在第一個event超時發生之前去監聽所有event感興趣的非超時事件。
在dispatch函數返回后,說明此時監聽非超時event結束,接下來就開始處理超時event。先調用timeout_process將min_heap中所有超時的event添加到激活隊列中(可參考timeout_process),timeout_process函數返回后,如果base中有激活的event,那么就會調用event_process_active函數去處理這些event,該函數定義如下:
處理激活隊列中的event
static int
event_process_active(struct event_base *base)//遍歷base的激活隊列中所有event,調用其回調函數
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c = 0;
for (i = 0; i < base->nactivequeues; ++i) { //遍歷激活隊列中的事件
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) { //同一個優先級下可以有多個事件
base->event_running_priority = i; //設置當前的優先級
activeq = &base->activequeues[i]; //獲取優先級i下的所有event組成的鏈表
c = event_process_active_single_queue(base, activeq); //遍歷activeq鏈表,調用其中每個event的回調函數
if (c < 0) {//c是執行的非內部事件數目
base->event_running_priority = -1;
return -1;
} else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */
/* If we get here, all of the events we processed
* were internal. Continue. */
}
}
event_process_deferred_callbacks(&base->defer_queue,&base->event_break); //處理延時激活隊列中激活的event
base->event_running_priority = -1;
return c;
}
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq)
{
struct event *ev;
int count = 0;
EVUTIL_ASSERT(activeq != NULL);
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST) //如果是永久事件就從激活隊列中刪除,保留其在添加隊列和定時隊列
event_queue_remove(base, ev, EVLIST_ACTIVE);
else //非永久事件從所有隊列中刪除
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL))//非內部事件計數
++count;
......
switch (ev->ev_closure) { //在調用回調函數是否進行其他行為
case EV_CLOSURE_SIGNAL:
event_signal_closure(base, ev);
break;
case EV_CLOSURE_PERSIST: //對於永久事件,在調用回調函數之前會重新調用event_add來添加該事件到對應隊列中
event_persist_closure(base, ev);
break;
default:
case EV_CLOSURE_NONE: //對於一般事件,直接調用回調函數
EVBASE_RELEASE_LOCK(base, th_base_lock);//釋放鎖
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg); //調用回調函數
break;
}
......
}
到這里,就把所有激活的event都調用了相應的回調函數,如果是永久事件,那么它還會存在於隊列中等待下一次激活,如果是普通事件則會在激活后徹底刪除。激活后會進行一些判斷:如果flags設置了EVLOOP_ONCE,按照前面所說的,必須要將當前所有已經激活的event都處理完才退出主循環,因此這里還需要再次確保激活隊列中沒有event,這里判斷n的主要原因是確保剛才激活的event都是用戶定義的event而不是內部event(如common_timeout event就屬於內部event),如果只處理了內部event那也被認為是沒有處理任何激活事件,當以上條件都滿足,那么done就會設置為1,下一輪while(!done)就會直接退出,主循環結束;而如果flags設置了EVLOOP_NONBLOCK,根據其含義下一次循環也直接退出。
if (N_ACTIVE_CALLBACKS(base)) { //如果激活隊列中有事件
int n = event_process_active(base); //執行激活隊列中的event相應的回調函數,返回的n是成功執行的非內部事件數目
if ((flags & EVLOOP_ONCE) //如果設置了EVLOOP_ONCE,並且所有激活的事件都處理完了,那么就退出event_loop
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)//如果設置了EVLOOP_NONBLOCK那么也會退出event_loop循環
done = 1;
}
這里提到了設置EVLOOP_NONBLOCK和EVLOOP_ONCE時退出主循環的方式,前面說過,如果設置的flags不是這二者(如調用event_base_dispatch),那么主循環就會一直執行下去,那么此時又該如何退出事件主循環呢?
事件主循環的退出
對於event_base_loop的退出,實際上是通過設置base的event_gotterm或event_break來實現的,在base的英文注釋中提到,設置前者表明會在當前所有激活的event都處理結束后退出主循環,而設置后者表明會立刻退出主循環 ,二者的注釋如下所示:
/** Set if we should terminate the loop once we're done processing events. */
int event_gotterm;
/** Set if we should terminate the loop immediately */
int event_break;
關於設置base中的這兩個變量,libevent向用戶提供了兩種退出event_base_loop的方法:
int event_base_loopexit(struct event_base *, const struct timeval *);
int event_base_loopbreak(struct event_base *);
event_base_loopexit
先來看看event_base_loopexit函數,該函數定義如下:
int
event_base_loopexit(struct event_base *event_base, const struct timeval *tv)
{
return (event_base_once(event_base, -1, EV_TIMEOUT, event_loopexit_cb,
event_base, tv)); //添加一個新的純超時事件,在tv時超時,回調event_loopexit_cb設置event_gotterm位
}
int
event_base_once(struct event_base *base, evutil_socket_t fd, short events,
void (*callback)(evutil_socket_t, short, void *),
void *arg, const struct timeval *tv)
{
struct event_once *eonce;
struct timeval etv;
int res = 0;
/* We cannot support signals that just fire once, or persistent
* events. */
if (events & (EV_SIGNAL|EV_PERSIST))
return (-1);
if ((eonce = mm_calloc(1, sizeof(struct event_once))) == NULL)
return (-1);
eonce->cb = callback;
eonce->arg = arg;
//注冊一個新事件,回調函數為event_once_cb
if (events == EV_TIMEOUT) { //如果是超時事件
if (tv == NULL) {
evutil_timerclear(&etv);
tv = &etv;
}
evtimer_assign(&eonce->ev, base, event_once_cb, eonce); //注冊一個純超時事件
} else if (events & (EV_READ|EV_WRITE)) { //如果是讀寫事件
events &= EV_READ|EV_WRITE;
event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce);
} else {
/* Bad event combination */
mm_free(eonce);
return (-1);
}
if (res == 0)
res = event_add(&eonce->ev, tv); //添加事件到相應隊列中
if (res != 0) {
mm_free(eonce);
return (res);
}
return (0);
}
也就是說,event_base_loopexit函數實際上就是重新注冊了一個純超時event,超時時直接調用event_loopexit_cb函數,event_loopexit_cb函數定義如下:
static void
event_loopexit_cb(evutil_socket_t fd, short what, void *arg)
{
struct event_base *base = arg;
base->event_gotterm = 1;
}
可以看到,在event_loopexit_cb函數,將base的event_gotterm置為1。在此之后,當event_base_loop執行完當前循環,在下一次循環開始檢測到event_gotterm非0,就會退出主循環。因此,event_base_loopexit實際上是傳入一個timeval超時參數,它會根據超時參數注冊並且添加一個純超時event,當event超時就會回調event_loopexit_cb函數將base的event_gotterm置為1,從而讓事件主循環退出。
接下來再看看event_base_loopbreak函數。
event_base_loopbreak
event_base_loopbreak函數定義如下所示:
int
event_base_loopbreak(struct event_base *event_base) //設置event_break位
{
int r = 0;
if (event_base == NULL)
return (-1);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
event_base->event_break = 1;
if (EVBASE_NEED_NOTIFY(event_base)) {
r = evthread_notify_base(event_base);
} else {
r = (0);
}
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
return r;
}
由此可見event_base_loopbreak的執行邏輯非常簡單,就是直接將base的event_break置為1來退出主循環。
除此之外,從event_base_loop函數中可以看到,如果base中沒有event了,那么也會直接退出主循環。
————————————————
版權聲明:本文為CSDN博主「HerofH_」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_28114615/article/details/96826553