Linux 網絡編程的5種IO模型:多路復用(select/poll/epoll)
背景
我們在上一講 Linux 網絡編程的5種IO模型:阻塞IO與非阻塞IO中,對於其中的 阻塞/非阻塞IO 進行了說明。
這一講我們來看 多路復用機制。
IO復用模型 ( I/O multiplexing )
所謂I/O多路復用機制,就是說通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。這種機制的使用需要額外的功能來配合: select、poll、epoll
select、poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒后自己負責進行讀寫,也就是說這個讀寫過程是阻塞的。
select時間復雜度O(n)
它僅僅知道了,有I/O事件發生了,卻並不知道是哪那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。所以select具有O(n)的無差別輪詢復雜度,同時處理的流越多,無差別輪詢時間就越長。
poll時間復雜度O(n)
poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,然后查詢每個fd對應的設備狀態, 但是它沒有最大連接數的限制,原因是它是基於鏈表來存儲的.
epoll時間復雜度O(1)
epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll會把哪個流發生了怎樣的I/O事件通知我們。所以我們說epoll實際上是事件驅動(每個事件關聯上fd)的,此時我們對這些流的操作都是有意義的。(復雜度降低到了O(1))
在多路復用IO模型中,會有一個內核線程不斷去輪詢多個socket的狀態,只有當真正讀寫事件發生時,才真正調用實際的IO讀寫操作。因為在多路復用IO模型中,只需要使用一個線程就可以管理多個socket,系統不需要建立新的進程或者線程,也不必維護這些線程和進程,並且只有在真正有讀寫事件進行時,才會使用IO資源,所以它大大減少了資源占用。
select
使用select來監視文件描述符時,要向內核傳遞的信息包括:
1、我們要監視的文件描述符個數
2、每個文件描述符,我們可以監視它的一種或多種狀態,包括:可讀,可寫,發生異常三種。
3、要等待的時間,監視是一個過程,我們希望內核監視多長時間,然后返回給我們監視結果呢?(可以永遠等待,等待一段時間,或者不等待直接返回)
4、監視結果包括:准備好了的文件描述符個數,對於讀,寫,異常,分別是哪兒個文件描述符准備好了。
fd_set 模型的原理:(理解select模型的關鍵在於理解fd_set,假設取fd_set長度為1字節):
fd_set中的每一位可以對應一個文件描述符fd。則1字節長的fd_set最大可以對應8個fd。
執行FD_ZERO(&set);則set用位表示是0000,0000。
若fd=5,執行FD_SET(fd,&set);后set變為0001,0000(第5位置為1)
若再加入fd=2,fd=1,則set變為0001,0011
執行select(6,&set,0,0,0)阻塞等待...
若fd=1,fd=2上都發生可讀事件,則select返回,此時set變為0000,0011。注意:沒有事件發生的fd=5被清空。
基於上面的討論,可以輕松得出select模型的特點:
(1)可監控的文件描述符個數取決與sizeof(fd_set)的值。我這邊服務器上sizeof(fd_set)=512,每bit表示一個文件描述符,則我服務器上支持的最大文件描述符是512*8=4096。據說可調,另有說雖然可調,但調整上限受於編譯內核時的變量值。
(2)將fd加入select監控集的同時,還要再使用一個數據結構array保存放到select監控集中的fd,一是用於在select返回后,array作為源數據和fd_set進行FD_ISSET判斷。二是select返回后會把以前加入的但並無事件發生的fd清空,則每次開始 select前都要重新從array取得fd逐一加入(FD_ZERO最先),掃描array的同時取得fd最大值maxfd,用於select的第一個參數。
(3)可見select模型必須在select前循環array(加fd,取maxfd),select返回后循環array(FD_ISSET判斷是否有事件發生)。
select 實現原理 :
1、使用copy_from_user從用戶空間拷貝fd_set到內核空間
2、注冊回調函數__pollwait
3、遍歷所有fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或者datagram_poll)
4、以tcp_poll為例,其核心實現就是__pollwait,也就是上面注冊的回調函數。
5、__pollwait的主要工作就是把current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對於tcp_poll來說,其等待隊列是sk->sk_sleep(注意:把進程掛到等待隊列中並不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數據(磁盤設備)后,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。
6、poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。
7、如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是current)進入睡眠。當設備驅動發生自身資源可讀寫后,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則調用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。
8、把fd_set從內核空間拷貝到用戶空間。
select的幾大缺點:
1)每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大
2)同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
3)select支持的文件描述符數量太小了,默認是1024
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set); //清除某一個被監視的文件描述符。
int FD_ISSET(int fd, fd_set *set); //測試一個文件描述符是否是集合中的一員
void FD_SET(int fd, fd_set *set); //添加一個文件描述符,將set中的某一位設置成1;
void FD_ZERO(fd_set *set); //清空集合中的文件描述符,將每一位都設置為0;
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
struct timeval{
long tv_sec; //秒
long tv_usec;//微秒
}
struct timespec{
time_t tv_sec;//秒
long tv_nsec;//納秒
}
select和pselect有三個主要的區別:
1、select超時使用的是struct timeval,用秒和微秒計時,而pselect使用struct timespec ,用秒和納秒。
2、select會更新超時參數timeout 以指示還剩下多少時間,pselect不會。
3、select沒有sigmask參數.
-
sigmask:這個參數保存了一組內核應該打開的信號(即:從調用線程的信號掩碼中刪除)
-
當pselect的sigmask 為 NULL時pselect和select一樣;當sigmask!=NULL時,等效於以下原子操作:
sigset_t origmask;
sigprocmask(SIG_SETMASK, &sigmask, &origmask);
ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
sigprocmask(SIG_SETMASK, &origmask, NULL);
接收信號的程序通常只使用信號處理程序來引發全局標志。全局標志將指示事件必須被處理。在程序的主循環中。一個信號將導致select和pselect返回-1 並將erron=EINTR。
我們經常要在主循環中處理信號,主循環的某個位置將會檢查全局標志,那么我們會問:如果信號在條件之后,select之前到達怎么辦。答案是select會無限期阻塞。
這種情況很少見,但是這就是為什么出現了pselect。因為他是類似原子操作的。
描述: 允許程序監視多個文件描述符,等待所監視的一個或者多個文件描述符變為“准備好”的狀態。所謂的”准備好“狀態是指:文件描述符不再是阻塞狀態,可以用於某類IO操作了,包括可讀,可寫,發生異常三種 。
參數說明:
nfds: 一個整數值, 表示集合中所有文件描述符的范圍,即所有文件描述符的最大值+1。
注意, 待測試的描述集總是從0, 1, 2, …開始的。 所以, 假如你要檢測的描述符為8, 9, 10, 那么系統實際也要監測0, 1, 2, 3, 4, 5, 6, 7, 此時真正待測試的描述符的個數為11個, 也就是max(8, 9, 10) + 1
readfds/writefds/exceptfds:這些都是fd_set類型的,代表文件描述符集合; 可以認為一個fd_set變量是由很多個二進制構成的數組,每一位表示一個文件描述符是否需要監視。
-
readfds:監視文件描述符的一個集合,我們監視其中的文件描述符是不是可讀,或者更准確的說,讀取是不是不阻塞了。
-
writefds:監視文件描述符的一個集合,我們監視其中的文件描述符是不是可寫,或者更准確的說,寫入是不是不阻塞了。
-
exceptfds:用來監視發生錯誤異常文件
timeout: 表示select返回之前的時間上限。 設為0(NULL),代表無期限等待下去。 這個等待可以被一個信號中斷,只有當一個描述符准備好,或者捕獲到一個信號時函數才會返回。如果是捕獲到信號,select返回-1,並將變量errno設置成EINTR。
如果timeout ->tv_sec 為0 且 timeout->tv_sec 為0 ,不等待直接返回,加入的描述符都會被測試,並且返回滿足要求的描述符個數,這種方法通過輪詢,無阻塞地獲得了多個文件描述符狀態。
如果timeout->tv_sec!=0 || timeout->tv_sec!=0 ,等待指定的時間。當有描述符復合條件或者超過超時時間的話,函數返回。等待總是會被信號中斷。
返回值
成功時:返回三種描述符集合中”准備好了“的文件描述符數量。
超時:返回0
錯誤:返回-1,並設置 errno
- EBADF:集合中包含無效的文件描述符。(文件描述符已經關閉了,或者文件描述符上已經有錯誤了)。
- EINTR:捕獲到一個信號。
- EINVAL:nfds是負的或者timeout中包含的值無效。
- ENOMEM:無法為內部表分配內存。
例程:基於 select的 TCP 服務器
server.c
/*
# Copyright By Schips, All Rights Reserved
# https://gitee.com/schips/
#
# File Name: server.c
# Created : Sat 25 Mar 2020 14:43:39 PM CST
*/
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
typedef struct _info {
char name[10];
char text[54];
}info;
int main(int argc, char *argv[])
{
int my_socket;
unsigned int len;
int ret, i, j;
// 創建套接字
my_socket = socket(AF_INET, SOCK_STREAM, 0); // IPV4, TCP socket
if(my_socket == -1) { perror("Socket"); }
printf("Creat a socket :[%d]\n", my_socket);
// 用於接收消息
info buf ={0};
// 指定地址
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET; // 地址協議族
addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
addr.sin_port = htons(12345); //指定端口號
int set = 1;
int get = 0;
int getlen = 0;
// 服務器 綁定
bind(my_socket, (struct sockaddr *)&addr, sizeof(addr));
ret = listen(my_socket, 10);
if(-1 == ret) { perror("listen"); }
printf("Listening\n");
int connect_sockets[100] = {0}; // 我們規定,為 0 的成員為無效socket
int connected_cnt = 0;
//struct sockaddr_in new = {0};
//int new_addr_size = {0};
fd_set read_sets;
int max_fd = my_socket; // 一開始時,只有 一個新的 文件描述:my_socket ,所以它是最大的
while(1) // 在循環中等待連接請求
{
FD_ZERO(&read_sets); // 每次都需要初始化
FD_SET(my_socket, &read_sets); // 添加 要監聽的 socket
// 添加 之后經過 connect 過來的 套接字數組(一般在第一次循環時是空的)
for( i = 0; i < connected_cnt; i++)
{
if(connect_sockets[i])
{
FD_SET(connect_sockets[i], &read_sets); // 添加經過accept保存下來,需要進行讀響應的套接字到集合中
}
}
// 設置監聽超時時間
// timeout.tv_sec = 2;
// timeout.tv_usec = 0;
ret = select(max_fd + 1, &read_sets, NULL, NULL, NULL);
// 判斷返回值
switch (ret) {
case 0 :
printf("Time out.\n"); // 監聽超時
break;
case -1 :
printf("Err occurs.\n"); // 監聽錯誤
break;
default :
if(FD_ISSET(my_socket, &read_sets)) //這個是原的被動socket,如果是它,則 意味着有新的連接進來了
{
connect_sockets[connected_cnt] = accept(my_socket, NULL, NULL);
max_fd = connect_sockets[connected_cnt];
printf("New socket is %d\n", connect_sockets[connected_cnt]);
connected_cnt ++;
printf("Now we has [%d] connecter\n", connected_cnt);
}else{ // 如果不是 被動socket,那么就意味着是 現有的連接 有消息發來(我們有數據可讀)
printf("New message came in.\n");
// 求出是那個文件描述符可讀
for(i = 0; i < connected_cnt; i++)
{
if(FD_ISSET(connect_sockets[i], &read_sets) == 1) break;
}
if( i >= connected_cnt) { continue; }
printf("Socket [%d] send to server.\n", connect_sockets[i]);
// 接收消息
ret = recv(connect_sockets[i], &buf, sizeof(buf), 0);
if( ret <= 0 )
{
// 遠程客戶端斷開處理(如果不處理,會導致服務器也斷開)
printf("[%d]/[%d] Client [%d] disconnected.\n", i+1, connected_cnt, connect_sockets[i]);
close(connect_sockets[i]);
// 我們需要將對應的客戶端從數組中移除 且 連接數 -1 (移除的方法: 數組成員前移覆蓋)
for (j = i; j < connected_cnt - 1; ++j)
{
connect_sockets[j] = connect_sockets[ j + 1];
}
connected_cnt --;
}
// 打印消息
printf("[%s] : %s\n", buf.name, buf.text);
// 回復消息
sprintf(buf.name, "Server");
sprintf(buf.text, "Had recvied your[%d] message", connect_sockets[i]);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(connect_sockets[i], &buf, sizeof(buf), 0);
}
break;
}
printf("while loop\n");
}
// 關閉連接
//shutdown(my_socket, SHUT_RDWR); perror("shutdown");
for(i = 0; i < connected_cnt; i++)
{
close(connect_sockets[i]); perror("close");
}
return close(my_socket);
}
client.c
/*
# Copyright By Schips, All Rights Reserved
# https://gitee.com/schips/
#
# File Name: client.c
# Created : Sat 25 Mar 2020 14:44:19 PM CST
*/
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
typedef struct _info {
char name[10];
char text[54];
}info;
int main(int argc, char *argv[])
{
int my_socket;
unsigned int len;
int ret, i = 0;
// 創建套接字
my_socket = socket(AF_INET, SOCK_STREAM, 0); // IPV4, TCP socket
if(my_socket == -1) { perror("Socket"); }
printf("Creat a socket :[%d]\n", my_socket);
// 用於接收消息
info buf ={0};
// 指定地址
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET; // 地址協議族
addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
addr.sin_port = htons(12345); //指定端口號
int new_socket;
struct sockaddr_in new = {0};
int new_addr_size;
connect(my_socket, (struct sockaddr *)(&addr), sizeof(struct sockaddr_in));
if(-1 == ret) { perror("connect"); }
printf("connected\n");
// 回復消息
sprintf(buf.name, "Client");
sprintf(buf.text, "Hello tcp text.");
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(my_socket, &buf, sizeof(buf), 0);
perror("sendto");
// 接收並打印消息
//recvfrom(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
recv(my_socket, &buf, sizeof(buf), 0);
perror("recvfrom");
printf("[%s] : %s\n", buf.name, buf.text);
for (i = 0; i < 5; ++i)
{
sleep(2);
// 回復消息
sprintf(buf.name, "Client");
sprintf(buf.text, "Hello tcp text [%d].", i++);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(my_socket, &buf, sizeof(buf), 0);
perror("sendto");
// 接收並打印消息
//recvfrom(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
recv(my_socket, &buf, sizeof(buf), 0);
printf("[%s] : %s\n", buf.name, buf.text);
perror("recvfrom");
}
// 關閉連接
//shutdown(my_socket, SHUT_RDWR); perror("shutdown");
//printf("%d\n", errno);
return close(my_socket); perror("close");
printf("%d\n", errno);
return errno;
}
poll
select() 和 poll() 系統調用的本質一樣,poll() 的機制與 select() 類似,與 select() 在本質上沒有多大差別,管理多個描述符也是進行輪詢,根據描述符的狀態進行處理,但是 poll() 沒有最大文件描述符數量的限制( 基於鏈表來存儲的,但是數量過大后性能也是會下降)。poll() 和 select() 同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <signal.h>
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask);
struct pollfd{
int fd; //文件描述符
short events; //等待的事件
short revents; //實際發生的事件
};
描述: 監視並等待多個文件描述符的屬性變化
參數解析:
fds : 指向一個struct pollfd數組的指針,用於指定測試某個給定的fd的條件
- fd :每一個 pollfd 結構體指定了一個被監視的文件描述符,可以傳遞多個結構體,指示 poll() 監視多個文件描述符。
- events: 指定監測fd的事件(輸入、輸出、錯誤),每一個事件有多個取值
- revents:revents 域是文件描述符的操作結果事件,內核在調用返回時設置這個域。events 域中請求的任何事件都可能在 revents 域中返回。
注意:每個結構體的 events 域是由用戶來設置,告訴內核我們關注的是什么,而 revents 域是返回時內核設置的,以說明對該描述符發生了什么事件
nfds : 指定 fds 數組元素個數 ,相當於要檢查多少個文件描述符
timeout:指定等待的毫秒數,無論 I/O 是否准備好,poll() 都會返回。
-
-1 : 永遠等待,直到事件發生
-
0:立即返回
-
> 0:等待指定的毫秒數
返回值:
成功時,poll() 返回結構體中 revents 域不為 0 的文件描述符個數;如果在超時前沒有任何事件發生,poll()返回 0;
失敗時,poll() 返回 -1,並設置 errno 為下列值之一:
- EBADF:一個或多個結構體中指定的文件描述符無效。
- EFAULT:fds 指針指向的地址超出進程的地址空間。
- EINTR:請求的事件之前產生一個信號,調用可以重新發起。
- EINVAL:nfds 參數超出 PLIMIT_NOFILE 值。
- ENOMEM:可用內存不足,無法完成請求。
events & revents的取值如下:
| 事件 | 描述 | 是否可作為輸入(events) | 是否可作為輸出(revents) |
|---|---|---|---|
| POLLIN | 數據可讀(包括普通數據&優先數據) | 是 | 是 |
| POLLOUT | 數據可寫(普通數據&優先數據) | 是 | 是 |
| POLLRDNORM | 普通數據可讀 | 是 | 是 |
| POLLRDBAND | 優先級帶數據可讀(linux不支持) | 是 | 是 |
| POLLPRI | 高優先級數據可讀,比如TCP帶外數據 | 是 | 是 |
| POLLWRNORM | 普通數據可寫 | 是 | 是 |
| POLLWRBAND | 優先級帶數據可寫 | 是 | 是 |
| POLLRDHUP | TCP連接被對端關閉,或者關閉了寫操作,由GNU引入 | 是 | 是 |
| POPPHUP | 掛起 | 否 | 是 |
| POLLERR | 錯誤 | 否 | 是 |
| POLLNVAL | 文件描述符沒有打開 | 否 | 是 |
https://blog.csdn.net/coolgw2015/article/details/79719328
poll例程
使用poll函數監控標准輸入
#include <stdio.h>
#include <unistd.h>
#include <poll.h>
int main()
{
struct pollfd poll_fd;
char buf[1024];
poll_fd.fd = 0;
poll_fd.events=POLLIN;
for(;;)
{
int ret = poll(&poll_fd,1,2000);
if(ret<0)
{
perror("poll");
continue;
}
if(ret==0)
{
printf("poll timeout!\n");
continue;
}
if(poll_fd.revents==POLLIN)
{
read(0,buf,sizeof(buf)-1);
printf("sdin:%s",buf);
}
}
}
poll 處理 tcp通信
#include <stdio.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/poll.h>
#define MAX_SOCKET 5
typedef struct _info {
char name[10];
char text[54];
}info;
int main(int argc, char *argv[])
{
info send_buf;
info recv_buf;
int listen_socket,newsk;
int connected_sockets[MAX_SOCKET];
int connected_cnt = 0;
int i;
// 1 創建一個套接字,用於網絡通信
listen_socket = socket(PF_INET, SOCK_STREAM, 0);
if (listen_socket == -1)
{
perror("socket");
return -1;
}
// 2 綁定服務的IP與端口
struct sockaddr_in ser_addr;
ser_addr.sin_family = PF_INET;
ser_addr.sin_port = htons (12345) ;
ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int ret = bind(listen_socket, (struct sockaddr *)&ser_addr,sizeof(ser_addr));
if (ret == -1)
{
perror("bind");
return -1;
}
// 3 監聽端口的數據
ret = listen(listen_socket,MAX_SOCKET);
if (ret == -1)
{
perror("listen");
return -1;
}
// 監聽的總套接字數組
struct pollfd pollfds[MAX_SOCKET+1];
int poll_ret;
// 主套接字信息
pollfds[0].fd = listen_socket;
pollfds[0].events = POLLIN|POLLPRI; //設置為 任意優先級可讀 事件監聽
while(1)
{
printf("Poll all fds\n");
poll_ret = poll(pollfds, connected_cnt + 1, -1); //-1 阻塞模式進行監聽
printf("ret = %d\n", poll_ret); //返回值為1 : 正確監聽到變化
printf("Need to handld %d fd(s)\n", poll_ret); //返回值為1 : 正確監聽到變化
if(poll_ret == 0)
{
printf("timeout!\n"); //監聽超時
} else if( poll_ret == -1 )
{
perror("err!"); //監聽出錯
}
//正確監聽到變化
if(pollfds[0].revents & POLLIN || pollfds[0].revents & POLLPRI) //如果新客戶端連接
{
// 響應用戶連接
connected_sockets[connected_cnt] = accept(listen_socket,NULL,NULL); //返回 新的連接響應的套接字
if(connected_sockets[connected_cnt] == -1)
{
perror("accept");
return -1;
}
printf("new accept from %d!\n", connected_sockets[connected_cnt]);
//更新客戶端套接字集合
pollfds[connected_cnt+1].fd = connected_sockets[connected_cnt];
pollfds[connected_cnt+1].events = POLLIN|POLLPRI; //任意優先級可讀
connected_cnt++;
} else
{
printf("new read/write !\n");
for(i = 0; !(pollfds[i+1].revents & POLLIN) ;i++ ) ;//如果是 客戶端發生了數據可讀)
//4 接收與發送數據
newsk = connected_sockets[i];
memset(&recv_buf, 0, sizeof(recv_buf));
ret = recv(newsk, &recv_buf, sizeof(recv_buf), 0);
if (ret == -1)
{
perror("recv");
return -1;
}
if(errno == EINTR) continue;
if(ret == 0 ) //客戶端斷開
{
printf("%d disconnectded\n", connected_sockets[i]);
close(connected_sockets[i]);
connected_sockets[i] = -1;
memset(&pollfds[i+1] , 0, sizeof(struct pollfd )); //清空斷開套接字監聽信息
continue;
}
printf("[%d],[%s] : %s\n", connected_sockets[i], recv_buf.name, recv_buf.text);
sprintf(send_buf.name, "Server");
sprintf(send_buf.text, "Had recvied your[%d] message", connected_sockets[i]);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(connected_sockets[i], &send_buf, sizeof(send_buf), 0);
}
sleep(2);
}
// 5 關閉套接字
for(i = 0; i < connected_cnt; i++)
{
close(connected_sockets[i]);
}
return 0;
}
poll函數的優缺點
通過poll函數的結構以及小測試程序的編寫,我們不難發現poll函數的一些特點:
1、優點
(1)poll() 不要求開發者計算最大文件描述符加一的大小。
(2)poll() 在應付大數目的文件描述符的時候速度更快,相比於select。
(3)它沒有最大連接數的限制,原因是它是基於鏈表來存儲的。
(4)在調用函數時,只需要對參數進行一次設置就好了
2、缺點
(1)大量的fd的數組被整體復制於用戶態和內核地址空間之間,而不管這樣的復制是不是有意義。
(2)與select一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符,這樣會使性能下降
(3)同時連接的大量客戶端在一時刻可能只有很少的就緒狀態,因此隨着監視的描述符數量的增長,其效率也會線性下降
epoll
epoll是Linux下多路復用IO接口select/poll的增強版本,它能顯著減少程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率,因為它不會復用文件描述符集合來傳遞結果而迫使開發者每次等待事件之前都必須重新准備要被偵聽的文件描述符集合,另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select/poll 那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。
情景導入:
有100萬用戶同時與一個進程保持着TCP連接,而每一時刻只有幾十個或幾百個TCP連接是活躍的(接收TCP包),也就是說在每一時刻進程只需要處理這100萬連接中的一小部分連接。那么,如何才能高效的處理這種場景呢?進程是否在每次詢問操作系統收集有事件發生的TCP連接時,把這100萬個連接告訴操作系統,然后由操作系統找出其中有事件發生的幾百個連接呢?實際上,在Linux2.4版本以前,那時的select或者poll事件驅動方式是這樣做的。
這里有個非常明顯的問題,即在某一時刻,進程收集有事件的連接時,其實這100萬連接中的大部分都是沒有事件發生的。因此如果每次收集事件時,都把100萬連接的套接字傳給操作系統(這首先是用戶態內存到內核態內存的大量復制),而由操作系統內核尋找這些連接上有沒有未處理的事件,將會是巨大的資源浪費,然后select和poll就是這樣做的,因此它們最多只能處理幾千個並發連接。而epoll不這樣做,它在Linux內核中申請了一個簡易的文件系統,把原先的一個select或poll調用分成了3部分
-
調用epoll_create建立一個epoll對象(在epoll文件系統中給這個句柄分配資源);
-
調用epoll_ctl向epoll對象中添加這100萬個連接的套接字;
-
調用epoll_wait收集發生事件的連接。
這樣只需要在進程啟動時建立1個epoll對象,並在需要的時候向它添加或刪除連接就可以了,因此,在實際收集事件時,epoll_wait的效率就會非常高,因為調用epoll_wait時並沒有向它傳遞這100萬個連接,內核也不需要去遍歷全部的連接。
epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
描述:創建一個epoll對象(實際上它也是一個文件描述符),用於添加或刪除指定的連接。
參數解析:
size:想關注的文件描述符數量(用於在內核申請一片空間,用來存放你想關注的socket fd上是否發生以及發生了什么事件。)
返回值:成功返回一個epoll文件描述符。失敗返回-1。
注意: epoll會占用一個fd值**,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。
epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/* 有關的結構體 */
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
描述: 控制epoll事件,可以是注冊、修改或刪除一個fd
參數解析:
epfd:epoll_create 返回的對象
op:操作類型
- EPOLL_CTL_ADD:注冊新的fd到epfd中
- EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件
- EPOLL_CTL_DEL:從epfd中刪除一個fd
fd:需要監聽的fd
event:監聽的事件
- events:下面值的相或結果
EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符可以寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。缺省是水平觸發(Level Triggered)。
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里。
epoll工作模式epoll有2種工作方式:LT和ET。
- LT(level-triggered)是缺省的工作方式,並且同時支持block和no-block socket。在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的。所以,這種模式編程出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表。
- ET (edge-triggered)是高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個文件描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如,你在發送,接收或者接收請求,或者發送接收的數據少於一定量時導致了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),內核不會發送更多的通知(only once),不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。
- data:用戶數據,在TCP中一般傳遞我們需要監聽的fd
返回值:成功返回0,失敗返回-1
epoll_wait
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
/* 有關的結構體 */
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
描述: 等待事件的產生,類似於select()調用。
epoll_wait運行的原理是:等侍注冊在epfd上的socket fd的事件的發生,如果發生則將發生的sokct fd和事件類型放入到events數組中。並且將注冊在epfd上的socket fd的事件類型給清空,所以如果下一個循環你還要關注這個socket fd的話,則需要
epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)來重新設置socket fd的事件類型。這時不用EPOLL_CTL_ADD,因為socket fd並未清空,只是事件類型清空。這一步非常重要。
參數解析:
epfd:epoll_create 返回的對象
events:用來從內核得到所有的讀寫事件(從內核返回給用戶),
maxevents:告訴內核需要監聽的所有的socket的句柄數(從用戶傳給內核),值不能大於創建epoll_create()時的size。
timeout:超時時間(毫秒,0會立即返回,-1永久等待)。
返回值:成功返回需要處理的事件數目,若已超時則返回0;失敗返回-1。
epoll 使用流程
通過在包含一個頭文件#include <sys/epoll.h> 以及幾個簡單的API將可以大大的提高你的網絡服務器的支持人數。
首先通過create_epoll(int maxfds)來創建一個epoll的句柄,其中maxfds為你epoll所支持的最大句柄數。這個函數會返回一個新的epoll句柄,之后的所有操作將通過這個句柄來進行操作。在用完之后,記得用close()來關閉這個創建出來的epoll句柄。
之后在你的網絡主循環里面,每一幀的調用epoll_wait(int epfd, strcuct epoll_event* events, int maxevents, int timeout)來查詢所有的網絡接口,看哪一個可以讀,哪一個可以寫了。基本的語法為:
nfds = epoll_wait(kdpfd, events, maxevents, -1);
其中kdpfd為用epoll_create創建之后的句柄,events是一個epoll_event*的指針,當epoll_wait這個函數操作成功之后,epoll_events里面將儲存所有的讀寫事件。maxevents是當前需要監聽的所有socket句柄數。
epoll_wait范圍之后應該是一個循環,遍利所有的事件。
// 幾乎所有的epoll程序都使用下面的框架
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //有新的連接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept這個連接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //將新的fd添加到epoll的監聽隊列中
}
else if( events[i].events&EPOLLIN ) //接收到數據,讀socket
{
n = read(sockfd, line, MAXLINE)) < 0 //讀
ev.data.ptr = md; //md為自定義類型,添加數據
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改標識符,等待下一個循環時發送數據,異步處理的精髓
}
else if(events[i].events&EPOLLOUT) //有數據待發送,寫socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取數據
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //發送數據
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改標識符,等待下一個循環時接收數據
}
else
{
//其他的處理
}
}
}
例程
使用epoll來讀取設備文件
#include<stdio.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<sys/epoll.h>
int main(int argc, char *argv[])
{
int epollfd = epoll_create(512);
int fd_key = open("/dev/input/event1", O_RDONLY|O_NONBLOCK);
int fd_mice = open("/dev/input/mice", O_RDONLY|O_NONBLOCK);
struct epoll_event ev;
ev.events = EPOLLIN; // 監控可讀
ev.data.fd = fd_key;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd_key, &ev);
ev.data.fd = fd_mice;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd_mice, &ev);
struct epoll_event evout[2];
char buf[1024];
while(1)
{
int ret = epoll_wait(epollfd, evout, 2, 5000);
if(ret == 0) continue; // 超時
if(ret < 0 && errno == EINTR) continue; // 被信號打斷
if(ret < 0) break; // 錯誤發生了
// ret > 0情況
int i;
for(i=0; i<ret; ++i)
{
int fd = evout[i].data.fd;
if(read(fd, buf, sizeof(buf)) < 0)
{
// close自動將它從epoll中移除
close(fd);
}
if(fd == fd_key) printf("鍵盤有消息\n");
else if(fd == fd_mice) printf("鼠標有消息\n");
}
}
}
使用 epoll來實現 tcp-server
// http://www.manongjc.com/article/54633.html
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#define IPADDRESS "127.0.0.1"
#define PORT 12345
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
//函數聲明
//創建套接字並進行綁定
static int socket_bind(const char* ip,int port);
//IO多路復用epoll
static void do_epoll(int listenfd);
//事件處理函數
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//處理接收到的連接
static void handle_accpet(int epollfd,int listenfd);
//讀處理
static void do_read(int epollfd,int fd,char *buf);
//寫處理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//刪除事件
static void delete_event(int epollfd,int fd,int state);
int main(int argc,char *argv[])
{
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT); //創建套接字並進行綁定
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}
static int socket_bind(const char* ip,int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
//inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}
static void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//創建一個描述符
epollfd = epoll_create(FDSIZE);
//添加監聽描述符事件
add_event(epollfd,listenfd,EPOLLIN);
for ( ; ; )
{
//獲取已經准備好的描述符事件
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,listenfd,buf);
}
close(epollfd);
}
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
int i;
int fd;
//進行選好遍歷
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
//根據描述符的類型和事件類型進行處理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
static void handle_accpet(int epollfd,int listenfd)
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//添加一個客戶描述符和事件
add_event(epollfd,clifd,EPOLLIN);
}
}
static void do_read(int epollfd,int fd,char *buf)
{
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else if (nread == 0)
{
fprintf(stderr,"client close.\n");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else
{
printf("read message is : %s",buf);
//修改描述符對應的事件,由讀改為寫
modify_event(epollfd,fd,EPOLLOUT);
}
}
static void do_write(int epollfd,int fd,char *buf)
{
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
delete_event(epollfd,fd,EPOLLOUT);
}
else
modify_event(epollfd,fd,EPOLLIN);
memset(buf,0,MAXSIZE);
}
static void add_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}
static void delete_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
4.信號驅動IO模型
在信號驅動IO模型中,當用戶線程發起一個IO請求操作,會給對應的socket注冊一個信號函數,然后用戶線程會繼續執行,當內核數據就緒時會發送一個信號給用戶線程,用戶線程接收到信號之后,便在信號函數中調用IO讀寫操作來進行實際的IO請求操作。這個一般用於UDP中,對TCP套接口幾乎是沒用的,原因是該信號產生得過於頻繁,並且該信號的出現並沒有告訴我們發生了什么事情

5.異步IO模型
異步IO模型才是最理想的IO模型,在異步IO模型中,當用戶線程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從內核的角度,當它受到一個asynchronous read之后,它會立刻返回,說明read請求已經成功發起了,因此不會對用戶線程產生任何block。然后,內核會等待數據准備完成,然后將數據拷貝到用戶線程,當這一切都完成之后,內核會給用戶線程發送一個信號,告訴它read操作完成了。也就說用戶線程完全不需要關心實際的整個IO操作是如何進行的,只需要先發起一個請求,當接收內核返回的成功信號時表示IO操作已經完成,可以直接去使用數據了。
也就說在異步IO模型中,IO操作的兩個階段都不會阻塞用戶線程,這兩個階段都是由內核自動完成,然后發送一個信號告知用戶線程操作已完成。用戶線程中不需要再次調用IO函數進行具體的讀寫。這點是和信號驅動模型有所不同的,在信號驅動模型中,當用戶線程接收到信號表示數據已經就緒,然后需要用戶線程調用IO函數進行實際的讀寫操作;而在異步IO模型中,收到信號表示IO操作已經完成,不需要再在用戶線程中調用iO函數進行實際的讀寫操作。
注意,異步IO是需要操作系統的底層支持,在Java 7中,提供了Asynchronous IO。簡稱AIO
前面四種IO模型實際上都屬於同步IO,只有最后一種是真正的異步IO,因為無論是多路復用IO還是信號驅動模型,IO操作的第2個階段都會引起用戶線程阻塞,也就是內核進行數據拷貝的過程都會讓用戶線程阻塞。

兩種高性能IO設計模式
在傳統的網絡服務設計模式中,有兩種比較經典的模式:
一種是多線程,一種是線程池。
對於多線程模式,也就說來了client,服務器就會新建一個線程來處理該client的讀寫事件,如下圖所示:

這種模式雖然處理起來簡單方便,但是由於服務器為每個client的連接都采用一個線程去處理,使得資源占用非常大。因此,當連接數量達到上限時,再有用戶請求連接,直接會導致資源瓶頸,嚴重的可能會直接導致服務器崩潰。
因此,為了解決這種一個線程對應一個客戶端模式帶來的問題,提出了采用線程池的方式,也就說創建一個固定大小的線程池,來一個客戶端,就從線程池取一個空閑線程來處理,當客戶端處理完讀寫操作之后,就交出對線程的占用。因此這樣就避免為每一個客戶端都要創建線程帶來的資源浪費,使得線程可以重用。
但是線程池也有它的弊端,如果連接大多是長連接,因此可能會導致在一段時間內,線程池中的線程都被占用,那么當再有用戶請求連接時,由於沒有可用的空閑線程來處理,就會導致客戶端連接失敗,從而影響用戶體驗。因此,線程池比較適合大量的短連接應用。
因此便出現了下面的兩種高性能IO設計模式:Reactor和Proactor。
在Reactor模式中,會先對每個client注冊感興趣的事件,然后有一個線程專門去輪詢每個client是否有事件發生,當有事件發生時,便順序處理每個事件,當所有事件處理完之后,便再轉去繼續輪詢,如下圖所示:

從這里可以看出,上面的五種IO模型中的多路復用IO就是采用Reactor模式。注意,上面的圖中展示的 是順序處理每個事件,當然為了提高事件處理速度,可以通過多線程或者線程池的方式來處理事件。Java NIO使用的就是這種
在Proactor模式中,當檢測到有事件發生時,會新起一個異步操作,然后交由內核線程去處理,當內核線程完成IO操作之后,發送一個通知告知操作已完成,可以得知,異步IO模型采用的就是Proactor模式。Java AIO使用的這種。
異步IO
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <aio.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
static char *memBuffer;
static int sFileDesc;
static struct sigaction sOldSigAction;
static void MySigQuitHandler(int sig)
{
printf("Signal Quit! The number is: %d\n", sig);
}
static void MyFileReadCompleteProcedure(int sig, siginfo_t *si, void *ucontext)
{
printf("The file length is: %zu, and the content is: %s\n", strlen(memBuffer), memBuffer);
int status = close(sFileDesc);
if(status == 0)
