Linux下I/O多路轉接之epoll(絕對經典)


epoll

關於Linux下I/O多路轉接之epoll函數,什么返回值,什么參數,我不想再多的解釋,您不想移駕,我給你移來:

http://blog.csdn.net/colder2008/article/details/5812487      返回值,參數說明等;

最后將一個用epoll設計的網絡服務器貼上代碼,以供借閱:

下面我們從流開始說起,再到select-->poll-->epoll:(原文來自:http://www.tuicool.com/articles/6NrMJn),文中這樣說:

一個流可以是文件,socket,pipe等等可以進行I/O操作的內核對象。

不管是文件,還是套接字,還是管道,我們都可以把他們看作流。

  之后我們來討論I/O的操作,通過read,我們可以從流中讀入數據;通過write,我們可以往流寫入數據。現在假定一個情形,我們需要從流中讀數據,但是流中還沒有數據,(典型的例子為,客戶端要從socket讀如數據,但是服務器還沒有把數據傳回來),這時候該怎么辦?

  阻塞:阻塞是個什么概念呢?比如某個時候你在等快遞,但是你不知道快遞什么時候過來,而且你沒有別的事可以干(或者說接下來的事要等快遞來了才能做);那么你可以去睡覺了,因為你知道快遞把貨送來時一定會給你打個電話(假定一定能叫醒你)。

  非阻塞忙輪詢:接着上面等快遞的例子,如果用忙輪詢的方法,那么你需要知道快遞員的手機號,然后每分鍾給他掛個電話:“你到了沒?”

很明顯一般人不會用第二種做法,不僅顯很無腦,浪費話費不說,還占用了快遞員大量的時間。

大部分程序也不會用第二種做法,因為第一種方法經濟而簡單,經濟是指消耗很少的CPU時間,如果線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。

為了了解阻塞是如何進行的,我們來討論緩沖區,以及內核緩沖區,最終把I/O事件解釋清楚。緩沖區的引入是為了減少頻繁I/O操作而引起頻繁的系統調用(你知道它很慢的),當你操作一個流時,更多的是以緩沖區為單位進行操作,這是相對於用戶空間而言。對於內核來說,也需要緩沖區。

假設有一個管道,進程A為管道的寫入方,B為管道的讀出方。

假設一開始內核緩沖區是空的,B作為讀出方,被阻塞着。然后首先A往管道寫入,這時候內核緩沖區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之為“緩沖區非空”。

但是“緩沖區非空”事件通知B后,B卻還沒有讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩沖區中,如果內核也緩沖區滿了,B仍未開始讀數據,最終內核緩沖區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,我們把這個事件定義為“緩沖區滿”。

假設后來B終於開始讀數據了,於是內核的緩沖區空了出來,這時候內核會告訴A,內核緩沖區有空位了,你可以從長眠中醒來了,繼續寫數據了,我們把這個事件叫做“緩沖區非滿”

也許事件Y1已經通知了A,但是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩沖區空了。這個時候內核就告訴B,你需要阻塞了!,我們把這個時間定為“緩沖區空”。

這四個情形涵蓋了四個I/O事件,緩沖區滿,緩沖區空,緩沖區非空,緩沖區非滿(注都是說的內核緩沖區,且這四個術語都是我生造的,僅為解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(如果不能理解“同步”是什么概念,請學習操作系統的鎖,信號量,條件變量等任務同步方面的相關知識)。

然后我們來說說阻塞I/O的缺點。但是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。如果想要同時處理多個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。

於是再來考慮非阻塞忙輪詢的I/O方式,我們發現我們可以同時處理多個流了(把一個流從阻塞模式切換到非阻塞模式再此不予討論):

while true {
    for i in stream[]; {
        if i has data
        read until unavailable
    }
}

 我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為如果所有的流都沒有數據,那么只會白白浪費CPU。這里要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略。

為了避免CPU空轉,可以引進了一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可以把“忙”字去掉了)。代碼長這樣:

while true {
    select(streams[])
    for i in streams[] {
        if i has data
        read until unavailable
    }
}

於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。

但是使用select,我們有O(n)的無差別輪詢復雜度,同時處理的流越多,沒一次無差別輪詢時間就越長。再次

說了這么多,終於能好好解釋epoll了

epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll之會把哪個流發生了怎樣的I/O事件通知我們。此時我們對這些流的操作都是有意義的。(復雜度降低到了O(1))

在討論epoll的實現細節之前,先把epoll的相關操作列出:

epoll_create 創建一個epoll對象,一般epollfd = epoll_create()

epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增加/刪除某一個流的某一個事件
比如
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注冊緩沖區非空事件,即有數據流入
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注冊緩沖區非滿事件,即流可以被寫入
epoll_wait(epollfd,...)等待直到注冊的事件發生
(注:當對一個非阻塞流的讀寫發生緩沖區滿或緩沖區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩沖區非滿和緩沖區非空事件)。

一個epoll模式的代碼大概的樣子是:

while true {
    active_stream[] = epoll_wait(epollfd)
    for i in active_stream[] {
        read or write till
    }
 }
以上不會有太多的實例,基本就是原理,並且略帶的說了select、poll和epoll的比較,得出為什么要用epoll:下面是服務器代碼:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>


void usage(const char* argv)
{ 
  printf("%s:[ip][port]\n",argv);
}

void set_nonblock(int fd)
{
    int fl = fcntl(fd,F_GETFL);
    fcntl(fd,F_SETFL,fl | O_NONBLOCK);
}

int startup(char* _ip,int _port)  //創建一個套接字,綁定,檢測服務器
{
  //sock
  //1.創建套接字
  int sock=socket(AF_INET,SOCK_STREAM,0);   
  if(sock<0)
  {
      perror("sock");
      exit(2);
  }
  
  int opt = 1;
  setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

  //2.填充本地 sockaddr_in 結構體(設置本地的IP地址和端口)
  struct sockaddr_in local;       
  local.sin_port=htons(_port);
  local.sin_family=AF_INET;
  local.sin_addr.s_addr=inet_addr(_ip);

  //3.bind()綁定
  if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0) 
  {
      perror("bind");
      exit(3);
  }
  //4.listen()監聽 檢測服務器
  if(listen(sock,5)<0)
  {
      perror("listen");
      exit(4);
  }
  return sock;    //這樣的套接字返回
}

int main(int argc,char *argv[])                         
{
    if(argc!=3)     //檢測參數個數是否正確
    {
        usage(argv[0]);
        exit(1);
    }

    int listen_sock=startup(argv[1],atoi(argv[2]));      //創建一個綁定了本地 ip 和端口號的套接字描述符


    //1.創建epoll    
    int epfd = epoll_create(256);    //可處理的最大句柄數256個
    if(epfd < 0)
    {
	perror("epoll_create");
	exit(5);
    }

    struct epoll_event _ev;       //epoll結構填充 
    _ev.events = EPOLLIN;         //初始關心事件為讀
    _ev.data.fd = listen_sock;   

    //2.托管
    epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&_ev);  //將listen sock添加到epfd中,關心讀事件

    struct epoll_event revs[64];

    int timeout = -1;
    int num = 0;
    int done = 0;

    while(!done)
    {
	//epoll_wait()相當於在檢測事件
        switch((num = epoll_wait(epfd,revs,64,timeout)))  //返回需要處理的事件數目  64表示 事件有多大
        {
            case 0:                  //返回0 ,表示監聽超時
                printf("timeout\n");
                break;
            case -1:                 //出錯
                perror("epoll_wait");
                break;
            default:                 //大於零 即就是返回了需要處理事件的數目
              {
		struct sockaddr_in peer;
		socklen_t len = sizeof(peer);

		int i;
                for(i=0;i < num;i++)
                {
		    int rsock = revs[i].data.fd; //准確獲取哪個事件的描述符
                    if(rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立鏈接
                    {
                        int new_fd = accept(listen_sock,(struct sockaddr*)&peer,&len);  

                    	if(new_fd > 0)
                    	{
                             printf("get a new client:%s:%d\n",inet_ntoa(peer.sin_addr),ntohs(peer.sin_port));

			     set_nonblock(new_fd);
			     _ev.events = EPOLLIN | EPOLLET;
			     _ev.data.fd = new_fd;
			     epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&_ev);    //二次托管

                        }
		    }
		    else // 接下來對num - 1 個事件處理
		    {
			if(revs[i].events & EPOLLIN)
			{
			    char buf[1024];
			    ssize_t _s = read(rsock,buf,sizeof(buf)-1);
			    if(_s > 0)
			    {
				buf[_s] = '\0';
				printf("client:%s\n",buf);

			        _ev.events = EPOLLOUT | EPOLLET;
			        _ev.data.fd = rsock;
			        epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,&_ev);    //二次托管
			    }
			    else if(_s == 0)  //client:close
			    {
				printf("client:%d close\n",rsock);
				epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);

				close(rsock);
			    }
			    else
			    {
				perror("read");
			    }
			}
			else if(revs[i].events & EPOLLOUT)
			{
			    const char *msg = "HTTP/1.0.200 OK\r\n\r\n<html><h2>李寧愛張寧!</h2></html>\r\n";
			    write(rsock,msg,strlen(msg));
			    epoll_ctl(epfd,EPOLL_CTL_DEL,rsock,NULL);
			    close(rsock);
			}
			else
			{}
		    }
		}
	      }
	      break;
	}
    }
    return 0;
}
賜教!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM