php-fpm epoll封裝


 

參考 http://www.jianshu.com/p/dac223d7d9ad

 

事件對象結構

/* fpm_event.h */

/*
 * One event object. fpm_event_add() treats it as an I/O event when
 * FPM_EV_READ is set in `flags`, and as a timer event otherwise.
 */
struct fpm_event_s {
    int fd;                     /* I/O file descriptor */
    struct timeval timeout;     /* filled in through fpm_event_set_timeout() for timer events */
    struct timeval frequency;   /* timer period; fpm_event_add() derives it from a millisecond value */
    void (*callback)(struct fpm_event_s *, short, void *); /* callback function */
    void *arg;                  /* argument handed to the callback */
    int flags;                  /* FPM_EV_* bits (FPM_EV_READ and FPM_EV_EDGE are tested in this file) */
    int index;                  /* the epoll backend stores fd here once registered, -1 otherwise */
    short which;                /* FPM_EV_READ or FPM_EV_TIMEOUT; passed to the callback as its 2nd argument */
};

/* Queue node: a doubly linked list that holds several event objects. */
struct fpm_event_queue_s {
    struct fpm_event_queue_s *prev;
    struct fpm_event_queue_s *next;
    struct fpm_event_s *ev;
};

typedef struct fpm_event_queue_s fpm_event_queue;

 

事件模塊封裝結構

/*
 * Table of operations implemented by one event backend.
 * The epoll backend in this file fills every member.
 */
struct fpm_event_module_s {
    const char *name;           /* backend name, e.g. "epoll" */
    int support_edge_trigger;   /* the epoll backend sets this to 1 */

    /* set the backend up for at most max_fd descriptors */
    int (*init)(int max_fd);

    /* release whatever init() acquired */
    int (*clean)(void);

    /* wait for several events at once */
    int (*wait)(struct fpm_event_queue_s *queue, unsigned long int timeout);

    /* register the fd of one event with the backend */
    int (*add)(struct fpm_event_s *ev);

    /* unregister the fd of one event from the backend */
    int (*remove)(struct fpm_event_s *ev);
};

 

 

/* events/epoll.c */

/*
 * HAVE_EPOLL is a build-time macro: everything between this #if and the
 * matching #endif is compiled in only when it is set.
 */
#if HAVE_EPOLL

#include <sys/epoll.h>
#include <errno.h>

static int fpm_event_epoll_init(int max);
static int fpm_event_epoll_clean();
static int fpm_event_epoll_wait(struct fpm_event_queue_s *queue, unsigned long int timeout);
static int fpm_event_epoll_add(struct fpm_event_s *ev);
static int fpm_event_epoll_remove(struct fpm_event_s *ev);

/* Operation table handed out by fpm_event_epoll_module(). */
static struct fpm_event_module_s epoll_module = {
    .name = "epoll",
    .support_edge_trigger = 1,
    .init = fpm_event_epoll_init,
    .clean = fpm_event_epoll_clean,
    .wait = fpm_event_epoll_wait,
    .add = fpm_event_epoll_add,
    .remove = fpm_event_epoll_remove,
};

/* File-scope state shared by the epoll callbacks. */
static struct epoll_event *epollfds = NULL; /* result buffer passed to epoll_wait() */
static int nepollfds = 0;                   /* number of entries in epollfds */
static int epollfd = -1;                    /* epoll instance descriptor, -1 while not open */

#endif /* HAVE_EPOLL */

/*
 * Accessor callers use to obtain the epoll module object.
 * The function itself is always compiled into the binary; it returns NULL
 * when the build has no epoll support (HAVE_EPOLL not set).
 */
struct fpm_event_module_s *fpm_event_epoll_module(void) /* {{{ */
{
#if HAVE_EPOLL
    return &epoll_module;
#else
    return NULL;
#endif /* HAVE_EPOLL */
}
/* }}} */

#if HAVE_EPOLL

/*
 * Init the module
 *
 * Creates the epoll instance (stored in epollfd) and allocates a zeroed
 * buffer of `max` struct epoll_event entries (stored in epollfds, with the
 * count saved in nepollfds).
 *
 * Returns 0 on success, and also when max < 1, in which case nothing is
 * set up. Returns -1 when epoll_create() or the allocation fails; on an
 * allocation failure the freshly created epoll descriptor is closed again
 * so that it is not leaked.
 */
static int fpm_event_epoll_init(int max) /* {{{ */
{
    if (max < 1) {
        return 0;
    }

    /* init epoll */
    epollfd = epoll_create(max + 1);
    if (epollfd < 0) {
        zlog(ZLOG_ERROR, "epoll: unable to initialize");
        return -1;
    }

    /* allocate fds */
    epollfds = malloc(sizeof(struct epoll_event) * max);
    if (!epollfds) {
        zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);
        /* undo epoll_create(): without this the descriptor would stay open */
        close(epollfd);
        epollfd = -1;
        return -1;
    }
    memset(epollfds, 0, sizeof(struct epoll_event) * max);

    /* save max */
    nepollfds = max;

    return 0;
}
/* }}} */

/*
 * Clean the module
 *
 * Releases the event buffer, closes the epoll descriptor if one is open
 * and resets the file-scope state (epollfds, epollfd, nepollfds) to its
 * initial values. Always returns 0.
 */
static int fpm_event_epoll_clean() /* {{{ */
{
    /* free(NULL) is a no-op, so the buffer needs no guard */
    free(epollfds);
    epollfds = NULL;
    nepollfds = 0;

    if (epollfd != -1) {
        close(epollfd);
        epollfd = -1;
    }

    return 0;
}
/* }}} */

/*
 * Wait for events or timeout
 *
 * Calls epoll_wait() on the module's epoll descriptor with `timeout` and
 * fires the callback of every returned event that carries a non-NULL
 * data.ptr. `queue` is not used by this backend.
 *
 * Returns the value of epoll_wait() (number of ready events; -1 when the
 * call was interrupted by a signal, which is not logged), -1 after logging
 * a warning for any other epoll_wait() error, or -2 as soon as, after a
 * callback has run, getpid() no longer equals fpm_globals.parent_pid.
 */
static int fpm_event_epoll_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */
{
    int nready;
    int idx;

    /* make sure the result buffer is clean before calling epoll_wait() */
    memset(epollfds, 0, sizeof(*epollfds) * nepollfds);

    /* wait for an incoming event or the timeout */
    nready = epoll_wait(epollfd, epollfds, nepollfds, timeout);

    /* report every error except an interruption by a signal */
    if (nready == -1 && errno != EINTR) {
        zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
        return -1;
    }

    /* events have been triggered, fire them in order */
    for (idx = 0; idx < nready; idx++) {
        struct fpm_event_s *fired = epollfds[idx].data.ptr;

        /* entries without an event pointer are skipped entirely */
        if (fired) {
            fpm_event_fire(fired);

            /* sanity check: stop if we are no longer the recorded parent process */
            if (fpm_globals.parent_pid != getpid()) {
                return -2;
            }
        }
    }

    return nready;
}
/* }}} */

/*
 * Add a FD to the fd set
 *
 * Registers ev->fd with the epoll instance for EPOLLIN, adding EPOLLET when
 * FPM_EV_EDGE is set in ev->flags. The event object itself is stored in
 * data.ptr so that fpm_event_epoll_wait() can get it back when the
 * descriptor becomes ready.
 *
 * Returns 0 on success (ev->index is then set to ev->fd), -1 after logging
 * an error when epoll_ctl() fails.
 */
static int fpm_event_epoll_add(struct fpm_event_s *ev) /* {{{ */
{
    /* start from an all-zero struct so no indeterminate bytes reach the kernel */
    struct epoll_event e = {0};

    /* fill epoll struct */
    e.events = EPOLLIN;
    if (ev->flags & FPM_EV_EDGE) {
        e.events = e.events | EPOLLET;
    }

    /*
     * epoll_data is a union: only one member can be live. We keep the
     * pointer to our own event object; it is what the wait loop reads.
     */
    e.data.ptr = (void *)ev;

    /* add the event to epoll internal queue */
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ev->fd, &e) == -1) {
        zlog(ZLOG_ERROR, "epoll: unable to add fd %d", ev->fd);
        return -1;
    }

    /* mark the event as registered */
    ev->index = ev->fd;

    return 0;
}
/* }}} */

/*
 * Remove a FD from the fd set
 *
 * Unregisters ev->fd from the epoll instance.
 *
 * Returns 0 on success (ev->index is then set to -1), -1 after logging an
 * error when epoll_ctl() fails.
 */
static int fpm_event_epoll_remove(struct fpm_event_s *ev) /* {{{ */
{
    /* start from an all-zero struct so no indeterminate bytes reach the kernel */
    struct epoll_event e = {0};

    /* fill epoll struct the same way fpm_event_epoll_add() does */
    e.events = EPOLLIN;
    if (ev->flags & FPM_EV_EDGE) {
        e.events = e.events | EPOLLET;
    }

    /* epoll_data is a union: only the event pointer is stored */
    e.data.ptr = (void *)ev;

    /* remove the event from epoll internal queue */
    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, &e) == -1) {
        zlog(ZLOG_ERROR, "epoll: unable to remove fd %d", ev->fd);
        return -1;
    }

    /* mark the event as not registered */
    ev->index = -1;

    return 0;
}
/* }}} */

#endif /* HAVE_EPOLL */

 

 

 

 

 

/* Usage example: initialise signal_fd_event for the descriptor returned by
 * fpm_signals_get_fd(), with the FPM_EV_READ flag, fpm_got_signal as the
 * callback and NULL as the callback argument. */
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);

 

 

初始化變量 ev

/*
 * Initialise an event object.
 *
 * Zeroes *ev and then stores fd, flags, callback and arg in it; every other
 * member is left at zero.
 *
 * Returns 0 on success, -1 when ev or callback is NULL or fd is below -1
 * (fd == -1 is accepted).
 */
int fpm_event_set(struct fpm_event_s *ev, int fd, int flags, void (*callback)(struct fpm_event_s *, short, void *), void *arg) /* {{{ */
{
    /* both the event and its callback are mandatory */
    if (!ev || !callback) {
        return -1;
    }

    /* -1 is the lowest descriptor value allowed */
    if (fd < -1) {
        return -1;
    }

    memset(ev, 0, sizeof(*ev));

    ev->flags = flags;
    ev->arg = arg;
    ev->callback = callback;
    ev->fd = fd;

    return 0;
}

 

 

 

/* Usage example: queue signal_fd_event. The second argument is the timer
 * frequency in milliseconds; fpm_event_add() does not read it for events
 * that have FPM_EV_READ set in their flags. */
fpm_event_add(&signal_fd_event, 0);

 

 

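如果是 IO 事件(flags 含 FPM_EV_READ),則加入 fd 事件隊列 fpm_event_queue_fd,並由 fpm_event_queue_add 透過事件模塊的 add(epoll 模塊中即 epoll_ctl)註冊該 fd;否則視為定時器事件,計算超時時間後加入定時器隊列 fpm_event_queue_timer。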

/*
 * Queue an event.
 *
 * An event with FPM_EV_READ in its flags is put on fpm_event_queue_fd and
 * `frequency` is not read. Any other event is handled as a timer:
 * `frequency` (milliseconds) is stored in ev->frequency as seconds plus
 * microseconds, the timeout is set from the current clock through
 * fpm_event_set_timeout(), and the event is put on fpm_event_queue_timer.
 *
 * ev->index is reset to -1 before the event is queued.
 *
 * Returns 0 on success, -1 when ev is NULL or the queue insertion fails.
 */
int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency) /* {{{ */
{
    struct timeval now;

    if (!ev) {
        return -1;
    }

    ev->index = -1;

    /* it's a triggered event on incoming data */
    if (ev->flags & FPM_EV_READ) {
        ev->which = FPM_EV_READ;
        return (fpm_event_queue_add(&fpm_event_queue_fd, ev) != 0) ? -1 : 0;
    }

    /* it's a timer event */
    ev->which = FPM_EV_TIMEOUT;

    fpm_clock_get(&now);

    /* milliseconds -> whole seconds + remaining microseconds */
    ev->frequency.tv_sec = frequency / 1000;
    ev->frequency.tv_usec = (frequency % 1000) * 1000;

    fpm_event_set_timeout(ev, now);

    return (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) ? -1 : 0;
}

 

 

 

將事件加入指定隊列的頭部(事件已在隊列中則直接返回);如果該隊列是 fd 事件隊列,還會調用事件模塊的 add(epoll 模塊中即 epoll_ctl)註冊該 fd。

/*
 * Put an event at the head of a queue.
 *
 * Does nothing and returns 0 when fpm_event_queue_isset() reports that the
 * event is already queued. Otherwise a new node is allocated and linked in
 * front of the current head. When the queue being modified is
 * fpm_event_queue_fd and the active module provides add(), the event is
 * also handed to module->add(); its return value is not checked.
 *
 * Returns 0 on success, -1 when queue or ev is NULL or malloc() fails.
 */
static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev) /* {{{ */
{
    struct fpm_event_queue_s *node;
    struct fpm_event_queue_s *old_head;

    if (!queue || !ev) {
        return -1;
    }

    /* already queued: nothing to do */
    if (fpm_event_queue_isset(*queue, ev)) {
        return 0;
    }

    node = malloc(sizeof(*node));
    if (!node) {
        zlog(ZLOG_SYSERROR, "Unable to add the event to queue: malloc() failed");
        return -1;
    }

    /* link the new node in front of the previous head (which may be NULL) */
    old_head = *queue;
    node->ev = ev;
    node->prev = NULL;
    node->next = old_head;
    if (old_head) {
        old_head->prev = node;
    }
    *queue = node;

    /* ask the event module to add the fd to its own queue */
    if (*queue == fpm_event_queue_fd && module->add) {
        module->add(ev);
    }

    return 0;
}

 

 

執行事件

/*
 * Run the callback of an event.
 *
 * The callback receives the event itself, ev->which and ev->arg.
 * Nothing happens when ev is NULL or has no callback.
 */
void fpm_event_fire(struct fpm_event_s *ev) /* {{{ */
{
    if (ev && ev->callback) {
        ev->callback(ev, ev->which, ev->arg);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM