網絡編程:epoll


原理

select 的幾個缺點:
1)每次調用select,都需要把fd集合從用戶空間拷貝到內核空間,這個開銷在fd很多時會很大
2)每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也會很大
3)select支持的文件描述符數量太小了,默認是1024

在調用接口上,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函數:epoll_create、epoll_ctl和epoll_wait。epoll_create是創建一個epoll句柄,epoll_ctl是注冊要監聽的事件類型,epoll_wait是等待事件的產生。
對於第一個缺點,epoll的解決方案在epoll_ctl函數中。 每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進內核,而不是在epoll_wait的時候重復拷貝。epoll保證了每個fd在整個過程中只會拷貝一次。
對於第二個缺點,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍,並為每個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者,就會調用這個回調函數,而這個 回調函數會把就緒的fd加入一個就緒鏈表。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd(就緒鏈表是否為空)。
對於第三個缺點,epoll沒有這個限制,它所 支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,具體可以cat /proc/sys/fs/file-max查看,在1GB內存的機器上大約是10萬左右。

epoll的回調機制:

/* 
 * This is the callback that is used to add our wait queue to the 
 * target file wakeup lists. 
 */  
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,  
                 poll_table *pt)  
{  
    struct epitem *epi = ep_item_from_epqueue(pt);  
    struct eppoll_entry *pwq;  
  
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {  
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);  
        pwq->whead = whead;  
        pwq->base = epi;  
        add_wait_queue(whead, &pwq->wait);  
        list_add_tail(&pwq->llink, &epi->pwqlist);  
        epi->nwait++;  
    } else {  
        /* We have to signal that an error occurred */  
        epi->nwait = -1;  
    }  
}  

其中init_waitqueue_func_entry的實現如下:

static inline void init_waitqueue_func_entry(wait_queue_t *q,  
                    wait_queue_func_t func)  
{  
    q->flags = 0;  
    q->private = NULL;  
    q->func = func;  
}  

可以看到,總體上和select實現是類似的,只不過它是創建了一個epoll_entry結構pwq,pwq->wait的func成員被設置成了回調函數ep_poll_callback(而不是default_wake_function,所以這里並不會有喚醒操作而只是執行回調函數),private成員被設置成了NULL。最后把pwq->wait鏈入到whead中(也就是設備等待隊列中)。這樣,當設備等待隊列中的進程被喚醒時,就會調用ep_poll_callback了。

epoll的流程:
當epoll_wait時,它會判斷就緒鏈表中有沒有就緒的fd,如果沒有,則把current進程加入到一個等待隊列(file->private_data->wq)中,並在一個while(1)循環中判斷就緒隊列是否為空,並結合schedule_timeout實現睡一會。如果current進程在睡眠中,設備就緒了,就會調用回調函數。在回調函數中,會把就緒的fd放到就緒鏈表,並喚醒等待隊列(file->private_data->wq)中的current進程,這樣epoll_wait又能繼續執行下去了。

API

epoll 不僅提供了默認的 level-triggered(條件觸發)機制,還提供了性能更為強勁的 edge-triggered(邊緣觸發)機制
使用 epoll 進行網絡程序的編寫,需要三個步驟,分別是 epoll_createepoll_ctl epoll_wait

  • epoll_create:用於創建一個epoll實例
int epoll_create(int size);
int epoll_create1(int flags); 
返回值: 若成功返回一個大於0的值,表示epoll實例;若返回-1表示出錯

size參數:用來告知內核期望監控的文件描述字大小,然后內核使用這部分的信息來初始化內核數據結構。現在,對size設置為一個大於0的整數就 可以
flags參數:輸入flags為0,則和epoll_create一樣,內核自動忽略

  • epoll_ctl :往這個epoll實例中添加刪除監控的事件
 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
        返回值: 若成功返回0;若返回-1表示出錯

