Libevent源碼分析—event_base_dispatch()


我們知道libevent是一個Reactor模式的事件驅動的網絡庫。
 
到目前為止,我們已經看了核心的event和event_base結構體的源碼,看了初始化這兩個結構體的源碼,看了注冊event的源碼,也將event注冊到I/O多路復用監聽的事件上了。現在准備工作都做好了,下面就是看運行時的主循環了,在這個主循環中,是如何檢測事件、分發事件、調用事件的回調函數的。這一步就是libevent的核心框架流程了。
 
Reactor模式中的Event、Event Handler、Reactor目前都完成了,下面就剩Event Demultiplexer了。
這一步通過event_base_dispatch()完成
int
event_base_dispatch(struct event_base *event_base)
{
  return (event_base_loop(event_base, 0));  //調用event_base_loop()
}

可以看到,該函數只是做了調用event_base_loop()這一個動作,所以工作實際是在函數event_base_loop()內完成的。

event_base_loop()

該函數完成以下工作:
1.信號標記被設置,則調用信號的回調函數
2.根據定時器最小時間,設置I/O多路復用的最大等待時間,這樣即使沒有I/O事件發生,也能在最小定時器超時時返回。
3.調用I/O多路復用,監聽事件,將活躍事件添加到活躍事件鏈表中
4. 檢查定時事件,將就緒的定時事件從小根堆中刪除,插入到活躍事件鏈表中
5.對event_base的活躍事件鏈表中的事件,調用event_process_active()函數,在該函數內調用event的回調函數,優先級高的event先處理。
 
該函數內部調用了eventop.dispatch()監聽事件,event_sigcb函數指針處理信號事件,timeout_process()將超時的定時事件加入到活躍事件鏈表中,event_process_active()處理活躍事件鏈表中的事件,調用相應的回調函數。
int
event_base_loop(struct event_base *base, int flags)
{
    const struct eventop *evsel = base->evsel;
    void *evbase = base->evbase;  //event_base的I/O多路復用
    struct timeval tv;
    struct timeval *tv_p;
    int res, done;
    /* clear time cache */
    //清空時間緩存
    base->tv_cache.tv_sec = 0;
    //處理Signal事件時,指定信號所屬的event_base
    if (base->sig.ev_signal_added)
        evsignal_base = base;
    done = 0;
    while (!done) {  //進入事件主循環
        /* Terminate the loop if we have been asked to */
        //設置event_base的標記,以表明是否需要跳出循環
        if (base->event_gotterm) {  //event_loopexit_cb()可設置
            base->event_gotterm = 0;
            break;
        }
        if (base->event_break) {  //event_base_loopbreak()可設置
            base->event_break = 0;
            break;
        }
        /* You cannot use this interface for multi-threaded apps */
        //當event_gotsig被設置時,則event_sigcb就是信號處理的回調函數
        while (event_gotsig) {
            event_gotsig = 0;
            if (event_sigcb) {
                res = (*event_sigcb)();  //調用信號處理的回調函數
                if (res == -1) {
                    errno = EINTR;
                    return (-1);
                }
            }
        }
        timeout_correct(base, &tv);  //校准時間
        tv_p = &tv;
        //根據定時器堆中最小超時時間計算I/O多路復用的最大等待時間tv_p
        if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
            timeout_next(base, &tv_p);
        } else {
            /* 
             * if we have active events, we just poll new events
             * without waiting.
             */
            evutil_timerclear(&tv);
        }
        
        /* If we have no events, we just exit */
        //沒有注冊事件,則退出
        if (!event_haveevents(base)) {
            event_debug(("%s: no events registered.", __func__));
            return (1);
        }
        /* update last old time */
        gettime(base, &base->event_tv);
        /* clear time cache */
        base->tv_cache.tv_sec = 0;
        //調用I/O多路復用,監聽事件
        res = evsel->dispatch(base, evbase, tv_p);
        if (res == -1)
            return (-1);
        //將time cache賦值為當前系統時間
        gettime(base, &base->tv_cache);
        
        //檢查定時事件,將就緒的定時事件從小根堆中刪除,插入到活躍事件鏈表中
        timeout_process(base);
        if (base->event_count_active) {
            //處理event_base的活躍鏈表中的事件
            //調用event的回調函數,優先級高的event先處理
            event_process_active(base);  
            if (!base->event_count_active && (flags & EVLOOP_ONCE))
                done = 1;
        } else if (flags & EVLOOP_NONBLOCK)
            done = 1;
    }
    /* clear time cache */
    //循環結束,清空時間緩存
    base->tv_cache.tv_sec = 0;
    event_debug(("%s: asked to terminate loop.", __func__));
    return (0);
}

epoll_dispatch()

