Linux epoll
epoll是Kernel 2.6后新加入的事件機制,在高並發條件下,遠優於select。epoll最大的好處在於它不會隨着監聽fd數目的增長而降低效率。因為在內核中的select實現中,它是采用輪詢來處理的,輪詢的fd數目越多,自然耗時越多。並且,在linux/posix_types.h頭文件有這樣的聲明:
#define __FD_SETSIZE 1024 //select最多同時監聽1024個fd當然,可以通過修改頭文件再重編譯內核來擴大這個數目,但這似乎並不治本。
所以在Nginx中采用了epoll來實現其高並發特性。
工作方式
LT(level triggered):水平觸發,缺省方式,同時支持block和no-block socket,在這種做法中,內核告訴我們一個文件描述符是否被就緒了,如果就緒了,你就可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯的可能性較小。傳統的select\poll都是這種模型的代表。
ET(edge-triggered):邊沿觸發,高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒狀態時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如:你在發送、接受或者接受請求,或者發送接受的數據少於一定量時導致了一個EWOULDBLOCK錯誤)。但是請注意,如果一直不對這個fs做IO操作(從而導致它再次變成未就緒狀態),內核不會發送更多的通知。
區別:LT事件不會丟棄,而是只要讀buffer里面有數據可以讓用戶讀取,則不斷的通知你。而ET則只在事件發生之時通知。
主要的數據結構
epoll_event的結構如下:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;//保存觸發事件的某個文件描述符相關的數據 struct epoll_event { __uint32_t events; /* epoll event */ epoll_data_t data; /* User data variable */ };
events表示感興趣的事件和被觸發的事件,可能的取值為:
EPOLLIN | 對應的文件描述符可以讀 |
EPOLLOUT | 對應的文件描述符可以寫 |
EPOLLPRI | 對應的文件描述符有緊急的數可讀 |
EPOLLERR | 對應的文件描述符發生錯誤 |
EPOLLHUP | 對應的文件描述符被掛斷 |
EPOLLET | 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的 |
EPOLLONESHOT | 只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里 |
操作函數
epoll的接口非常簡單,用三個相關函數來創建epoll句柄、注冊epoll事件以及等待事件的發生。
創建epoll句柄:
int epoll_create(int size); //size表示內核需要監聽的數目
//return : epoll文件描述符
需要注意的是,當創建好epoll句柄后,它就是會占用一個fd值(文件標識符),在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。
epoll事件注冊函數:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) //epfd是epoll_create()的返回值 //op表示動作 /* op可被表示為: EPOLL_CTL_ADD:注冊新的fd到epfd中; EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件; EPOLL_CTL_DEL:從epfd中刪除一個fd; */ //fd是需要監聽的fd //event是內核需要監聽的事件
等待事件發生函數:
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) //epfd是函數返回值 //events是內核監聽事件的集合 //maxevents是epoll_wait可以處理的連接事件的最大限度值 //timeout是超時時間
//返回值:請求數
epoll工作流程
首先,需要調用epoll_create創建epoll,此后我們就可以進行socket/bind/listen,然后調用epoll_ctl進行注冊。接下來,就可以通過一個while(1)循環調用epoll_wait來等待事件的發生,然后循環查看接收到的事件並進行處理。如果事件是sever的socketfd我們就要進行accept,並且把接收到client的socketfd加入到要監聽的事件中。如果在監聽過程中,需要修改操作方式(讀/寫),可以調用epoll_ctl來重新修改。如果監聽到某一個客戶端關閉,那么我就需要再次調用epoll_ctl把它從epoll監聽事件中刪除。
實例
#include <stdio.h> #include <sys/time.h> #include <unistd.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/epoll.h> #include <netinet/in.h> #include <fcntl.h> #include <stdlib.h> #include <errno.h> void setnonblocking(int sockfd) { int opts; opts = fcntl(sockfd, F_GETFL); if(opts < 0){ perror("fcntl1 error!\n"); exit(1); } opts = opts | O_NONBLOCK; if(fcntl(sockfd, F_SETFL, opts) < 0){ perror("fcntl2 error!\n"); exit(1); } } int main() { int fd; int on; int rs; int len; int conn; char buffer[100]; int flag1, flag2; struct sockaddr_in serv_addr, clt_addr; struct timeval timeout; int i; int nfds; int epfd; int newfd; struct epoll_event ev; struct epoll_event events[20]; fd = socket(AF_INET, SOCK_STREAM, 0); on = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); timeout.tv_sec = 5; timeout.tv_usec = 0; setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)); bzero(&serv_addr, sizeof(struct sockaddr_in)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(9090); serv_addr.sin_addr.s_addr = INADDR_ANY; rs = bind(fd, (struct sockaddr*)(&serv_addr), sizeof(struct sockaddr)); if(rs < 0){ perror(""); close(fd); return -1; } setnonblocking(fd); epfd = epoll_create(100); ev.data.fd = fd; ev.events = EPOLLIN|EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); rs = listen(fd, 5); if(rs < 0){ perror(""); close(fd); return -1; } len = sizeof(struct sockaddr); for(;;){ nfds = epoll_wait(epfd, events, 20, 500); for(i = 0; i < nfds; ++i){ if(events[i].data.fd == fd){ conn = accept(fd, (struct sockaddr*)(&clt_addr), (unsigned int*)(&len)); setnonblocking(conn); ev.data.fd = conn; ev.events = EPOLLIN|EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev); } else if(events[i].events & EPOLLIN){ if((newfd = events[i].data.fd) < 0) continue; bzero(buffer, sizeof(buffer)); flag1 = recv(newfd, buffer, 100, 0); printf("recv: %s\n", buffer); printf("recv return: %d\n", flag1); ev.data.fd = newfd; ev.events = EPOLLOUT|EPOLLET; epoll_ctl(epfd, EPOLL_CTL_MOD, newfd, &ev); } else if(events[i].events & EPOLLOUT){ if((newfd = events[i].data.fd) < 0) continue; bzero(buffer, sizeof(buffer)); strcpy(buffer, "ACK"); flag2 = send(newfd, buffer, sizeof(buffer), 0); printf("recv return: %d\n\n", flag2); ev.data.fd = newfd; ev.events = EPOLLIN|EPOLLET; epoll_ctl(epfd, EPOLL_CTL_MOD, newfd, &ev); } } } close(fd); return 0; }
參考
http://www.cnblogs.com/venow/archive/2012/11/30/2790031.html
http://hi.baidu.com/jingweiyoung/item/ae9fc81714be67dbbf9042b9
http://www.linuxidc.com/Linux/2011-04/35156p3.htm
http://www.cppblog.com/converse/archive/2008/04/29/48482.html