To analyze libevent's source code, my plan is to start with the main structures (struct event_base and struct event), then work through the event_base_new, event_new, and event_add functions, and finish with event_base_dispatch.
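Before diving in, here is a minimal usage sketch (my own illustration, assuming libevent 2.x) that exercises exactly this call sequence, watching stdin (fd 0) for readability:

#include <event2/event.h>
#include <stdio.h>

static void on_read(evutil_socket_t fd, short what, void *arg)
{
    printf("fd %d is readable (flags 0x%hx)\n", (int)fd, what);
}

int main(void)
{
    struct event_base *base = event_base_new();        /* create the reactor */
    struct event *ev = event_new(base, 0,              /* wrap fd 0 (stdin) */
        EV_READ | EV_PERSIST, on_read, NULL);
    event_add(ev, NULL);                               /* register, no timeout */
    event_base_dispatch(base);                         /* run the event loop */
    event_free(ev);
    event_base_free(base);
    return 0;
}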
I. The Structures
1. event_base
struct event_base {
    /** Function pointers and other data to describe this event_base's
     * backend. */
    const struct eventop *evsel;
    /** Pointer to backend-specific data. */
    void *evbase;

    /** List of changes to tell backend about at next dispatch.  Only used
     * by the O(1) backends. */
    struct event_changelist changelist;

    /** Function pointers used to describe the backend that this event_base
     * uses for signals */
    const struct eventop *evsigsel;
    /** Data to implement the common signal handler code. */
    struct evsig_info sig;

    /** Number of virtual events */
    int virtual_event_count;
    /** Maximum number of virtual events active */
    int virtual_event_count_max;
    /** Number of total events added to this event_base */
    int event_count;
    /** Maximum number of total events added to this event_base */
    int event_count_max;
    /** Number of total events active in this event_base */
    int event_count_active;
    /** Maximum number of total events active in this event_base */
    int event_count_active_max;

    /** Set if we should terminate the loop once we're done processing
     * events. */
    int event_gotterm;
    /** Set if we should terminate the loop immediately */
    int event_break;
    /** Set if we should start a new instance of the loop immediately. */
    int event_continue;

    /** The currently running priority of events */
    int event_running_priority;

    /** Set if we're running the event_base_loop function, to prevent
     * reentrant invocation. */
    int running_loop;

    /** Set to the number of deferred_cbs we've made 'active' in the
     * loop.  This is a hack to prevent starvation; it would be smarter
     * to just use event_config_set_max_dispatch_interval's max_callbacks
     * feature */
    int n_deferreds_queued;

    /* Active event management. */
    /** An array of nactivequeues queues for active event_callbacks (ones
     * that have triggered, and whose callbacks need to be called).  Low
     * priority numbers are more important, and stall higher ones.
     */
    struct evcallback_list *activequeues;
    /** The length of the activequeues array */
    int nactivequeues;
    /** A list of event_callbacks that should become active the next time
     * we process events, but not this time. */
    struct evcallback_list active_later_queue;

    /* common timeout logic */

    /** An array of common_timeout_list* for all of the common timeout
     * values we know. */
    struct common_timeout_list **common_timeout_queues;
    /** The number of entries used in common_timeout_queues */
    int n_common_timeouts;
    /** The total size of common_timeout_queues. */
    int n_common_timeouts_allocated;

    /** Mapping from file descriptors to enabled (added) events */
    struct event_io_map io;

    /** Mapping from signal numbers to enabled (added) events. */
    struct event_signal_map sigmap;

    /** Priority queue of events with timeouts. */
    struct min_heap timeheap;

    /** Stored timeval: used to avoid calling gettimeofday/clock_gettime
     * too often. */
    struct timeval tv_cache;

    struct evutil_monotonic_timer monotonic_timer;

    /** Difference between internal time (maybe from clock_gettime) and
     * gettimeofday. */
    struct timeval tv_clock_diff;
    /** Second in which we last updated tv_clock_diff, in monotonic time. */
    time_t last_updated_clock_diff;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
    /* threading support */
    /** The thread currently running the event_loop for this base */
    unsigned long th_owner_id;
    /** A lock to prevent conflicting accesses to this event_base */
    void *th_base_lock;
    /** A condition that gets signalled when we're done processing an
     * event with waiters on it. */
    void *current_event_cond;
    /** Number of threads blocking on current_event_cond. */
    int current_event_waiters;
#endif
    /** The event whose callback is executing right now */
    struct event_callback *current_event;

#ifdef _WIN32
    /** IOCP support structure, if IOCP is enabled. */
    struct event_iocp_port *iocp;
#endif

    /** Flags that this base was configured with */
    enum event_base_config_flag flags;

    struct timeval max_dispatch_time;
    int max_dispatch_callbacks;
    int limit_callbacks_after_prio;

    /* Notify main thread to wake up break, etc. */
    /** True if the base already has a pending notify, and we don't need
     * to add any more. */
    int is_notify_pending;
    /** A socketpair used by some th_notify functions to wake up the main
     * thread. */
    evutil_socket_t th_notify_fd[2];
    /** An event used by some th_notify functions to wake up the main
     * thread. */
    struct event th_notify;
    /** A function used to wake up the main thread from another thread. */
    int (*th_notify_fn)(struct event_base *base);

    /** Saved seed for weak random number generator. Some backends use
     * this to produce fairness among sockets. Protected by th_base_lock. */
    struct evutil_weakrand_state weakrand_seed;

    /** List of event_onces that have not yet fired. */
    LIST_HEAD(once_event_list, event_once) once_events;

};
struct event_base is defined in event-internal.h.
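Although the structure itself is internal, libevent 2.1+ exposes a few of the fields above through public accessors. A small sketch, assuming a valid base:

#include <event2/event.h>
#include <stdio.h>

void print_base_stats(struct event_base *base)
{
    /* base->evsel->name, i.e. the backend picked at creation time */
    printf("backend: %s\n", event_base_get_method(base));
    /* backed by the event_count / event_count_active fields above */
    printf("added events:  %d\n",
        event_base_get_num_events(base, EVENT_BASE_COUNT_ADDED));
    printf("active events: %d\n",
        event_base_get_num_events(base, EVENT_BASE_COUNT_ACTIVE));
}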
II. Initialization Functions
1. The event_base_new function
struct event_base *
event_base_new(void)
{
    struct event_base *base = NULL;
    struct event_config *cfg = event_config_new();
    if (cfg) {
        base = event_base_new_with_config(cfg);
        event_config_free(cfg);
    }
    return base;
}
(1) Call event_config_new to allocate a struct event_config.
(2) If the allocation succeeds, call event_base_new_with_config(cfg) to create the struct event_base, free the config, and return the resulting pointer.
Summary: event_base_new is just a thin wrapper around event_base_new_with_config, so that is the function to look at next.
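If the defaults are not what you want, you can drive event_base_new_with_config yourself. A sketch using the public event_config setters; the avoid/require calls feed the checks we will see in the backend-selection loop below:

#include <event2/event.h>

struct event_base *make_tuned_base(void)
{
    struct event_base *base = NULL;
    struct event_config *cfg = event_config_new();
    if (cfg) {
        event_config_avoid_method(cfg, "select");          /* never pick select */
        event_config_require_features(cfg, EV_FEATURE_ET); /* need edge-trigger */
        base = event_base_new_with_config(cfg);            /* NULL if impossible */
        event_config_free(cfg);
    }
    return base;
}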
2. The event_base_new_with_config function
  1  struct event_base *
  2  event_base_new_with_config(const struct event_config *cfg)
  3  {
  4      int i;
  5      struct event_base *base;
  6      int should_check_environment;
  7
  8  #ifndef EVENT__DISABLE_DEBUG_MODE
  9      event_debug_mode_too_late = 1;
 10  #endif
 11
 12      if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
 13          event_warn("%s: calloc", __func__);
 14          return NULL;
 15      }
 16
 17      if (cfg)
 18          base->flags = cfg->flags;
 19
 20      should_check_environment =
 21          !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
 22
 23      {
 24          struct timeval tmp;
 25          int precise_time =
 26              cfg && (cfg->flags & EVENT_BASE_FLAG_PRECISE_TIMER);
 27          int flags;
 28          if (should_check_environment && !precise_time) {
 29              precise_time = evutil_getenv_("EVENT_PRECISE_TIMER") != NULL;
 30              base->flags |= EVENT_BASE_FLAG_PRECISE_TIMER;
 31          }
 32          flags = precise_time ? EV_MONOT_PRECISE : 0;
 33          evutil_configure_monotonic_time_(&base->monotonic_timer, flags);
 34
 35          gettime(base, &tmp);
 36      }
 37
 38      min_heap_ctor_(&base->timeheap);
 39
 40      base->sig.ev_signal_pair[0] = -1;
 41      base->sig.ev_signal_pair[1] = -1;
 42      base->th_notify_fd[0] = -1;
 43      base->th_notify_fd[1] = -1;
 44
 45      TAILQ_INIT(&base->active_later_queue);
 46
 47      evmap_io_initmap_(&base->io);
 48      evmap_signal_initmap_(&base->sigmap);
 49      event_changelist_init_(&base->changelist);
 50
 51      base->evbase = NULL;
 52
 53      if (cfg) {
 54          memcpy(&base->max_dispatch_time,
 55              &cfg->max_dispatch_interval, sizeof(struct timeval));
 56          base->limit_callbacks_after_prio =
 57              cfg->limit_callbacks_after_prio;
 58      } else {
 59          base->max_dispatch_time.tv_sec = -1;
 60          base->limit_callbacks_after_prio = 1;
 61      }
 62      if (cfg && cfg->max_dispatch_callbacks >= 0) {
 63          base->max_dispatch_callbacks = cfg->max_dispatch_callbacks;
 64      } else {
 65          base->max_dispatch_callbacks = INT_MAX;
 66      }
 67      if (base->max_dispatch_callbacks == INT_MAX &&
 68          base->max_dispatch_time.tv_sec == -1)
 69          base->limit_callbacks_after_prio = INT_MAX;
 70
 71      for (i = 0; eventops[i] && !base->evbase; i++) {
 72          if (cfg != NULL) {
 73              /* determine if this backend should be avoided */
 74              if (event_config_is_avoided_method(cfg,
 75                  eventops[i]->name))
 76                  continue;
 77              if ((eventops[i]->features & cfg->require_features)
 78                  != cfg->require_features)
 79                  continue;
 80          }
 81
 82          /* also obey the environment variables */
 83          if (should_check_environment &&
 84              event_is_method_disabled(eventops[i]->name))
 85              continue;
 86
 87          base->evsel = eventops[i];
 88
 89          base->evbase = base->evsel->init(base);
 90      }
 91
 92      if (base->evbase == NULL) {
 93          event_warnx("%s: no event mechanism available",
 94              __func__);
 95          base->evsel = NULL;
 96          event_base_free(base);
 97          return NULL;
 98      }
 99
100      if (evutil_getenv_("EVENT_SHOW_METHOD"))
101          event_msgx("libevent using: %s", base->evsel->name);
102
103      /* allocate a single active event queue */
104      if (event_base_priority_init(base, 1) < 0) {
105          event_base_free(base);
106          return NULL;
107      }
108
109      /* prepare for threading */
110
111  #if !defined(EVENT__DISABLE_THREAD_SUPPORT) && !defined(EVENT__DISABLE_DEBUG_MODE)
112      event_debug_created_threadable_ctx_ = 1;
113  #endif
114
115  #ifndef EVENT__DISABLE_THREAD_SUPPORT
116      if (EVTHREAD_LOCKING_ENABLED() &&
117          (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
118          int r;
119          EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
120          EVTHREAD_ALLOC_COND(base->current_event_cond);
121          r = evthread_make_base_notifiable(base);
122          if (r < 0) {
123              event_warnx("%s: Unable to make base notifiable.", __func__);
124              event_base_free(base);
125              return NULL;
126          }
127      }
128  #endif
129
130  #ifdef _WIN32
131      if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
132          event_base_start_iocp_(base, cfg->n_cpus_hint);
133  #endif
134
135      return (base);
136  }
(1) Call mm_calloc to allocate a zeroed block of sizeof(struct event_base) bytes.
(2) If cfg is not NULL, copy cfg->flags into base->flags.
(3) Lines 71-90 select the backend that will actually be used: the for loop walks the eventops array until it finds a usable backend. Let's take a look at eventops.

#ifdef EVENT__HAVE_EVENT_PORTS
extern const struct eventop evportops;
#endif
#ifdef EVENT__HAVE_SELECT
extern const struct eventop selectops;
#endif
#ifdef EVENT__HAVE_POLL
extern const struct eventop pollops;
#endif
#ifdef EVENT__HAVE_EPOLL
extern const struct eventop epollops;
#endif
#ifdef EVENT__HAVE_WORKING_KQUEUE
extern const struct eventop kqops;
#endif
#ifdef EVENT__HAVE_DEVPOLL
extern const struct eventop devpollops;
#endif
#ifdef _WIN32
extern const struct eventop win32ops;
#endif

/* Array of backends in order of preference. */
static const struct eventop *eventops[] = {
#ifdef EVENT__HAVE_EVENT_PORTS
    &evportops,
#endif
#ifdef EVENT__HAVE_WORKING_KQUEUE
    &kqops,
#endif
#ifdef EVENT__HAVE_EPOLL
    &epollops,
#endif
#ifdef EVENT__HAVE_DEVPOLL
    &devpollops,
#endif
#ifdef EVENT__HAVE_POLL
    &pollops,
#endif
#ifdef EVENT__HAVE_SELECT
    &selectops,
#endif
#ifdef _WIN32
    &win32ops,
#endif
    NULL
};
As the code shows, preprocessor macros determine which backends are compiled in, so the loop can always find a backend that the machine it runs on supports. Note the ordering of epoll, poll, and select: if epoll is supported, poll will never be chosen, and if poll is supported, select will never be chosen; select is the fallback of last resort.
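The environment check in the loop (event_is_method_disabled) honors variables of the form EVENT_NO<NAME>; setting EVENT_NOEPOLL, for example, forces the loop past epoll. You can also enumerate the compiled-in backends at runtime through the public API; a small sketch:

#include <event2/event.h>
#include <stdio.h>

void list_backends(void)
{
    int i;
    /* NULL-terminated array, in the same preference order as eventops[] */
    const char **methods = event_get_supported_methods();
    for (i = 0; methods[i] != NULL; ++i)
        printf("supported backend: %s\n", methods[i]);
}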

/** Structure to define the backend of a given event_base. */
struct eventop {
    /** The name of this backend. */
    const char *name;
    /** Function to set up an event_base to use this backend.  It should
     * create a new structure holding whatever information is needed to
     * run the backend, and return it.  The returned pointer will get
     * stored by event_init into the event_base.evbase field.  On failure,
     * this function should return NULL. */
    void *(*init)(struct event_base *);
    /** Enable reading/writing on a given fd or signal.  'events' will be
     * the events that we're trying to enable: one or more of EV_READ,
     * EV_WRITE, EV_SIGNAL, and EV_ET.  'old' will be those events that
     * were enabled on this fd previously.  'fdinfo' will be a structure
     * associated with the fd by the evmap; its size is defined by the
     * fdinfo field below.  It will be set to 0 the first time the fd is
     * added.  The function should return 0 on success and -1 on error.
     */
    int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
    /** As "add", except 'events' contains the events we mean to disable. */
    int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
    /** Function to implement the core of an event loop.  It must see which
        added events are ready, and cause event_active to be called for each
        active event (usually via event_io_active or such).  It should
        return 0 on success and -1 on error.
     */
    int (*dispatch)(struct event_base *, struct timeval *);
    /** Function to clean up and free our data from the event_base. */
    void (*dealloc)(struct event_base *);
    /** Flag: set if we need to reinitialize the event base after we fork.
     */
    int need_reinit;
    /** Bit-array of supported event_method_features that this backend can
     * provide. */
    enum event_method_feature features;
    /** Length of the extra information we should record for each fd that
        has one or more active events.  This information is recorded
        as part of the evmap entry for each fd, and passed as an argument
        to the add and del functions above.
     */
    size_t fdinfo_len;
};
struct eventop defines the common interface that every backend implements; how each backend wraps its own functions to fit this interface is something I will analyze one by one later.
(4) Finally, the chosen backend's init function is called to set up the backend-specific data for this event_base (the result is stored in base->evbase); its implementation differs from backend to backend.
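To make this contract concrete, below is a deliberately simplified, hypothetical backend skeleton. All toy_* names are invented for illustration (the real implementations live in epoll.c, select.c, and friends); it reuses the internal mm_calloc/mm_free allocators seen earlier:

struct toy_backend_data {
    int kernel_fd;    /* an epoll or kqueue descriptor would live here */
};

static void *toy_init(struct event_base *base)
{
    /* returned pointer is stored into base->evbase by the caller */
    return mm_calloc(1, sizeof(struct toy_backend_data));
}

static int toy_add(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *fdinfo)
{
    /* tell the kernel we now also care about 'events' on fd */
    return 0;
}

static int toy_del(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *fdinfo)
{
    /* stop watching 'events' on fd */
    return 0;
}

static int toy_dispatch(struct event_base *base, struct timeval *tv)
{
    /* block for at most *tv, then mark ready fds active, e.g. via the
     * internal evmap_io_active_(base, fd, EV_READ|...) helper */
    return 0;
}

static void toy_dealloc(struct event_base *base)
{
    mm_free(base->evbase);
}

const struct eventop toyops = {
    "toy",
    toy_init,
    toy_add,
    toy_del,
    toy_dispatch,
    toy_dealloc,
    0, /* need_reinit: no work needed after fork */
    0, /* features: neither EV_FEATURE_ET nor EV_FEATURE_O1 */
    0, /* fdinfo_len: we keep no per-fd state */
};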
3. The event_new function
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
    struct event *ev;
    ev = mm_malloc(sizeof(struct event));
    if (ev == NULL)
        return (NULL);
    if (event_assign(ev, base, fd, events, cb, arg) < 0) {
        mm_free(ev);
        return (NULL);
    }

    return (ev);
}
(1) Call mm_malloc to allocate a block of sizeof(struct event) bytes.
(2) event_new mirrors event_base_new: once the memory is allocated, it delegates to event_assign to fill in the structure.
4. The event_assign function
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
    if (!base)
        base = current_base;
    if (arg == &event_self_cbarg_ptr_)
        arg = ev;

    event_debug_assert_not_added_(ev);

    ev->ev_base = base;

    ev->ev_callback = callback;
    ev->ev_arg = arg;
    ev->ev_fd = fd;
    ev->ev_events = events;
    ev->ev_res = 0;
    ev->ev_flags = EVLIST_INIT;
    ev->ev_ncalls = 0;
    ev->ev_pncalls = NULL;

    if (events & EV_SIGNAL) {
        if ((events & (EV_READ|EV_WRITE|EV_CLOSED)) != 0) {
            event_warnx("%s: EV_SIGNAL is not compatible with "
                "EV_READ, EV_WRITE or EV_CLOSED", __func__);
            return -1;
        }
        ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
    } else {
        if (events & EV_PERSIST) {
            evutil_timerclear(&ev->ev_io_timeout);
            ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
        } else {
            ev->ev_closure = EV_CLOSURE_EVENT;
        }
    }

    min_heap_elem_init_(ev);

    if (base != NULL) {
        /* by default, we put new events into the middle priority */
        ev->ev_pri = base->nactivequeues / 2;
    }

    event_debug_note_setup_(ev);

    return 0;
}
(1) The main job of event_assign is to fill in the members of struct event *ev: ev->ev_base, ev->ev_callback, ev->ev_arg, ev->ev_fd, ev->ev_events, and so on. It also rejects the invalid combination of EV_SIGNAL with EV_READ/EV_WRITE/EV_CLOSED, and chooses the closure type based on EV_PERSIST.
Summary: event_new and event_assign store the struct event_base *base that was passed in inside the resulting struct event.
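A practical consequence: event_assign initializes storage that the caller owns, while event_new heap-allocates it. A sketch (struct connection is a made-up type; including event2/event_struct.h is what lets you embed struct event directly):

#include <event2/event.h>
#include <event2/event_struct.h>   /* exposes the layout of struct event */

struct connection {
    struct event read_ev;          /* storage owned by us, no mm_malloc */
    /* ... application state ... */
};

void conn_start(struct event_base *base, struct connection *c,
    evutil_socket_t fd, event_callback_fn cb)
{
    /* the same member-filling we just walked through, no allocation */
    event_assign(&c->read_ev, base, fd, EV_READ | EV_PERSIST, cb, c);
    event_add(&c->read_ev, NULL);
}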
5. The event_add function
int
event_add(struct event *ev, const struct timeval *tv)
{
    int res;

    if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
        event_warnx("%s: event has no event_base set.", __func__);
        return -1;
    }

    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

    res = event_add_nolock_(ev, tv, 0);

    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);
}
(1) event_add only checks that the event has a base and takes the base lock; the actual work is delegated to event_add_nolock_.
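The tv parameter is a relative timeout; with fd = -1 and no I/O flags the event is a pure timer. A minimal sketch:

#include <event2/event.h>

static void on_timer(evutil_socket_t fd, short what, void *arg)
{
    /* what == EV_TIMEOUT here */
}

void start_timer(struct event_base *base)
{
    struct timeval five_sec = { 5, 0 };
    struct event *t = event_new(base, -1, 0, on_timer, NULL);
    /* event_add_nolock_ will push this onto base->timeheap;
     * release with event_free() once done */
    event_add(t, &five_sec);
}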
6. The event_add_nolock_ function
/* Implementation function to add an event.  Works just like event_add,
 * except: 1) it requires that we have the lock.  2) if tv_is_absolute is set,
 * we treat tv as an absolute time, not as an interval to add to the current
 * time */
int
event_add_nolock_(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
    struct event_base *base = ev->ev_base;
    int res = 0;
    int notify = 0;

    EVENT_BASE_ASSERT_LOCKED(base);
    event_debug_assert_is_setup_(ev);

    event_debug((
        "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
        ev,
        EV_SOCK_ARG(ev->ev_fd),
        ev->ev_events & EV_READ ? "EV_READ " : " ",
        ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
        ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
        tv ? "EV_TIMEOUT " : " ",
        ev->ev_callback));

    EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

    if (ev->ev_flags & EVLIST_FINALIZING) {
        /* XXXX debug */
        return (-1);
    }

    /*
     * prepare for timeout insertion further below, if we get a
     * failure on any step, we should not change any state.
     */
    if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
        if (min_heap_reserve_(&base->timeheap,
            1 + min_heap_size_(&base->timeheap)) == -1)
            return (-1);  /* ENOMEM == errno */
    }

    /* If the main thread is currently executing a signal event's
     * callback, and we are not the main thread, then we want to wait
     * until the callback is done before we mess with the event, or else
     * we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
    if (base->current_event == event_to_event_callback(ev) &&
        (ev->ev_events & EV_SIGNAL)
        && !EVBASE_IN_THREAD(base)) {
        ++base->current_event_waiters;
        EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
    }
#endif

    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
        !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
        if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
            res = evmap_io_add_(base, ev->ev_fd, ev);
        else if (ev->ev_events & EV_SIGNAL)
            res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
        if (res != -1)
            event_queue_insert_inserted(base, ev);
        if (res == 1) {
            /* evmap says we need to notify the main thread. */
            notify = 1;
            res = 0;
        }
    }

    /*
     * we should change the timeout state only if the previous event
     * addition succeeded.
     */
    if (res != -1 && tv != NULL) {
        struct timeval now;
        int common_timeout;
#ifdef USE_REINSERT_TIMEOUT
        int was_common;
        int old_timeout_idx;
#endif

        /*
         * for persistent timeout events, we remember the
         * timeout value and re-add the event.
         *
         * If tv_is_absolute, this was already set.
         */
        if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
            ev->ev_io_timeout = *tv;

#ifndef USE_REINSERT_TIMEOUT
        if (ev->ev_flags & EVLIST_TIMEOUT) {
            event_queue_remove_timeout(base, ev);
        }
#endif

        /* Check if it is active due to a timeout.  Rescheduling
         * this timeout before the callback can be executed
         * removes it from the active list. */
        if ((ev->ev_flags & EVLIST_ACTIVE) &&
            (ev->ev_res & EV_TIMEOUT)) {
            if (ev->ev_events & EV_SIGNAL) {
                /* See if we are just active executing
                 * this event in a loop
                 */
                if (ev->ev_ncalls && ev->ev_pncalls) {
                    /* Abort loop */
                    *ev->ev_pncalls = 0;
                }
            }

            event_queue_remove_active(base, event_to_event_callback(ev));
        }

        gettime(base, &now);

        common_timeout = is_common_timeout(tv, base);
#ifdef USE_REINSERT_TIMEOUT
        was_common = is_common_timeout(&ev->ev_timeout, base);
        old_timeout_idx = COMMON_TIMEOUT_IDX(&ev->ev_timeout);
#endif

        if (tv_is_absolute) {
            ev->ev_timeout = *tv;
        } else if (common_timeout) {
            struct timeval tmp = *tv;
            tmp.tv_usec &= MICROSECONDS_MASK;
            evutil_timeradd(&now, &tmp, &ev->ev_timeout);
            ev->ev_timeout.tv_usec |=
                (tv->tv_usec & ~MICROSECONDS_MASK);
        } else {
            evutil_timeradd(&now, tv, &ev->ev_timeout);
        }

        event_debug((
            "event_add: event %p, timeout in %d seconds %d useconds, call %p",
            ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));

#ifdef USE_REINSERT_TIMEOUT
        event_queue_reinsert_timeout(base, ev, was_common, common_timeout, old_timeout_idx);
#else
        event_queue_insert_timeout(base, ev);
#endif

        if (common_timeout) {
            struct common_timeout_list *ctl =
                get_common_timeout_list(base, &ev->ev_timeout);
            if (ev == TAILQ_FIRST(&ctl->events)) {
                common_timeout_schedule(ctl, &now, ev);
            }
        } else {
            struct event* top = NULL;
            /* See if the earliest timeout is now earlier than it
             * was before: if so, we will need to tell the main
             * thread to wake up earlier than it would otherwise.
             * We double check the timeout of the top element to
             * handle time distortions due to system suspension.
             */
            if (min_heap_elt_is_top_(ev))
                notify = 1;
            else if ((top = min_heap_top_(&base->timeheap)) != NULL &&
                evutil_timercmp(&top->ev_timeout, &now, <))
                notify = 1;
        }
    }

    /* if we are not in the right thread, we need to wake up the loop */
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);

    event_debug_note_add_(ev);

    return (res);
}
(1) event_add_nolock_ does the real registration in three steps. First, if a timeout is being added, it reserves a slot in the timeout min-heap up front, so that a later failure cannot leave the event half-registered. Second, for I/O and signal events that are not yet inserted, it registers the event with the backend through evmap_io_add_ or evmap_signal_add_ and puts it on the inserted queue. Third, if tv is non-NULL, it converts the timeout into an absolute expiry time in ev->ev_timeout and inserts the event into either the min-heap or a common-timeout queue; if the earliest wakeup time moved forward, it notifies the loop thread via evthread_notify_base.
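The common_timeout branch is entered when tv carries the magic bits installed by the public event_base_init_common_timeout() API (that is what is_common_timeout checks); such events go into a common_timeout_list queue instead of the min-heap, which is cheaper when many events share the same duration. A sketch:

#include <event2/event.h>

void add_with_common_timeout(struct event_base *base, struct event *ev)
{
    struct timeval ten_sec = { 10, 0 };
    const struct timeval *tv_common =
        event_base_init_common_timeout(base, &ten_sec);
    /* tv_common encodes an index into base->common_timeout_queues */
    event_add(ev, tv_common);
}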
III. The event_base_dispatch Function
1. The event_base_dispatch function
int
event_base_dispatch(struct event_base *event_base)
{
    return (event_base_loop(event_base, 0));
}
(1) As the code shows, event_base_dispatch is simply event_base_loop called with flags set to 0.
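For reference, a running loop is normally stopped with the public helpers below, which set the event_gotterm and event_break flags we saw in struct event_base (and which the loop tests at the top of each iteration):

#include <event2/event.h>

void stop_loop_examples(struct event_base *base)
{
    struct timeval one_sec = { 1, 0 };

    /* finish the current round of callbacks, then exit within 1s
     * (sets event_gotterm via an internal timer event) */
    event_base_loopexit(base, &one_sec);

    /* abort as soon as the current callback returns (sets event_break) */
    event_base_loopbreak(base);
}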
2. The event_base_loop function
  1  int
  2  event_base_loop(struct event_base *base, int flags)
  3  {
  4      const struct eventop *evsel = base->evsel;
  5      struct timeval tv;
  6      struct timeval *tv_p;
  7      int res, done, retval = 0;
  8
  9      /* Grab the lock.  We will release it inside evsel.dispatch, and again
 10       * as we invoke user callbacks. */
 11      EVBASE_ACQUIRE_LOCK(base, th_base_lock);
 12
 13      if (base->running_loop) {
 14          event_warnx("%s: reentrant invocation.  Only one event_base_loop"
 15              " can run on each event_base at once.", __func__);
 16          EVBASE_RELEASE_LOCK(base, th_base_lock);
 17          return -1;
 18      }
 19
 20      base->running_loop = 1;
 21
 22      clear_time_cache(base);
 23
 24      if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
 25          evsig_set_base_(base);
 26
 27      done = 0;
 28
 29  #ifndef EVENT__DISABLE_THREAD_SUPPORT
 30      base->th_owner_id = EVTHREAD_GET_ID();
 31  #endif
 32
 33      base->event_gotterm = base->event_break = 0;
 34
 35      while (!done) {
 36          base->event_continue = 0;
 37          base->n_deferreds_queued = 0;
 38
 39          /* Terminate the loop if we have been asked to */
 40          if (base->event_gotterm) {
 41              break;
 42          }
 43
 44          if (base->event_break) {
 45              break;
 46          }
 47
 48          tv_p = &tv;
 49          if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
 50              timeout_next(base, &tv_p);
 51          } else {
 52              /*
 53               * if we have active events, we just poll new events
 54               * without waiting.
 55               */
 56              evutil_timerclear(&tv);
 57          }
 58
 59          /* If we have no events, we just exit */
 60          if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
 61              !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
 62              event_debug(("%s: no events registered.", __func__));
 63              retval = 1;
 64              goto done;
 65          }
 66
 67          event_queue_make_later_events_active(base);
 68
 69          clear_time_cache(base);
 70
 71          res = evsel->dispatch(base, tv_p);
 72
 73          if (res == -1) {
 74              event_debug(("%s: dispatch returned unsuccessfully.",
 75                  __func__));
 76              retval = -1;
 77              goto done;
 78          }
 79
 80          update_time_cache(base);
 81
 82          timeout_process(base);
 83
 84          if (N_ACTIVE_CALLBACKS(base)) {
 85              int n = event_process_active(base);
 86              if ((flags & EVLOOP_ONCE)
 87                  && N_ACTIVE_CALLBACKS(base) == 0
 88                  && n != 0)
 89                  done = 1;
 90          } else if (flags & EVLOOP_NONBLOCK)
 91              done = 1;
 92      }
 93      event_debug(("%s: asked to terminate loop.", __func__));
 94
 95  done:
 96      clear_time_cache(base);
 97      base->running_loop = 0;
 98
 99      EVBASE_RELEASE_LOCK(base, th_base_lock);
100
101      return (retval);
102  }
(1) The core of event_base_loop is the while loop: on each iteration it works out how long to sleep, then calls the dispatch entry point supplied by whichever multiplexing backend is in use (line 71).
(2) After the backend returns, timeout_process activates any expired timers, and event_process_active runs the callbacks of the activated events.
3. The event_process_active function
  1  /*
  2   * Active events are stored in priority queues.  Lower priorities are always
  3   * processed before higher priorities.  Low priority events can starve high
  4   * priority ones.
  5   */
  6
  7  static int
  8  event_process_active(struct event_base *base)
  9  {
 10      /* Caller must hold th_base_lock */
 11      struct evcallback_list *activeq = NULL;
 12      int i, c = 0;
 13      const struct timeval *endtime;
 14      struct timeval tv;
 15      const int maxcb = base->max_dispatch_callbacks;
 16      const int limit_after_prio = base->limit_callbacks_after_prio;
 17      if (base->max_dispatch_time.tv_sec >= 0) {
 18          update_time_cache(base);
 19          gettime(base, &tv);
 20          evutil_timeradd(&base->max_dispatch_time, &tv, &tv);
 21          endtime = &tv;
 22      } else {
 23          endtime = NULL;
 24      }
 25
 26      for (i = 0; i < base->nactivequeues; ++i) {
 27          if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
 28              base->event_running_priority = i;
 29              activeq = &base->activequeues[i];
 30              if (i < limit_after_prio)
 31                  c = event_process_active_single_queue(base, activeq,
 32                      INT_MAX, NULL);
 33              else
 34                  c = event_process_active_single_queue(base, activeq,
 35                      maxcb, endtime);
 36              if (c < 0) {
 37                  goto done;
 38              } else if (c > 0)
 39                  break; /* Processed a real event; do not
 40                          * consider lower-priority events */
 41              /* If we get here, all of the events we processed
 42               * were internal.  Continue. */
 43          }
 44      }
 45
 46  done:
 47      base->event_running_priority = -1;
 48
 49      return c;
 50  }
(1) Lines 26-44 walk the active queues from highest priority (lowest number) to lowest, handing each non-empty queue to event_process_active_single_queue. As soon as a queue containing a real (non-internal) event has been processed, lower-priority queues are skipped for this iteration.
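The number of queues (base->nactivequeues) and each event's queue index are set through public APIs; note that event_base_priority_init must be called before events are added. A sketch:

#include <event2/event.h>

void configure_priorities(struct event_base *base,
    struct event *urgent, struct event *bulk)
{
    event_base_priority_init(base, 3); /* base->nactivequeues = 3 */
    event_priority_set(urgent, 0);     /* runs first... */
    event_priority_set(bulk, 2);       /* ...and can be starved by 'urgent' */
}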
4. The event_process_active_single_queue function
/*
   Helper for event_process_active to process all the events in a single queue,
   releasing the lock as we go.  This function requires that the lock be held
   when it's invoked.  Returns -1 if we get a signal or an event_break that
   means we should stop processing any active events now.  Otherwise returns
   the number of non-internal event_callbacks that we processed.
 */
static int
event_process_active_single_queue(struct event_base *base,
    struct evcallback_list *activeq,
    int max_to_process, const struct timeval *endtime)
{
    struct event_callback *evcb;
    int count = 0;

    EVUTIL_ASSERT(activeq != NULL);

    for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) {
        struct event *ev=NULL;
        if (evcb->evcb_flags & EVLIST_INIT) {
            ev = event_callback_to_event(evcb);

            if (ev->ev_events & EV_PERSIST || ev->ev_flags & EVLIST_FINALIZING)
                event_queue_remove_active(base, evcb);
            else
                event_del_nolock_(ev, EVENT_DEL_NOBLOCK);
            event_debug((
                "event_process_active: event: %p, %s%s%scall %p",
                ev,
                ev->ev_res & EV_READ ? "EV_READ " : " ",
                ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
                ev->ev_res & EV_CLOSED ? "EV_CLOSED " : " ",
                ev->ev_callback));
        } else {
            event_queue_remove_active(base, evcb);
            event_debug(("event_process_active: event_callback %p, "
                "closure %d, call %p",
                evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback));
        }

        if (!(evcb->evcb_flags & EVLIST_INTERNAL))
            ++count;

        base->current_event = evcb;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
        base->current_event_waiters = 0;
#endif

        switch (evcb->evcb_closure) {
        case EV_CLOSURE_EVENT_SIGNAL:
            EVUTIL_ASSERT(ev != NULL);
            event_signal_closure(base, ev);
            break;
        case EV_CLOSURE_EVENT_PERSIST:
            EVUTIL_ASSERT(ev != NULL);
            event_persist_closure(base, ev);
            break;
        case EV_CLOSURE_EVENT: {
            void (*evcb_callback)(evutil_socket_t, short, void *);
            EVUTIL_ASSERT(ev != NULL);
            evcb_callback = *ev->ev_callback;
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            evcb_callback(ev->ev_fd, ev->ev_res, ev->ev_arg);
        }
        break;
        case EV_CLOSURE_CB_SELF: {
            void (*evcb_selfcb)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_selfcb;
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            evcb_selfcb(evcb, evcb->evcb_arg);
        }
        break;
        case EV_CLOSURE_EVENT_FINALIZE:
        case EV_CLOSURE_EVENT_FINALIZE_FREE: {
            void (*evcb_evfinalize)(struct event *, void *);
            int evcb_closure = evcb->evcb_closure;
            EVUTIL_ASSERT(ev != NULL);
            base->current_event = NULL;
            evcb_evfinalize = ev->ev_evcallback.evcb_cb_union.evcb_evfinalize;
            EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING));
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            evcb_evfinalize(ev, ev->ev_arg);
            event_debug_note_teardown_(ev);
            if (evcb_closure == EV_CLOSURE_EVENT_FINALIZE_FREE)
                mm_free(ev);
        }
        break;
        case EV_CLOSURE_CB_FINALIZE: {
            void (*evcb_cbfinalize)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_cbfinalize;
            base->current_event = NULL;
            EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING));
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            evcb_cbfinalize(evcb, evcb->evcb_arg);
        }
        break;
        default:
            EVUTIL_ASSERT(0);
        }

        EVBASE_ACQUIRE_LOCK(base, th_base_lock);
        base->current_event = NULL;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
        if (base->current_event_waiters) {
            base->current_event_waiters = 0;
            EVTHREAD_COND_BROADCAST(base->current_event_cond);
        }
#endif

        if (base->event_break)
            return -1;
        if (count >= max_to_process)
            return count;
        if (count && endtime) {
            struct timeval now;
            update_time_cache(base);
            gettime(base, &now);
            if (evutil_timercmp(&now, endtime, >=))
                return count;
        }
        if (base->event_continue)
            break;
    }
    return count;
}
(1) event_process_active_single_queue pops each callback off the queue: it removes the callback from the active list (deleting non-persistent events outright), then invokes it according to its closure type, dropping the base lock around the user callback and re-acquiring it afterwards. It stops early if event_break was set, if max_to_process callbacks have run, or if endtime has passed; event_continue merely ends this queue so the outer loop can start over.
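One consequence of the EV_PERSIST branch at the top of the loop: a non-persistent event is deleted (event_del_nolock_) before its callback runs, so the callback must re-add it to fire again. A sketch using event_self_cbarg(), the marker that event_assign special-cases into a pointer to the event itself:

#include <event2/event.h>

static void on_read_once(evutil_socket_t fd, short what, void *arg)
{
    struct event *self = arg;  /* installed via event_self_cbarg() */
    /* ... consume the data ... */
    event_add(self, NULL);     /* re-arm, since EV_PERSIST was not set */
}

void watch_once_style(struct event_base *base, evutil_socket_t fd)
{
    struct event *ev = event_new(base, fd, EV_READ,
        on_read_once, event_self_cbarg());
    event_add(ev, NULL);
}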