参考 http://www.jianshu.com/p/dac223d7d9ad
事件对象结构
//fpm_event.h
/*
 * A single event object: either an I/O event bound to a file descriptor or
 * a timer event. It is initialised by fpm_event_set(), queued by
 * fpm_event_add() and dispatched by fpm_event_fire().
 */
struct fpm_event_s {
	int fd; /* I/O file descriptor */
	struct timeval timeout; /* written through fpm_event_set_timeout() in fpm_event_add(); presumably the next expiry time -- TODO confirm */
	struct timeval frequency; /* timer period; fpm_event_add() derives it from its millisecond argument */
	void (*callback)(struct fpm_event_s *, short, void *); /* callback function; invoked as callback(ev, ev->which, ev->arg) */
	void *arg; /* argument passed to the callback */
	int flags; /* flags stored by fpm_event_set(); the FPM_EV_READ and FPM_EV_EDGE bits are tested elsewhere in this file */
	int index; /* -1 while not registered; the epoll backend stores the fd here once registered */
	short which; /* FPM_EV_READ or FPM_EV_TIMEOUT, assigned by fpm_event_add() */
};
/* Queue: a container holding multiple event objects. */
/*
 * One node of a doubly linked list of events. fpm_event_queue_add() inserts
 * new nodes at the head, so the head node always has prev == NULL.
 */
typedef struct fpm_event_queue_s {
	struct fpm_event_queue_s *prev; /* previous node, NULL for the head */
	struct fpm_event_queue_s *next; /* next node, NULL for the tail */
	struct fpm_event_s *ev; /* the event carried by this node; the node stores only the pointer */
} fpm_event_queue;
事件模块封装结构
struct fpm_event_module_s { const char *name; int support_edge_trigger; int (*init)(int max_fd); int (*clean)(void); //等待多个事件
int (*wait)(struct fpm_event_queue_s *queue, unsigned long int timeout); int (*add)(struct fpm_event_s *ev); int (*remove)(struct fpm_event_s *ev); };
/* events/epoll.c */
/*
 * HAVE_EPOLL is a build-time macro: it decides at compile time whether the
 * epoll backend below is compiled in at all.
 */
#if HAVE_EPOLL

#include <sys/epoll.h>
#include <errno.h>

/* Backend operations, defined further down in this file. */
static int fpm_event_epoll_init(int max);
static int fpm_event_epoll_clean();
static int fpm_event_epoll_add(struct fpm_event_s *ev);
static int fpm_event_epoll_remove(struct fpm_event_s *ev);
static int fpm_event_epoll_wait(struct fpm_event_queue_s *queue, unsigned long int timeout);

/* Module descriptor handed out by fpm_event_epoll_module(). */
static struct fpm_event_module_s epoll_module = {
	.name = "epoll",
	.support_edge_trigger = 1,

	/* lifecycle */
	.init = fpm_event_epoll_init,
	.clean = fpm_event_epoll_clean,

	/* fd registration */
	.add = fpm_event_epoll_add,
	.remove = fpm_event_epoll_remove,

	/* event loop */
	.wait = fpm_event_epoll_wait,
};

/* File-scope state shared by the backend functions. */
static int epollfd = -1;                      /* epoll instance, -1 while closed */
static int nepollfds = 0;                     /* number of slots in epollfds */
static struct epoll_event *epollfds = NULL;   /* buffer filled by epoll_wait() */

#endif /* HAVE_EPOLL */
/*
 * Accessor used by callers to obtain the epoll module object.
 *
 * Returns a pointer to the epoll module descriptor, or NULL when the system
 * does not support epoll (HAVE_EPOLL is unset). This function itself is
 * always compiled into the binary, whatever HAVE_EPOLL says.
 *
 * The parameter list is spelled (void) so that the definition is a real
 * prototype and calls with stray arguments are rejected by the compiler.
 */
struct fpm_event_module_s *fpm_event_epoll_module(void) /* {{{ */
{
#if HAVE_EPOLL
	return &epoll_module;
#else
	return NULL;
#endif /* HAVE_EPOLL */
}
/* }}} */
#if HAVE_EPOLL
/*
 * Init the module
 *
 * max: number of event slots to allocate for epoll_wait().
 *
 * Returns 0 on success, and also when max < 1 (in which case nothing is
 * initialised). Returns -1 if the epoll instance or the event buffer cannot
 * be created; on failure no epoll descriptor is left open.
 */
static int fpm_event_epoll_init(int max) /* {{{ */
{
	if (max < 1) {
		return 0;
	}

	/* init epoll */
	epollfd = epoll_create(max + 1);
	if (epollfd < 0) {
		zlog(ZLOG_ERROR, "epoll: unable to initialize");
		return -1;
	}

	/* allocate fds; calloc() zeroes the buffer and checks the size multiplication */
	epollfds = calloc((size_t)max, sizeof(*epollfds));
	if (!epollfds) {
		zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);

		/* do not leave a half-initialised module behind: release the epoll fd */
		close(epollfd);
		epollfd = -1;
		return -1;
	}

	/* save max */
	nepollfds = max;

	return 0;
}
/* }}} */
/*
 * Clean the module
 *
 * Frees the event buffer, closes the epoll descriptor if one is open and
 * resets the file-scope state to its initial values. Always returns 0.
 * It may be called when nothing was initialised.
 */
static int fpm_event_epoll_clean(void) /* {{{ */
{
	/* free epollfds; free(NULL) is a no-op, so no guard is needed */
	free(epollfds);
	epollfds = NULL;

	if (epollfd != -1) {
		close(epollfd);
		epollfd = -1;
	}

	nepollfds = 0;

	return 0;
}
/* }}} */
/*
 * wait for events or timeout
 *
 * queue:   not used by this backend.
 * timeout: handed to epoll_wait() unchanged.
 *
 * Returns the value of epoll_wait() once every reported event has been
 * fired. Returns -1 after logging a warning when epoll_wait() fails for a
 * reason other than EINTR; on EINTR nothing is logged and the -1 from
 * epoll_wait() is returned as is. Returns -2 as soon as the current pid no
 * longer matches fpm_globals.parent_pid after a callback has run.
 */
static int fpm_event_epoll_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */
{
	int nready;
	int idx;

	/* start from a zeroed buffer before calling epoll_wait() */
	memset(epollfds, 0, sizeof(struct epoll_event) * nepollfds);

	/* block until an event arrives or the timeout expires */
	nready = epoll_wait(epollfd, epollfds, nepollfds, timeout);
	if (nready == -1 && errno != EINTR) {
		/* a real error, not just a signal interrupting the wait */
		zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
		return -1;
	}

	/* dispatch whatever was reported (the loop is skipped when nready <= 0) */
	for (idx = 0; idx < nready; idx++) {
		struct fpm_event_s *ev = (struct fpm_event_s *)epollfds[idx].data.ptr;

		/* slots without an event pointer are ignored */
		if (ev) {
			fpm_event_fire(ev);

			/* sanity check */
			if (fpm_globals.parent_pid != getpid()) {
				return -2;
			}
		}
	}

	return nready;
}
/* }}} */
/*
 * Add a FD to the fd set
 *
 * ev: event whose fd is registered for EPOLLIN; EPOLLET is added when the
 *     FPM_EV_EDGE flag is set.
 *
 * Returns 0 on success, after storing the fd in ev->index to mark the event
 * as registered. Returns -1 after logging an error when epoll_ctl() fails.
 */
static int fpm_event_epoll_add(struct fpm_event_s *ev) /* {{{ */
{
	struct epoll_event e;

	/*
	 * Zero the whole structure first so that no uninitialised bytes are
	 * passed to the kernel (padding, or the upper half of the data union
	 * when pointers are narrower than 64 bits).
	 */
	memset(&e, 0, sizeof(e));

	/* fill epoll struct */
	e.events = EPOLLIN;

	/*
	 * data is a union, so only one member can be stored. data.ptr carries
	 * our own event object; when the event triggers, the object is
	 * retrieved through it.
	 */
	e.data.ptr = (void *)ev;

	if (ev->flags & FPM_EV_EDGE) {
		e.events = e.events | EPOLLET;
	}

	/* add the event to epoll internal queue */
	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ev->fd, &e) == -1) {
		zlog(ZLOG_ERROR, "epoll: unable to add fd %d", ev->fd);
		return -1;
	}

	/* mark the event as registered */
	ev->index = ev->fd;
	return 0;
}
/* }}} */
/*
 * Remove a FD from the fd set
 *
 * ev: event whose fd is removed from the epoll instance.
 *
 * Returns 0 on success, after setting ev->index to -1 to mark the event as
 * not registered. Returns -1 after logging an error when epoll_ctl() fails.
 */
static int fpm_event_epoll_remove(struct fpm_event_s *ev) /* {{{ */
{
	struct epoll_event e;

	/* never hand uninitialised bytes to the kernel */
	memset(&e, 0, sizeof(e));

	/*
	 * fill epoll struct the same way we did in fpm_event_epoll_add();
	 * data is a union, so only data.ptr is stored
	 */
	e.events = EPOLLIN;
	e.data.ptr = (void *)ev;

	if (ev->flags & FPM_EV_EDGE) {
		e.events = e.events | EPOLLET;
	}

	/* remove the event from epoll internal queue */
	if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, &e) == -1) {
		zlog(ZLOG_ERROR, "epoll: unable to remove fd %d", ev->fd);
		return -1;
	}

	/* mark the event as not registered */
	ev->index = -1;
	return 0;
}
/* }}} */
#endif /* HAVE_EPOLL */
/* Usage example: initialise signal_fd_event with the fd returned by fpm_signals_get_fd(), the FPM_EV_READ flag, fpm_got_signal as callback and no callback argument. */
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
初始化变量 ev
/*
 * Initialise an event object.
 *
 * ev:       event to fill in; it is zeroed first, so every field not listed
 *           below ends up as 0.
 * fd:       file descriptor to store; values below -1 are rejected.
 * flags:    stored in ev->flags.
 * callback: stored in ev->callback; must not be NULL.
 * arg:      stored in ev->arg.
 *
 * Returns 0 on success, -1 when ev or callback is NULL or fd < -1.
 */
int fpm_event_set(struct fpm_event_s *ev, int fd, int flags, void (*callback)(struct fpm_event_s *, short, void *), void *arg) /* {{{ */
{
	if (ev == NULL) {
		return -1;
	}
	if (callback == NULL) {
		return -1;
	}
	/* -1 itself is accepted, anything smaller is not */
	if (fd < -1) {
		return -1;
	}

	memset(ev, 0, sizeof(*ev));

	ev->flags = flags;
	ev->arg = arg;
	ev->callback = callback;
	ev->fd = fd;

	return 0;
}
/* }}} */
/* Usage example: queue signal_fd_event. When the event carries FPM_EV_READ, fpm_event_add() takes the I/O path and the second argument (0 here) is not used. */
fpm_event_add(&signal_fd_event, 0);
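fpm_event_add 按事件类型分发:如果是 IO 事件(flags 含 FPM_EV_READ),则加入 fd 事件队列 fpm_event_queue_fd,并通过事件模块的 add 回调注册(epoll 下即调用 epoll_ctl);如果是定时器事件,则按 frequency(毫秒)换算出周期并设置超时时间,然后加入定时器队列 fpm_event_queue_timer。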
int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency) /* {{{ */ { struct timeval now; struct timeval tmp; if (!ev) { return -1; } ev->index = -1; /* it's a triggered event on incoming data */ if (ev->flags & FPM_EV_READ) { ev->which = FPM_EV_READ; if (fpm_event_queue_add(&fpm_event_queue_fd, ev) != 0) { return -1; } return 0; } /* it's a timer event */ ev->which = FPM_EV_TIMEOUT; fpm_clock_get(&now); if (frequency >= 1000) { tmp.tv_sec = frequency / 1000; tmp.tv_usec = (frequency % 1000) * 1000; } else { tmp.tv_sec = 0; tmp.tv_usec = frequency * 1000; } ev->frequency = tmp; fpm_event_set_timeout(ev, now); if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) { return -1; } return 0; }
fpm_event_queue_add 把事件插入指定队列的头部;如果插入的是 fd 事件队列(IO 事件),还会调用事件模块的 add 回调(epoll 下即 fpm_event_epoll_add,内部调用 epoll_ctl)。
static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev) /* {{{ */ { struct fpm_event_queue_s *elt; if (!queue || !ev) { return -1; } if (fpm_event_queue_isset(*queue, ev)) { return 0; } if (!(elt = malloc(sizeof(struct fpm_event_queue_s)))) { zlog(ZLOG_SYSERROR, "Unable to add the event to queue: malloc() failed"); return -1; } elt->prev = NULL; elt->next = NULL; elt->ev = ev; if (*queue) { (*queue)->prev = elt; elt->next = *queue; } *queue = elt; /* ask the event module to add the fd from its own queue */ if (*queue == fpm_event_queue_fd && module->add) { module->add(ev); } return 0; }
执行事件
/*
 * Run an event's callback.
 *
 * The callback receives the event itself, ev->which and ev->arg. Nothing
 * happens when ev is NULL or has no callback.
 */
void fpm_event_fire(struct fpm_event_s *ev) /* {{{ */
{
	if (ev != NULL && ev->callback != NULL) {
		ev->callback(ev, ev->which, ev->arg);
	}
}
/* }}} */