在上面我們看到,event_base_loop()中通過I/O多路復用的dispatch()函數完成監聽事件功能。在之前的event_init()中我們看到,通過遍歷eventops數組,從中選擇一個I/O多路復用機制,所以不同的I/O多路復用機制有不同的eventop結構體,相應的也就有不同的dispatch()函數。下面,再次看下eventop結構體(event-internal.h)
struct eventop {
        const char *name;
        void *(*init)(struct event_base *);  //初始化
        int (*add)(void *, struct event *);  //注冊事件
        int (*del)(void *, struct event *);  //刪除事件
        int (*dispatch)(struct event_base *, void *, struct timeval *);  //事件分發
        void (*dealloc)(struct event_base *, void *);  //注銷,釋放資源
        /* set if we need to reinitialize the event base */
        int need_reinit;
};
在event_add()中通過add()成員函數注冊event到監聽事件中,現在在event_base_loop()中通過dispatch()成員函數監聽事件。
libevent支持多種I/O多路復用機制,下面先看下epoll的eventop結構體(epoll.c)
const struct eventop epollops = {
    "epoll",
    epoll_init,
    epoll_add,
    epoll_del,
    epoll_dispatch,
    epoll_dealloc,
    1 /* need reinit */
};
然后看下epoll的dispatch()函數(epoll.c)
從下面源碼可見,epoll_dispatch()的工作主要有:
1.調用epoll_wait()監聽事件
2.如果有信號發生,調用evsignal_process()處理信號
3.將活躍的event根據其活躍的類型注冊到活躍事件鏈表上
4.如果events數組大小不夠,則重新分配為原來2倍大小
static int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
    struct epollop *epollop = arg;
    struct epoll_event *events = epollop->events;
    struct evepoll *evep;
    int i, res, timeout = -1;
    if (tv != NULL)
        timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;  //轉換為微米
    if (timeout > MAX_EPOLL_TIMEOUT_MSEC) {  //設置最大超時時間
        /* Linux kernels can wait forever if the timeout is too big;
         * see comment on MAX_EPOLL_TIMEOUT_MSEC. */
        timeout = MAX_EPOLL_TIMEOUT_MSEC;
    }
    res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);  //監聽事件發生
    if (res == -1) {
        if (errno != EINTR) {
            event_warn("epoll_wait");
            return (-1);
        }
        evsignal_process(base);  //由於Signal事件發生中斷,處理Signal事件
        return (0);
    } else if (base->sig.evsignal_caught) {
        evsignal_process(base);  //有Signal事件發生,處理Signal事件
    }
    event_debug(("%s: epoll_wait reports %d", __func__, res));
    for (i = 0; i < res; i++) {  //處理活躍事件
        int what = events[i].events;  //活躍類型
        struct event *evread = NULL, *evwrite = NULL;
        int fd = events[i].data.fd;  //event的文件描述符
        if (fd < 0 || fd >= epollop->nfds)
            continue;
        evep = &epollop->fds[fd];
        if (what & (EPOLLHUP|EPOLLERR)) {  //判斷epoll的events類型,並找到注冊的event
            evread = evep->evread;
            evwrite = evep->evwrite;
        } else {
            if (what & EPOLLIN) {
                evread = evep->evread;
            }
            if (what & EPOLLOUT) {
                evwrite = evep->evwrite;
            }
        }
        if (!(evread||evwrite))
            continue;
        
        //添加event到活躍事件鏈表中
        if (evread != NULL)
            event_active(evread, EV_READ, 1);
        if (evwrite != NULL)
            event_active(evwrite, EV_WRITE, 1);
    }
    //如果注冊的事件全部變為活躍,則增大events數組為原來兩倍
    if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) {
        /* We used all of the event space this time.  We should
           be ready for more events next time. */
        int new_nevents = epollop->nevents * 2;
        struct epoll_event *new_events;
        new_events = realloc(epollop->events,
            new_nevents * sizeof(struct epoll_event));
        if (new_events) {
            epollop->events = new_events;
            epollop->nevents = new_nevents;
        }
    }
    return (0);
}

event_process_active()

好了,現在活躍的I/O事件、定時器事件已經全部添加到活躍事件鏈表中了。下面就開始調用這些event的回調函數進行處理了,這步是在event_base_loop()中調用event_process_active()來完成的。
該函數從event_base的activequeueus鏈表數組上取出一個鏈表;對該鏈表上的event調用回調函數;優先調用優先級值最小的event
/*
 * Active events are stored in priority queues.  Lower priorities are always
 * process before higher priorities.  Low priority events can starve high
 * priority ones.
 */
static void
event_process_active(struct event_base *base)
{
    struct event *ev;
    struct event_list *activeq = NULL;
    int i;
    short ncalls;
    for (i = 0; i < base->nactivequeues; ++i) {  //取出第一個活躍鏈表
        if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
            activeq = base->activequeues[i];
            break;
        }
    }
    assert(activeq != NULL);

    //優先處理優先級值最小的event
    for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
        if (ev->ev_events & EV_PERSIST)
            event_queue_remove(base, ev, EVLIST_ACTIVE);  //持久事件,則從活躍鏈表移除
        else
            event_del(ev);  //不是持久事件,則直接刪除該事件
        
        /* Allows deletes to work */
        ncalls = ev->ev_ncalls;
        ev->ev_pncalls = &ncalls;
        while (ncalls) {
            ncalls--;
            ev->ev_ncalls = ncalls;
            //調用該event的回調函數,event.ev_res保存返回值
            (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);  
            if (event_gotsig || base->event_break) {
                  ev->ev_pncalls = NULL;
                return;
            }
        }
        ev->ev_pncalls = NULL;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM