epoll
關於Linux下I/O多路轉接之epoll函數,什么返回值,什么參數,我不想再多的解釋,您不想移駕,我給你移來:
http://blog.csdn.net/colder2008/article/details/5812487 返回值,參數說明等;
最后將一個用epoll設計的網絡服務器貼上代碼,以供借閱:
下面我們從流開始說起,再到select-->poll-->epoll:(原文來自:http://www.tuicool.com/articles/6NrMJn),文中這樣說:
一個流可以是文件,socket,pipe等等可以進行I/O操作的內核對象。
不管是文件,還是套接字,還是管道,我們都可以把他們看作流。
之后我們來討論I/O的操作,通過read,我們可以從流中讀入數據;通過write,我們可以往流寫入數據。現在假定一個情形,我們需要從流中讀數據,但是流中還沒有數據,(典型的例子為,客戶端要從socket讀如數據,但是服務器還沒有把數據傳回來),這時候該怎么辦?
阻塞:阻塞是個什么概念呢?比如某個時候你在等快遞,但是你不知道快遞什么時候過來,而且你沒有別的事可以干(或者說接下來的事要等快遞來了才能做);那么你可以去睡覺了,因為你知道快遞把貨送來時一定會給你打個電話(假定一定能叫醒你)。
非阻塞忙輪詢:接着上面等快遞的例子,如果用忙輪詢的方法,那么你需要知道快遞員的手機號,然后每分鍾給他掛個電話:“你到了沒?”
很明顯一般人不會用第二種做法,不僅顯很無腦,浪費話費不說,還占用了快遞員大量的時間。
大部分程序也不會用第二種做法,因為第一種方法經濟而簡單,經濟是指消耗很少的CPU時間,如果線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。
為了了解阻塞是如何進行的,我們來討論緩沖區,以及內核緩沖區,最終把I/O事件解釋清楚。緩沖區的引入是為了減少頻繁I/O操作而引起頻繁的系統調用(你知道它很慢的),當你操作一個流時,更多的是以緩沖區為單位進行操作,這是相對於用戶空間而言。對於內核來說,也需要緩沖區。
假設有一個管道,進程A為管道的寫入方,B為管道的讀出方。
假設一開始內核緩沖區是空的,B作為讀出方,被阻塞着。然后首先A往管道寫入,這時候內核緩沖區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之為“緩沖區非空”。
但是“緩沖區非空”事件通知B后,B卻還沒有讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩沖區中,如果內核也緩沖區滿了,B仍未開始讀數據,最終內核緩沖區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,我們把這個事件定義為“緩沖區滿”。
假設后來B終於開始讀數據了,於是內核的緩沖區空了出來,這時候內核會告訴A,內核緩沖區有空位了,你可以從長眠中醒來了,繼續寫數據了,我們把這個事件叫做“緩沖區非滿”
也許事件Y1已經通知了A,但是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩沖區空了。這個時候內核就告訴B,你需要阻塞了!,我們把這個時間定為“緩沖區空”。
這四個情形涵蓋了四個I/O事件,緩沖區滿,緩沖區空,緩沖區非空,緩沖區非滿(注都是說的內核緩沖區,且這四個術語都是我生造的,僅為解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(如果不能理解“同步”是什么概念,請學習操作系統的鎖,信號量,條件變量等任務同步方面的相關知識)。
然后我們來說說阻塞I/O的缺點。但是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。如果想要同時處理多個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。
於是再來考慮非阻塞忙輪詢的I/O方式,我們發現我們可以同時處理多個流了(把一個流從阻塞模式切換到非阻塞模式再此不予討論):
while true {
for i in stream[]; {
if i has data
read until unavailable
}
}
我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為如果所有的流都沒有數據,那么只會白白浪費CPU。這里要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略。
為了避免CPU空轉,可以引進了一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可以把“忙”字去掉了)。代碼長這樣:
while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}
於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。
但是使用select,我們有O(n)的無差別輪詢復雜度,同時處理的流越多,沒一次無差別輪詢時間就越長。再次
說了這么多,終於能好好解釋epoll了
epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll之會把哪個流發生了怎樣的I/O事件通知我們。此時我們對這些流的操作都是有意義的。(復雜度降低到了O(1))
在討論epoll的實現細節之前,先把epoll的相關操作列出:
epoll_create 創建一個epoll對象,一般epollfd = epoll_create()
epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增加/刪除某一個流的某一個事件
比如
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注冊緩沖區非空事件,即有數據流入
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注冊緩沖區非滿事件,即流可以被寫入
epoll_wait(epollfd,...)等待直到注冊的事件發生
(注:當對一個非阻塞流的讀寫發生緩沖區滿或緩沖區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩沖區非滿和緩沖區非空事件)。
一個epoll模式的代碼大概的樣子是:
while true {
active_stream[] = epoll_wait(epollfd)
for i in active_stream[] {
read or write till
}
}
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>
void usage(const char* argv)
{
printf("%s:[ip][port]\n",argv);
}
void set_nonblock(int fd)
{
int fl = fcntl(fd,F_GETFL);
fcntl(fd,F_SETFL,fl | O_NONBLOCK);
}
int startup(char* _ip,int _port) //創建一個套接字,綁定,檢測服務器
{
//sock
//1.創建套接字
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("sock");
exit(2);
}
int opt = 1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//2.填充本地 sockaddr_in 結構體(設置本地的IP地址和端口)
struct sockaddr_in local;
local.sin_port=htons(_port);
local.sin_family=AF_INET;
local.sin_addr.s_addr=inet_addr(_ip);
//3.bind()綁定
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(3);
}
//4.listen()監聽 檢測服務器
if(listen(sock,5)<0)
{
perror("listen");
exit(4);
}
return sock; //這樣的套接字返回
}
int main(int argc,char *argv[])
{
if(argc!=3) //檢測參數個數是否正確
{
usage(argv[0]);
exit(1);
}
int listen_sock=startup(argv[1],atoi(argv[2])); //創建一個綁定了本地 ip 和端口號的套接字描述符
//1.創建epoll
int epfd = epoll_create(256); //可處理的最大句柄數256個
if(epfd < 0)
{
perror("epoll_create");
exit(5);
}
struct epoll_event _ev; //epoll結構填充
_ev.events = EPOLLIN; //初始關心事件為讀
_ev.data.fd = listen_sock;
//2.托管
epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&_ev); //將listen sock添加到epfd中,關心讀事件
struct epoll_event revs[64];
int timeout = -1;
int num = 0;
int done = 0;
while(!done)
{
//epoll_wait()相當於在檢測事件
switch((num = epoll_wait(epfd,revs,64,timeout))) //返回需要處理的事件數目 64表示 事件有多大
{
case 0: //返回0 ,表示監聽超時
printf("timeout\n");
break;
case -1: //出錯
perror("epoll_wait");
break;
default: //大於零 即就是返回了需要處理事件的數目
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int i;
for(i=0;i < num;i++)
{
int rsock = revs[i].data.fd; //准確獲取哪個事件的描述符
if(rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立鏈接
{
int new_fd = accept(listen_sock,(struct sockaddr*)&peer,&len);
if(new_fd > 0)
{
printf("get a new client:%s:%d\n",inet_ntoa(peer.sin_addr),ntohs(peer.sin_port));
set_nonblock(new_fd);
_ev.events = EPOLLIN | EPOLLET;
_ev.data.fd = new_fd;
epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&_ev); //二次托管
}
}
else // 接下來對num - 1 個事件處理
{
if(revs[i].events & EPOLLIN)
{
char buf[1024];
ssize_t _s = read(rsock,buf,sizeof(buf)-1);
if(_s > 0)
{
buf[_s] = '\0';
printf("client:%s\n",buf);
_ev.events = EPOLLOUT | EPOLLET;
_ev.data.fd = rsock;
epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,&_ev); //二次托管
}
else if(_s == 0) //client:close
{
printf("client:%d close\n",rsock);
epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);
close(rsock);
}
else
{
perror("read");
}
}
else if(revs[i].events & EPOLLOUT)
{
const char *msg = "HTTP/1.0.200 OK\r\n\r\n<html><h2>李寧愛張寧!</h2></html>\r\n";
write(rsock,msg,strlen(msg));
epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);
close(rsock);
}
else
{}
}
}
}
break;
}
}
return 0;
}
賜教!