第一個參數epfd:調用epoll_create創建的epoll實例描述字,可簡單理解為epoll的句柄
第二個參數op:表示增加/刪除一個監控事件,有三個選項可供選擇:

  • EPOLL_CTL_ADD: 向 epoll 實例注冊文件描述符對應的事件;
  • EPOLL_CTL_DEL:向 epoll 實例刪除文件描述符對應的事件;
  • EPOLL_CTL_MOD: 修改文件描述符對應的事件。
    第三個參數fd:注冊的事件的文字描述符,比如一個監聽套接字
    第四個參數event:表示注冊的事件類型,並且可以在這個結構體里設置用戶需要的數據,其中最為常見的是使用聯合結構里的fd字段,表示事件所對應的文件描述符
typedef union epoll_data {
     void        *ptr;
     int          fd;
     uint32_t     u32;
     uint64_t     u64;
 } epoll_data_t;

 struct epoll_event {
     uint32_t     events;      /* Epoll events */
     epoll_data_t data;        /* User data variable */
 };

重點看一下這幾種事件類型:

  • EPOLLIN:表示對應的文件描述字可以讀;
  • EPOLLOUT:表示對應的文件描述字可以寫;
  • EPOLLRDHUP:表示套接字的一端已經關閉,或者半關閉;
  • EPOLLHUP:表示對應的文件描述字被掛起;
  • EPOLLET:設置為 edge-triggered,默認為 level-triggered。
  • epoll_wait:調用者進程被掛起,在等待內核I/O事件的分發
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  返回值: 成功返回的是一個大於0的數,表示事件的個數;返回0表示的是超時時間到;若出錯返回-1.
  • 第一個參數epfd是 epoll 實例描述字,也就是 epoll 句柄。
  • 第二個參數events返回給用戶空間需要處理的 I/O 事件,這是一個數組,數組的大小由 epoll_wait 的返回值決定,這個數組的每個元素都是一個需要待處理的 I/O 事件,其中 events 表示具體的事件類型,事件類型取值和 epoll_ctl 可設置的值一樣,這個 epoll_event 結構體里的 data 值就是在 epoll_ctl 那里設置的 data,也就是用戶空間和內核空間調用時需要的數據。
  • 第三個參數maxevents是一個大於 0 的整數,表示 epoll_wait 可以返回的最大事件值。
  • 第四個參數timeout是 epoll_wait 阻塞調用的超時值,如果這個值設置為 -1,表示不超時;如果設置為 0 則立即返回,即使沒有任何 I/O 事件發生

實踐

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>

#define SERV_PORT 43211
#define LISTENQ 1024
#define MAXEVENTS 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}


int tcp_nonblocking_server_listen(int port)
{
    int listen_fd;
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    make_nonblocking(listen_fd);

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int rt1 = bind(listen_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if(rt1 < 0)
    {
        perror("bind error\n");
        return -1;
    }

    int rt2 = listen(listen_fd, LISTENQ);
    if(rt2 < 0)
    {
        perror("listen failed");
        return -1;
    }

    signal(SIGPIPE, SIG_IGN);
    return listen_fd;

}


int main(int argc, char *argv[])
{
    int listen_fd, socket_fd;
    int n, i;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;


    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

    //為epoll創建實例
    efd = epoll_create1(0);
    if(efd == -1)
    {
        perror("epoll create failed");
        return -1;
    }

    event.data.fd = listen_fd;
    event.events = EPOLLIN | EPOLLET;
    // 調用epoll_ctl將監聽字對應的I/O事件進行注冊,有新的連接建立,就可以感知,采用edge-triggered邊緣觸發
    if(epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1)
    {
        perror("epoll_ctl add listen fd failed");
        return -1;
    }

    // Buffer where events are returned
    events = calloc(MAXEVENTS, sizeof(event));

    while(1)
    {
        // 調用epoll_wait函數分發I/O事件,當epoll_wait成功返回后,通過遍歷返回的event數組,就可知道發生的I/O事件
        n = epoll_wait(efd, events, MAXEVENTS, -1);
        printf("epoll_waite wakeup\n");
        for(i = 0; i < n; i++)
        {
            if((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!events[i].events & EPOLLIN))
                {
                    fprintf(stderr, "epoll error\n");
                    close(events[i].data.fd);
                    continue;
                }
                else if(listen_fd == events[i].data.fd)
                {
                    struct sockaddr_storage ss;
                    socklen_t slen = sizeof(ss);
                    int fd = accept(listen_fd, (struct sockaddr *)&ss, &slen);
                    if(fd < 0)
                    {
                        perror("accept failed");
                        return -1;
                    }
                     else
                    {
                        // accept建立連接,並將該連接設置為非阻塞,在調用epoll_ctl把已連接套接字對應的可讀事件
                        // 注冊到epoll實例中,這里使用了event_data里面的fd字段,將連接套接字存儲器中
                        make_nonblocking(fd);
                        event.data.fd = fd;
                        event.events = EPOLLIN | EPOLLET;// edge-triggered
                        if(epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1)
                        {
                            perror("epoll_ctl add connection fd failed");
                            return -1;
                        }
                    }
                    continue;
                }
                else
                {
                    socket_fd = events[i].data.fd;
                    printf("get event on socket fd == %d\n",socket_fd);
                    while(1)
                    {
                        char buf[512];
                        if((n = read(socket_fd, buf, sizeof(buf))) < 0)
                        {
                            if(errno != EAGAIN)
                            {
                                perror("read error");
                                close(socket_fd);
                            }
                            break;
                        }
                        else if(n == 0)
                        {
                            close(socket_fd);
                            break;
                        }
                        else 
                        {
                            for(i = 0;i < n; ++i)
                            {
                                buf[i] = rot13_char(buf[i]);
                            }
                            if(write(socket_fd, buf, n) < 0)
                            {
                                perror("write error");
                                return -1;
                            }
                        }
                    }
                }
               
        }
    }
    free(events);
    close(listen_fd);
    return 0;
}

運行結果

邊緣觸發vs水平觸發

條件觸發的意思是只要滿足事件的條件,比如有數據需要讀,就一直不斷地把這個事件傳遞給用戶;而邊緣觸發的意思是只有第一次滿足條件的時候才觸發,之后就不會再傳遞同樣的事件了。
一般認為,邊緣觸發的效率比條件觸發的效率要高
邊緣觸發:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>

#define SERV_PORT 43211
#define LISTENQ 1024
#define MAXEVENTS 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}


int tcp_nonblocking_server_listen(int port)
{
    int listen_fd;
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    make_nonblocking(listen_fd);

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int rt1 = bind(listen_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if(rt1 < 0)
    {
        perror("bind error\n");
        return -1;
    }

    int rt2 = listen(listen_fd, LISTENQ);
    if(rt2 < 0)
    {
        perror("listen failed");
        return -1;
    }

    signal(SIGPIPE, SIG_IGN);
    return listen_fd;

}


int main(int argc, char *argv[])
{
    int listen_fd, socket_fd;
    int n, i;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;


    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

    efd = epoll_create1(0);
    if(efd == -1)
    {
        perror("epoll create failed");
        return -1;
    }

    event.data.fd = listen_fd;
    event.events = EPOLLIN | EPOLLET;
    if(epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1)
    {
        perror("epoll_ctl add listen fd failed");
        return -1;
    }

    // Buffer where events are returned
    events = calloc(MAXEVENTS, sizeof(event));

    while(1)
    {
        n = epoll_wait(efd, events, MAXEVENTS, -1);
        printf("epoll_waite wakeup\n");
        for(i = 0; i < n; i++)
        {
            if((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!events[i].events & EPOLLIN))
                {
                    fprintf(stderr, "epoll error\n");
                    close(events[i].data.fd);
                    continue;
                }
                else if(listen_fd == events[i].data.fd)
                {
                    struct sockaddr_storage ss;
                    socklen_t slen = sizeof(ss);
                    int fd = accept(listen_fd, (struct sockaddr *)&ss, &slen);
                    if(fd < 0)
                    {
                        perror("accept failed");
                        return -1;
                    }
                     else
                    {
                        make_nonblocking(fd);
                        event.data.fd = fd;
                        event.events = EPOLLIN | EPOLLET;// edge-triggered
                        if(epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1)
                        {
                            perror("epoll_ctl add connection fd failed");
                            return -1;
                        }
                    }
                    continue;
                }
                else
                {
                    socket_fd = events[i].data.fd;
                    printf("get event on socket fd == %d\n",socket_fd);
                }
               
        }
    }
    free(events);
    close(listen_fd);
    return 0;
}

執行效果:

可發現,邊緣觸發情況下,開啟這個服務器程序,用 telnet 連接上,輸入一些字符,我們看到,服務器端只從 epoll_wait 中蘇醒過一次,就是第一次有數據可讀的時候。
水平觸發:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>

#define SERV_PORT 43211
#define LISTENQ 1024
#define MAXEVENTS 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}


int tcp_nonblocking_server_listen(int port)
{
    int listen_fd;
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    make_nonblocking(listen_fd);

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int rt1 = bind(listen_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if(rt1 < 0)
    {
        perror("bind error\n");
        return -1;
    }

    int rt2 = listen(listen_fd, LISTENQ);
    if(rt2 < 0)
    {
        perror("listen failed");
        return -1;
    }

    signal(SIGPIPE, SIG_IGN);
    return listen_fd;

}


int main(int argc, char *argv[])
{
    int listen_fd, socket_fd;
    int n, i;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;


    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

    efd = epoll_create1(0);
    if(efd == -1)
    {
        perror("epoll create failed");
        return -1;
    }

    event.data.fd = listen_fd;
    event.events = EPOLLIN | EPOLLET;
    if(epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1)
    {
        perror("epoll_ctl add listen fd failed");
        return -1;
    }

    // Buffer where events are returned
    events = calloc(MAXEVENTS, sizeof(event));

    while(1)
    {
        n = epoll_wait(efd, events, MAXEVENTS, -1);
        printf("epoll_waite wakeup\n");
        for(i = 0; i < n; i++)
        {
            if((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!events[i].events & EPOLLIN))
                {
                    fprintf(stderr, "epoll error\n");
                    close(events[i].data.fd);
                    continue;
                }
                else if(listen_fd == events[i].data.fd)
                {
                    struct sockaddr_storage ss;
                    socklen_t slen = sizeof(ss);
                    int fd = accept(listen_fd, (struct sockaddr *)&ss, &slen);
                    if(fd < 0)
                    {
                        perror("accept failed");
                        return -1;
                    }
                     else
                    {
                        make_nonblocking(fd);
                        event.data.fd = fd;
                        event.events = EPOLLIN;// level-triggered
                        if(epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1)
                        {
                            perror("epoll_ctl add connection fd failed");
                            return -1;
                        }
                    }
                    continue;
                }
                else
                {
                    socket_fd = events[i].data.fd;
                    printf("get event on socket fd == %d\n",socket_fd);
                   
                }
               
        }
    }
    free(events);
    close(listen_fd);
    return 0;
}

效果:

然后按照同樣的步驟來一次,觀察服務器端,可看到,服務器端不斷地從 epoll_wait 中蘇醒,告訴我們有數據需要讀取。

小結

epoll 通過改進的接口設計,避免了用戶態 - 內核態頻繁的數據拷貝,大大提高了系統性能。在使用 epoll 的時候,我們一定要理解條件觸發和邊緣觸發兩種模式。條件觸發的意思是只要滿足事件的條件,比如有數據需要讀,就一直不斷地把這個事件傳遞給用戶;而邊緣觸發的意思是只有第一次滿足條件的時候才觸發,之后就不會再傳遞同樣的事件了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM