IO多路復用技術總結


來源:微信公眾號「編程學習基地」

IO 多路復用概述

I/O 多路復用技術是為了解決進程或線程阻塞到某個 I/O 系統調用而出現的技術,使進程不阻塞於某個特定的 I/O 系統調用。

在IO多路復用技術描述前,先講解下同步,異步,阻塞,非阻塞的概念。

網絡IO模型

linux網絡IO中涉及到的模型如下:

(1)阻塞式IO

(2)非阻塞式IO

(3)IO多路復用

(4)信號驅動IO

(5)異步IO

今天不談信號驅動IO,略過..

同步/異步

在學習IO模型的時候,我們必須明確一個概念,處理 IO 的時候,阻塞和非阻塞都是同步 IO。

只有使用了特殊的 API 才是異步 IO,例如Linux網絡中的AIO。

再看下POSIX對同步和異步這兩個術語的定義:

  • 同步IO操作:導致請求進程阻塞,直到I/O操作完成;
  • 異步IO操作:不導致請求進程阻塞;

通俗的理解下同步和異步

  • 同步:當執行系統調用read時,需要用戶等待內核完成從內核緩沖區到用戶緩沖區的數據拷貝。

  • 異步:當執行異步IO操作例如aio_read時,用戶不需要等待,只需要接收內核完成操作的通知,由內核來完成數據的讀取。

阻塞/非阻塞

在知曉阻塞和非阻塞都是同步 IO后,阻塞和非阻塞就很好理解了

阻塞IO:由系統調用read,導致線程一直等待數據返回。

阻塞等待模型

非阻塞IO:系統調用read后立即返回一個狀態,當數據達到內核緩沖區之前都是非阻塞的,即返回一個系統調用狀態。

非阻塞等地模型

ps:閃客的動圖做的非常的形象,上述gif動圖來源「低並發編程」

IO多路復用

IO多路復用是一種同步IO模型,實現一個線程可以監視多個文件句柄;

select

select 是操作系統提供的系統調用函數,select()用來等待文件描述詞(普通文件、終端、偽終端、管道、FIFO、套接字及其他類型的字符型)狀態的改變。是一個輪循函數,循環詢問文件節點,可設置超時時間,超時時間到了就跳過代碼繼續往下執行。

通過select,我們可以把一個文件描述符的數組發給操作系統, 讓操作系統去遍歷,確定哪個文件描述符可以讀寫, 然后告訴我們去處理:

請添加圖片描述

頭文件

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

select調用

擁塞函數,擁塞等待文件描述符事件的到來

int select(int maxfdp
	, fd_set *readset
	, fd_set *writeset
	, fd_set *exceptset
	,struct timeval *timeout);

參數說明:

maxfdp:被監聽的文件描述符的最大值,它比所有文件描述符集合中的文件描述符的最大值大1,因為文件描述符是從0開始計數的;

readfds、writefds、exceptset:分別指向可讀、可寫和異常等事件對應的描述符集合。

timeout:用於設置select函數的超時時間,即告訴內核select等待多長時間之后就放棄等待。timeout == NULL 表示等待無限長的時間,timeout == 0,select立即返回

timeval結構體

struct timeval
{      
    long tv_sec;   /*秒 */
    long tv_usec;  /*微秒 */   
};

select置位

int FD_ZERO(int fd, fd_set *fdset);   //一個 fd_set類型變量的所有位都設為 0
int FD_CLR(int fd, fd_set *fdset);  //清除某個位時可以使用
int FD_SET(int fd, fd_set *fd_set);   //設置變量的某個位置位
int FD_ISSET(int fd, fd_set *fdset); //測試某個位是否被置位

當聲明了一個文件描述符集后,必須用FD_ZERO將所有位置零

調用 select函數,擁塞等待文件描述符事件的到來 ;如果超過設定的時間,則不再等待,繼續往下執行

select返回后,用FD_ISSET測試給定位是否置位:

if(FD_ISSET(fd, &rset)   
{ 
    ... 
    //do something  
}

fd_set結構體

fd_set其實這是一個數組的宏定義,實際上是一long類型的數組,每一個數組元素都能與一打開的文件句柄(socket、文件、管道、設備等)建立聯系,建立聯系的工作由程序員完成,當調用select()時,由內核根據IO狀態修改fd_set的內容,由此來通知執行了select()的進程哪個句柄可讀。

select使用

整個 select 的流程圖如下:
請添加圖片描述

Demo1:select示例

Server

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <sys/types.h>

#define MAXBUF 1024
#define LISTEN_NUM 2

int main(int argc, char **argv)
{
    int default_port = 8000;
    int optch = 0;
    while ((optch = getopt(argc, argv, "s:p:")) != -1)
    {
        switch (optch)
        {
        case 'p':
            default_port = atoi(optarg);
            printf("port: %s\n", optarg);
            break;
        case '?':
            printf("Unknown option: %c\n", (char)optopt);
            break;
        default:
            break;
        }
    }

    int sockfd, new_fd;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    char buf[MAXBUF + 1];
    fd_set rfds;            // select
    struct timeval tv;      //超時時間
    int retval, maxfd = -1; // select返回值 select監聽句柄的最大數量
    
    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = PF_INET;
    my_addr.sin_port = htons(default_port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1)
    {
        perror("bind");
        exit(EXIT_FAILURE);
    }
    if (listen(sockfd, LISTEN_NUM) == -1)
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    /*				數據處理				*/
    while (1)
    {
        printf("\n----wait for new connect port:%d\n",default_port);
        len = sizeof(struct sockaddr);
        if ((new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &len)) == -1)
        {
            perror("accept");
            exit(errno);
        }
        else
            printf("server: got connection from %s, port %d, socket %d\n",
                   inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);
        while (1)
        {
            FD_ZERO(&rfds);
            FD_SET(0, &rfds);
            FD_SET(new_fd, &rfds);
            maxfd = new_fd;
            tv.tv_sec = 1;
            tv.tv_usec = 0;
            retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
            if (retval == -1)
            {
                perror("select");
                exit(EXIT_FAILURE);
            }
            else if (retval == 0)
            {
                continue;
            }
            else
            {
                /*標准輸入*/
                if (FD_ISSET(0, &rfds))
                {
                    bzero(buf, MAXBUF + 1);
                    fgets(buf, MAXBUF, stdin);
                    if (!strncasecmp(buf, "quit", 4))
                    {
                        printf("i will quit!\n");
                        break;
                    }
                    len = send(new_fd, buf, strlen(buf) - 1, 0);
                    if (len > 0)
                        printf("send successful,%d byte send..\n", len);
                    else
                    {
                        printf("send failure!");
                        break;
                    }
                }
                if (FD_ISSET(new_fd, &rfds))
                {
                    bzero(buf, MAXBUF + 1);
                    len = recv(new_fd, buf, MAXBUF, 0);
                    if (len > 0)
                        printf("recv success :'%s', %d byte recv..\n", buf, len);
                    else
                    {
                        if (len < 0)
                            printf("recv failure\n");
                        else
                        {
                            printf("the client close ,quit\n");
                            break;
                        }
                    }
                }
            }
        }
        close(new_fd);
        printf("need othe connecdt (no->quit)");
        fflush(stdout);
        bzero(buf, MAXBUF + 1);
        fgets(buf, MAXBUF, stdin);
        if (!strncasecmp(buf, "no", 2))
        {
            printf("quit!\n");
            break;
        }
    }
    close(sockfd);
    return 0;
}

makefile:

TARGET=server
SRC = $(wildcard *.cpp *.c)
OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
DEFS =
CFLAGS = -g
CC =g++
LIBS =  -lpthread
$(TARGET):$(OBJ)
	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
.PHONY:
clean:
	rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select$ make
g++ -g  -o server select.c -lpthread
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select$ ./server
----wait for new connect port:8000

client

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <resolv.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>

#define MAXBUF 1024
int main(int argc, char **argv)
{
    int sockfd, len;
    struct sockaddr_in dest;
    char buffer[MAXBUF + 1];
    fd_set rfds;
    struct timeval tv;
    int retval, maxfd = -1;

    int optch,ret = -1;
    const char*server_addr;
    int default_port = 8000;

    /*判斷是否為合法輸入 必須傳入一個參數:服務器Ip*/
    if(argc<3)
    {
        printf("usage:tcpcli <IPaddress>");
        return 0;
    }
    while((optch = getopt(argc, argv, "s:p:")) != -1)
	{
		switch (optch)
		{
        case 's':
            server_addr = optarg;
            break;
        case 'p':
            default_port = atoi(optarg);
            printf("port: %s\n", optarg);
            break;
        case '?':
            printf("Unknown option: %c\n",(char)optopt);    
            break;
        default:
            break;
		}
	}
    
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    {
        perror("Socket");
        exit(EXIT_FAILURE);
    }

    bzero(&dest, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons(default_port);
    if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) 
    {
        perror(server_addr);
        exit(EXIT_FAILURE);
    }

    if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) 
    {
        perror("Connect ");
        exit(EXIT_FAILURE);
    }

    printf("\nget ready message chat:\n");
    while (1) 
	{
        FD_ZERO(&rfds);
        FD_SET(0, &rfds);
        FD_SET(sockfd, &rfds);
        maxfd = sockfd;
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
        if (retval == -1) 
		{
            printf("select %s", strerror(errno));
            break;
        } 
		else if (retval == 0)
            continue;
		else
		{
            if (FD_ISSET(sockfd, &rfds)) 
			{
                bzero(buffer, MAXBUF + 1);
                len = recv(sockfd, buffer, MAXBUF, 0);
                if (len > 0)
                    printf ("recv message:'%s', %d byte recv..\n",buffer, len);
                else 
				{
                    if (len < 0)
                        printf ("message recv failure\n");
                    else
					{
                        printf("server close ,quit\n");
                    	break;
					}
                }
            }
            if (FD_ISSET(0, &rfds))
			{
                bzero(buffer, MAXBUF + 1);
                fgets(buffer, MAXBUF, stdin);
                if (!strncasecmp(buffer, "quit", 4)) {
                    printf("i will quit\n");
                    break;
                }
                len = send(sockfd, buffer, strlen(buffer) - 1, 0);
                if (len < 0) {
                    printf ("message send failure");
                    break;
                } else
                    printf
                        ("send success,%d byte send..\n",len);
            }
        }
    }
    close(sockfd);
    return 0;
} 
TARGET=server
SRC = $(wildcard *.cpp *.c)
OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
DEFS =
CFLAGS = -g
CC =g++
LIBS =  -lpthread
$(TARGET):$(OBJ)
	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
.PHONY:
clean:
	rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select/client$ make
g++ -g  -o client client.c -lpthread
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select/client$ ./client -s 0.0.0.0

get ready message chat:

簡易聊天室select版本

server

#include<stdio.h>
#include<sys/types.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<sys/select.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<string.h>

#define _BACKLOG_ 5 //監聽隊列里允許等待的最大值
#define MAX_CONNECT 20
int fds[MAX_CONNECT];        //用來存放需要處理的IO事件
int listen_sock = -1;
int creat_sock(int port)
{
    int sock = socket(AF_INET,SOCK_STREAM,0);
    if(sock < 0){
        perror("creat_sock error");
        exit(1);
    }

    struct sockaddr_in local;
    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    local.sin_addr.s_addr = INADDR_ANY; //inet_addr(0.0.0.0)
    
    // 設置允許socket立即重用
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&sock, sizeof(sock));  

    if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){
        perror("bind");
        exit(2);
     }

    if(listen(sock,_BACKLOG_) < 0 ){
        perror("listen");
        exit(4);
     }

    return sock;
}

int accept_sock(){
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    int accept_sock = accept(listen_sock, (struct sockaddr *)&client, &len);
    if (accept_sock < 0)
    {
        perror("accept");
        exit(5);
    }

    printf("connect by a client, ip:%s port:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

    size_t i = 0;
    for (; i < MAX_CONNECT; ++i) //將新接受的描述符存入集合中
    {
        if (fds[i] == -1)
        {
            fds[i] = accept_sock;
            break;
        }
    }
    if (i == MAX_CONNECT)
    {
        printf("accept is upper limit..\n");
        close(accept_sock);
    }
}

int groupChat(int sockFd,void* pBuf,int iSize){
    for(int index=0;index<MAX_CONNECT;index++){
        if(fds[index] == sockFd || fds[index] == listen_sock)
        {
            continue;
        }
        if(fds[index]!=-1){
            printf("write fd:%d..socketFd:%d\n",fds[index],sockFd);
            write(fds[index],pBuf,iSize);
        }
    }
}

int handle_read(int* socketFd)
{
    int socket = *socketFd;
    char buf[1024];
    memset(buf, '\0', sizeof(buf));
    ssize_t size = read(socket, buf, sizeof(buf) - 1);
    if (size < 0)
    {
        perror("read");
        exit(6);
    }
    else if (size == 0)
    {
        printf("client close..\n");
        close(socket);
        *socketFd = -1;
    }
    else
    {
        printf("client say: %s\n", buf);
        groupChat(socket, buf, size);
    }
}

int main(int argc,char* argv[])
{
    int default_port = 8000;
    int optch = 0;
    while ((optch = getopt(argc, argv, "s:p:")) != -1)
    {
        switch (optch)
        {
        case 'p':
            default_port = atoi(optarg);
            printf("port: %s\n", optarg);
            break;
        case '?':
            printf("Unknown option: %c\n", (char)optopt);
            break;
        default:
            break;
        }
    }

    listen_sock = creat_sock(default_port);

    size_t fds_num = sizeof(fds)/sizeof(fds[0]);
    size_t i = 0;
    for(;i < fds_num;++i)
    {
        fds[i] = -1;
    }
    
    int max_fd = listen_sock;
	fds[0] = listen_sock;
    fd_set rset;
    while(1)
	{
        FD_ZERO(&rset);
        FD_SET(listen_sock,&rset);
        struct timeval timeout = {10 , 0};

        size_t i = 0;
        for(;i < fds_num;++i)
        {
            if(fds[i] > 0 ){
                FD_SET(fds[i] ,&rset);
                if(max_fd < fds[i])
				{
                    max_fd = fds[i];
                }
            }
        }

        switch(select(max_fd+1,&rset,NULL,NULL,&timeout))
        {
            case -1:
                perror("select");
                break;
            case 0:
                printf("time out..\n");
				break;
            default:
            {
                size_t i = 0;
                for(;i < fds_num;++i)
                {
					//連接請求
					//當為 listen_socket 事件就緒的時候,就表明有新的連接請求
                    if(FD_ISSET(fds[i],&rset) && fds[i] == listen_sock)
                    {
                        accept_sock();
                    }
                    //普通請求
                    else if(FD_ISSET(fds[i],&rset) && (fds[i] > 0))
                    {
                        handle_read(&fds[i]);
                    }
                    else{}
                }
            }
            break;
        }
    }
    return 0;
}

makfeile

TARGET=server
SRC = $(wildcard *.cpp *.c)
OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
DEFS =
CFLAGS = -g
CC =g++
LIBS =  -lpthread
$(TARGET):$(OBJ)
	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
.PHONY:
clean:
	rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select$ make
g++ -g  -o server select.c -lpthread
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select$ ./server
connect by a client, ip:127.0.0.1 port:42964

client

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <resolv.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>

#define MAXBUF 1024
#define MAXNAME 64
int main(int argc, char **argv)
{
    int sockfd, len;
    struct sockaddr_in dest;
    char buffer[MAXBUF + 1];
    fd_set rfds;
    struct timeval tv;
    int retval, maxfd = -1;

    int optch,ret = -1;
    const char*server_addr;
    int default_port = 8000;
    char* clientName = "佚名";

    /*判斷是否為合法輸入 必須傳入一個參數:服務器Ip*/
    if(argc<3)
    {
        printf("usage:tcpcli <IPaddress>");
        return 0;
    }
    while((optch = getopt(argc, argv, "s:p:n:")) != -1)
	{
		switch (optch)
		{
        case 's':
            server_addr = optarg;
            break;
        case 'p':
            default_port = atoi(optarg);
            printf("port: %s\n", optarg);
            break;
        case 'n':
            clientName = optarg;
            printf("client Name: %s\n", optarg);
            break;
        case '?':
            printf("Unknown option: %c\n",(char)optopt);    
            break;
        default:
            break;
		}
	}
    
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    {
        perror("Socket");
        exit(EXIT_FAILURE);
    }

    bzero(&dest, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons(default_port);
    if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) 
    {
        perror(server_addr);
        exit(EXIT_FAILURE);
    }

    if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) 
    {
        perror("Connect ");
        exit(EXIT_FAILURE);
    }

    printf("get ready message chat:\n");
    while (1) 
	{
        FD_ZERO(&rfds);
        FD_SET(0, &rfds);
        FD_SET(sockfd, &rfds);
        maxfd = sockfd;
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
        if (retval == -1) 
		{
            printf("select %s", strerror(errno));
            break;
        } 
		else if (retval == 0)
            continue;
		else
		{
            if (FD_ISSET(sockfd, &rfds)) 
			{
                bzero(buffer, MAXBUF + 1);
                len = recv(sockfd, buffer, MAXBUF, 0);
                if (len > 0)
                    printf ("recv byte %d, %s\n",len, buffer);
                else 
				{
                    if (len < 0)
                        printf ("message recv failure\n");
                    else
					{
                        printf("server close ,quit\n");
                    	break;
					}
                }
            }
            if (FD_ISSET(0, &rfds))
			{
                char name_msg[MAXNAME + MAXBUF];
                bzero(buffer, MAXBUF + 1);
                fgets(buffer, MAXBUF, stdin);
                if (!strncasecmp(buffer, "quit", 4)) {
                    printf("i will quit\n");
                    break;
                }
                sprintf(name_msg, "[%s]: %s", clientName, buffer);
                len = send(sockfd, name_msg, strlen(name_msg) - 1, 0);
                if (len < 0) {
                    printf ("message send failure");
                    break;
                } else
                    printf("send success,%d byte send..\n",len);
            }
        }
    }
    close(sockfd);
    return 0;
} 

makefile

TARGET=server
SRC = $(wildcard *.cpp *.c)
OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
DEFS =
CFLAGS = -g
CC =g++
LIBS =  -lpthread
$(TARGET):$(OBJ)
	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
.PHONY:
clean:
	rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/select/client$ ./client -s 0.0.0.0 -n 夢凡
client Name: 夢凡
get ready message chat:

poll調用

Poll就是監控文件是否可讀的一種機制,作用與select一樣。

#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);

參數說明

struct pollfd

fds:是一個struct pollfd結構類型的數組,列出了我們需要poll()檢查的文件描述符

typedef struct pollfd {
        int fd;           /* 需要被檢測或選擇的文件描述符*/
        short events;     /* 對文件描述符fd上感興趣的事件 */
        short revents;    /* 文件描述符fd上當前實際發生的事件*/
} pollfd_t;

​ events:想要監聽的事件

​ revents:實際上發生的事件

POLLIN
POLLOUT
POLLPRI
POLLRDHUB
POLLHUP
POLLERR

nfds

指定了fds中元素的個數,nfds_t為無符號整形

timeout

決定阻塞行為,一般如下:

  • -1:一直阻塞到fds數組中有一個達到就緒態或者捕獲到一個信號

  • 0:不會阻塞,立即返回

  • >0:阻塞時間

返回值

  • >0:數組fds中准備好讀、寫或出錯狀態的那些socket描述符的總數量;

  • ==0:數組fds中沒有任何socket描述符准備好讀、寫,或出錯;此時poll超時

  • -1: poll函數調用失敗

poll使用

#include <stdio.h>
#include <poll.h>
#include <string.h>

int main()
{
	int timeout = 0;			   
	char buf[1024];
	struct pollfd fd_poll[1];	   //設置只有一個事件

	while(1){
		fd_poll[0].fd = 0;      
		fd_poll[0].events = POLLIN;
		fd_poll[0].revents = 0;	   

		memset(buf, '\0', sizeof(buf));
		switch( poll(fd_poll, 1, -1) ){
			case 0:
				perror("timeout!");
				break;
			case -1:
				perror("poll");
				break;
			default:
				{
					if( fd_poll[0].revents & POLLIN )
					{

						gets(buf);
						printf("buf : %s\n",buf);
					}
				}
				break;
		}
	}
	return 0;
}

makefile

tcp_poll:tcp_poll.c
    gcc -o $@ $^
.PHONY:clean
clean:
    rm -f tcp_poll

epoll調用

epoll沒有對描述符數目的限制,它所支持的文件描述符上限是整個系統最大可以打開的文件數目,例如,在1GB內存的機器上,這個限制大概為10萬左右。

epoll只有 epoll_createepoll_ctlepoll_wait 這三個系統調用。

第一步,創建一個 epoll 句柄

第二步,向內核添加、修改或刪除要監控的文件描述符。

第三步,發起了 select() 調用

請添加圖片描述

其定義如下:

#include <sys/epoll.h>

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll_create

#include <sys/epoll.h>

int epoll_create(int size);

調用epoll_create方法創建一個epoll的句柄,使用完epoll后使用close函數進行關閉

epoll_ctl

#include <sys/epoll.h>

int epoll_ctl(int epfd	//第一個參數epfd:epoll_create函數的返回值。
	, int op			//第二個參數events:表示動作類型。有三個宏來表示
	, int fd			//第三個參數fd:需要監聽的fd。
	, struct epoll_event *event);//第四個參數event:告訴內核需要監聽什么事件。

op:

  • EPOLL_CTL_ADD:注冊新的fd到epfd中;

  • EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;

  • EPOLL_CTL_DEL:從 epfd 中刪除一個 fd。

fd:需要注冊監視對象文件描述符

struct epoll_event

// 感興趣的事件和被觸發的事件
struct epoll_event {
    __uint32_t events; // Epoll events
    epoll_data_t data; // User data variable
};
// 保存觸發事件的某個文件描述符相關的數據
typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;
Epoll Events:

EPOLLIN:表示對應的文件描述符可讀(包括對端Socket);
EPOLLOUT:表示對應的文件描述符可寫;
EPOLLPRI:表示對應的文件描述符有緊急數據可讀(帶外數據);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET:將EPOLL設為邊緣觸發(Edge Triggered),這是相對於水平觸發(Level Triggered)而言的。
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket,需要再次添加

例如:

struct epoll_event ep_ev;
int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len);
ep_ev.events = EPOLLIN | EPOLLET;
ep_ev.data.fd = accept_sock;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)

epoll_wait

收集在epoll監控的事件中已經發生的事件

#include <sys/epoll.h>

int epoll_wait(int epfd		//第一個參數epfd:epoll_create函數的返回值。
	, struct epoll_event *events	
	, int maxevents
	, int timeout);			//超時時間(毫秒)

第一個參數epfd:epoll_create函數的返回值。

第二個參數events:是分配好的epoll_event結構體數組,epoll將會把發生的事件賦值到events數組中(events不可以是空指針,內核只負責把數據賦值到這個event數組中,不會去幫助我們在用戶態分配內存)

第三個參數maxevents:maxevents告訴內核這個events數組有多大,這個maxevents的值不能大於創建epoll_create時的size。

第四個參數:是超時時間(毫秒),如果函數調用成功,則返回對應IO上已准備好的文件描述符數目,如果返回0則表示已經超時。

基於epoll的簡易http服務器

基於epoll的簡單回顯服務器

#include<stdio.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<fcntl.h>
#include<stdlib.h>
#include<string.h>

int listen_sock = -1;
int epoll_fd = -1;

//設置非阻塞
int set_noblock(int sock)
{
    int opts = fcntl(sock,F_GETFL);
    return fcntl(sock,F_SETFL,opts | O_NONBLOCK);
}

int creat_socket(int port)
{
    int sock = socket(AF_INET,SOCK_STREAM,0);
    if(sock < 0){
        perror("socket");
        exit(2);
    }
    //調用setsockopt使當server先斷開時避免進入 TIME_WAIT 狀態,\
        將其屬性設定為SO_REUSEADDR,使其地址信息可被重用
    int opt = 1;
    if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){
        perror("setsockopt");
        exit(3);
    }
    struct sockaddr_in local;

    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    local.sin_addr.s_addr = INADDR_ANY; //inet_addr("0.0.0.0");

    if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0 ){
        perror("bind");
        exit(4);
    }
    if(listen(sock,5) < 0){
        perror("listen");
        exit(5);
    }
    printf("listen port %d..\n",port);
    return sock;
}

int accept_socket(){
    struct sockaddr_in remote;
    socklen_t len = sizeof(remote);

    int accept_sock = accept(listen_sock, (struct sockaddr *)&remote, &len);
    if (accept_sock < 0)
    {
        perror("accept");
        return -1;
    }
    printf("accept a client..[ip]: %s,[port]: %d\n", inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));
    //將新的事件添加到epoll集合中
    struct epoll_event ep_ev;
    ep_ev.events = EPOLLIN | EPOLLET; // edge邊沿觸發,只觸發一次
    ep_ev.data.fd = accept_sock;

    set_noblock(accept_sock);

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_sock, &ep_ev) < 0)
    {
        perror("epoll_ctl");
        close(accept_sock);
        return -1;
    }
    return 0;
}

int handle_request(int socketFd){
    //申請空間同時存文件描述符和緩沖區地址
    char buf[102400];
    memset(buf, '\0', sizeof(buf));

    ssize_t _s = recv(socketFd, buf, sizeof(buf) - 1, 0);
    if (_s < 0)
    {
        perror("recv");
        return -1;
    }
    else if (_s == 0)
    {
        printf("remote close..\n");
        //遠端關閉了,進行善后
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);
        close(socketFd);
    }
    else
    {
        //讀取成功,輸出數據
        printf("client# %s", buf);
        fflush(stdout);

        //將事件改寫為關心事件,進行回寫
        struct epoll_event ep_ev;
        ep_ev.data.fd = socketFd;
        ep_ev.events = EPOLLOUT | EPOLLET;

        //在epoll實例中更改同一個事件,觸發socket可寫事件
        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socketFd, &ep_ev);
    }
    return 0;
}

int handle_response(int socketFd){
    const char *msg = "HTTP/1.1 200 OK \r\n\r\n<h1> hi girl </h1>\r\n";
    send(socketFd, msg, strlen(msg), 0);
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);
    close(socketFd);
}
int main(int argc,char *argv[])
{
    int default_port = 8000;
    int optch = 0;
    while ((optch = getopt(argc, argv, "s:p:")) != -1)
    {
        switch (optch)
        {
        case 'p':
            default_port = atoi(optarg);
            printf("port: %s\n", optarg);
            break;
        case '?':
            printf("Unknown option: %c\n", (char)optopt);
            break;
        default:
            break;
        }
    }

    listen_sock = creat_socket(default_port);

    epoll_fd = epoll_create(256);
    if(epoll_fd < 0){
        perror("epoll creat");
        exit(6);
    }

    struct epoll_event ep_ev;
    ep_ev.events = EPOLLIN;         //數據的讀取
    ep_ev.data.fd = listen_sock;

    //添加關心的事件
    if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev) < 0){
        perror("epoll_ctl");
        exit(7);
    }

    struct epoll_event ready_ev[128];   //申請空間來放就緒的事件。
    int maxnum = 128;
    int timeout = -1;                   //設置超時時間,若為-1,則永久阻塞等待。
    int ret = 0;
    
    int done = 0;
    while(!done){
        switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout)){
            case -1:
                perror("epoll_wait");
                break;
            case 0:
                printf("time out...\n");
                break;
            default://至少有一個事件就緒
            {
                int i = 0;
                for(;i < ret;++i)
				{
                    //判斷是否為監聽套接字,是的話 accept
                    int fd = ready_ev[i].data.fd; 
                    if((fd == listen_sock) && (ready_ev[i].events & EPOLLIN)){
                        accept_socket();
                    }
                    else{//普通IO
                        if(ready_ev[i].events & EPOLLIN){
                            handle_request(fd);
                        }else if(ready_ev[i].events & EPOLLOUT){
                            handle_response(fd);
                        }
                    }
                }
            }
                break;
        }
    }
    close(listen_sock);
    return 0;
}

makefile

TARGET=server
SRC = $(wildcard *.cpp *.c)
OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
DEFS =
CFLAGS = -g
CC =g++
LIBS =  -lpthread
$(TARGET):$(OBJ)
	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
.PHONY:
clean:
	rm -rf *.o $(TARGET)

Build

make

Usage

ubuntu@VM-16-5-ubuntu:~/learnbase/IO復用/epoll$ ./server
listen port 8000

瀏覽器輸入:http://服務器ip:8000/例如,http://49.234.35.128:8000/

請添加圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM