第九章 I/O復用
I/O復用技術是重要的提高服務器工作效率和性能的手段,Linux下實現I/O復用的系統調用主要有select、poll和epoll。
首先我們來看一下select的函數原型和常用的宏:
1 #include<sys/select.h> 2 int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout); 3 FD_ZERO(fd_set *fdset); //清除fdset所有位 4 FD_SET(int fd, fd_set* fdset); //設置fdset的位fd 5 FD_CLR(int fd, fd_set* fdset); //清除fdset的位fd 6 int FD_ISSET(int fd, fd_set* fdset); //測試fdset的位fd是否被設置
首先來看select函數原型,nfds指定了被監聽的文件描述符的總數,其值通常被設定為所有被監聽文件描述符中的最大值加一(因為文件描述符從0開始計數,select只檢查0到nfds-1這些描述符),接下來的三個fd_set*類型的參數分別指向可讀、可寫和異常事件對應的文件描述符集合,最后一個參數是一個微秒級的定時器,表示select最多阻塞這么長時間后就返回,如果其tv_sec和tv_usec都為0則立即返回,如果傳入NULL則一直阻塞直到某個文件描述符就緒。
通過觀察fd_set結構體的原型,我們發現其僅包含一個整型數組,該數組的每一位都標記了一個文件描述符,所以select能監控的文件描述符有上限(由FD_SETSIZE決定,通常為1024)。后面的宏是為了簡化對於fd_set的位操作。select函數成功時返回就緒文件描述符的總數,如果在超時時間內沒有任何文件描述符就緒,則select返回0;select失敗時返回-1並設置errno,特別地,如果在select阻塞期間程序收到信號,則select立即返回-1並置errno為EINTR。
select在何種情況下會認為文件描述符產生了可讀、可寫或異常情況呢?首先,當socket處於以下狀態會認為其可讀:1)socket內核接收緩沖區中的字節數大於或等於其低水位標記,此時我們可以無阻塞地讀該socket,且讀操作返回值大於0;2)socket的對端關閉連接,此時讀操作返回0;3)監聽socket上有新的連接請求;4)socket上有未處理的錯誤。而以下狀態會認為socket可寫:1)socket內核發送緩沖區中的可用字節數大於或等於其低水位標記,此時我們可以無阻塞地寫該socket,且寫操作返回值大於0;2)socket的寫操作被關閉,對寫操作關閉的socket執行寫操作會觸發SIGPIPE信號;3)socket使用非阻塞connect連接成功或者失敗(超時)之后;4)socket上有未處理的錯誤。而select能處理的異常情況只有一種,就是socket上接收到了帶外數據。
我們來用一個例子看一下select程序如何來寫以及select如何同時處理普通數據和帶外數據的:
1 /************************************************************************* 2 > File Name: 9-1.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sat 03 Feb 2018 07:23:52 PM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 int main(int argc, char** argv) { 12 if(argc <= 2) { 13 printf("usage: %s ip_address port_number\n", basename(argv[0])); 14 return 1; 15 } 16 const char* ip = argv[1]; 17 int port = atoi(argv[2]); 18 19 int ret = 0; 20 struct sockaddr_in address; 21 bzero(&address, sizeof(address)); 22 address.sin_family = AF_INET; 23 address.sin_port = htons(port); 24 inet_pton(AF_INET, ip, &address.sin_addr); 25 26 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 27 assert(listenfd >= 0); 28 29 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 30 assert(ret != -1); 31 32 ret = listen(listenfd, 5); 33 assert(ret != -1); 34 35 struct sockaddr_in client_address; 36 socklen_t client_addrlength = sizeof(client_address); 37 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 38 if(connfd < 0) { 39 printf("errno is: %d\n", errno); 40 } 41 42 char buf[1024]; 43 fd_set read_fds; 44 fd_set exception_fds; 45 FD_ZERO(&read_fds); 46 FD_ZERO(&exception_fds); 47 48 while(1) { 49 memset(buf, 0, sizeof(buf)); 50 FD_SET(connfd, &read_fds); 51 FD_SET(connfd, &exception_fds); 52 ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL); 53 if(ret < 0) { 54 printf("selection failure\n"); 55 break; 56 } 57 if(FD_ISSET(connfd, &read_fds)) { 58 ret = recv(connfd, buf, sizeof(buf), 0); 59 if(ret <= 0) break; 60 printf("get %d bytes of normal data: %s\n", ret, buf); 61 } 62 memset(buf, 0, sizeof(buf)); 63 if(FD_ISSET(connfd, &exception_fds)) { 64 ret = recv(connfd, buf, sizeof(buf), MSG_OOB); 65 if(ret <= 0) break; 66 printf("get %d bytes of oob data: %s\n", ret, buf); 67 } 68 } 69 close(connfd); 70 close(listenfd); 71 return 
0; 72 }
客戶端我們使用了前面的5-6程序發送普通數據和帶外數據。在實際測試過程中發現了一個問題,即原書上的FD_ISSET里面判斷是readfd還是exceptionfd那里是if和else if,測試時只輸出了普通數據而沒有輸出帶外數據,將客戶端發送帶外數據后sleep一下就可以正常接收,或者像我上面程序一樣使用兩個if判斷也可以正常接收。分析一下原因,如果是if和else if,則這兩個分支只能觸發一個,而對面發送的數據是既有普通數據又有帶外數據的,所以導致了這個結果,而如果客戶端sleep,則服務器端可以返回后重新讀取一次帶外數據,而改為兩個if,就需要判斷兩次,這樣就不會有處理不了帶外數據的情況了。
poll的系統調用和select相似,也是在一段時間內輪詢一定數量的文件描述符,以測試其中是否有就緒者。poll的timeout參數以毫秒為單位:當timeout值為-1時poll調用將阻塞直到某個事件發生,為0時則立即返回。
epoll是Linux特有的I/O復用函數,其實現和select、poll有很大區別。epoll將用戶關心的文件描述符上的事件放在內核里的一個事件表中,從而無須像select和poll那樣每次調用都要重復傳入文件描述符集或事件集,但是epoll需要一個額外的文件描述符來標識內核中的這個事件表。與poll不同的是,epoll_wait如果檢測到事件,就將所有就緒事件從內核事件表中復制到它的第二個參數events指向的數組中,這個數組只包含就緒的事件,應用程序無須再遍歷所有被監聽的文件描述符,這樣就極大提高了應用程序檢索就緒文件描述符的效率,從O(n)的時間復雜度降為了O(1)。我們來看一下epoll的幾個函數:
1 #include<sys/epoll.h> 2 int epoll_create(int size); 3 int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event); 4 int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
epoll_create的size參數現在已經不起作用了:它原本只是給內核一個提示,告訴它事件表需要多大,自Linux 2.6.8起該值被忽略,但仍必須大於0。epoll_ctl中的op參數指定了操作類型,一共有注冊(EPOLL_CTL_ADD)、修改(EPOLL_CTL_MOD)和刪除(EPOLL_CTL_DEL)三種,而event參數則描述了事件。
epoll對於文件描述符的操作有兩種模式:LT(水平觸發)和ET(邊沿觸發),其中LT是默認模式,這種模式下其效率相當於一個稍微改進的poll,效率沒有顯著提高,而ET模式則是epoll的高效工作模式。對於LT模式的文件描述符,當epoll_wait檢測到其上有事件發生並將此事件通知應用程序后,應用程序可以不立即處理該事件,這樣當應用程序下一次調用epoll_wait時,epoll_wait還會再向應用程序通告此事件,直到該事件被處理。而對於ET模式,當epoll_wait檢測到其上有事件發生,將其通告應用程序,應用程序必須馬上處理,因為后續的epoll_wait將不再向應用程序通知這一事件。可見,ET模式降低了同一個epoll事件被重復觸發的次數,所以效率更高。我們用一個實例來看一下:
1 /************************************************************************* 2 > File Name: 9-3.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sat 03 Feb 2018 10:35:56 PM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define MAX_EVENT_NUMBER 1024 12 #define BUFFER_SIZE 10 13 14 //設置文件描述符為非阻塞模式 15 int setnonblocking(int fd) { 16 int old_option = fcntl(fd, F_GETFL); 17 int new_option = old_option | O_NONBLOCK; 18 fcntl(fd, F_SETFL, new_option); 19 return old_option; 20 } 21 22 //以兩種不同模式將事件注冊到epoll中 23 void addfd(int epollfd, int fd, bool enable_et) { 24 epoll_event event; 25 event.data.fd = fd; 26 event.events = EPOLLIN; 27 if(enable_et) event.events |= EPOLLET; 28 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 29 setnonblocking(fd); 30 } 31 32 void lt(epoll_event* events, int number, int epollfd, int listenfd) { 33 char buf[BUFFER_SIZE]; 34 for(int i = 0; i < number; i ++) { 35 int sockfd = events[i].data.fd; 36 if(sockfd == listenfd) { 37 struct sockaddr_in client_address; 38 socklen_t client_addrlength = sizeof(client_address); 39 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 40 addfd(epollfd, connfd, false); 41 } 42 else if(events[i].events & EPOLLIN) { 43 printf("event trigger once\n"); 44 memset(buf, 0, sizeof(buf)); 45 int ret = recv(sockfd, buf, BUFFER_SIZE, 0); 46 if(ret <= 0) { 47 close(sockfd); 48 continue; 49 } 50 printf("get %d bytes of content: %s\n", ret, buf); 51 } 52 else printf("something else happened\n"); 53 } 54 } 55 56 void et(epoll_event* events, int number, int epollfd, int listenfd) { 57 char buf[BUFFER_SIZE]; 58 for(int i = 0; i < number; i ++) { 59 int sockfd = events[i].data.fd; 60 if(sockfd == listenfd) { 61 struct sockaddr_in client_address; 62 socklen_t client_addrlength = sizeof(client_address); 63 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 64 
addfd(epollfd, connfd, true); 65 } 66 else if(events[i].events & EPOLLIN) { 67 //這段代碼不會被重復觸發,所以我們循環讀取 68 printf("event trigger once\n"); 69 while(1) { 70 memset(buf, 0, sizeof(buf)); 71 int ret = recv(sockfd, buf, BUFFER_SIZE, 0); 72 if(ret < 0) { 73 //非阻塞模式的I/O,當下面的條件成立表示數據已經全部取走 74 if((errno == EAGAIN) || (errno == EWOULDBLOCK)) { 75 printf("read later\n"); 76 break; 77 } 78 close(sockfd); 79 break; 80 } 81 else if(ret == 0) close(sockfd); 82 else printf("get %d bytes of content: %s\n", ret, buf); 83 } 84 } 85 else printf("something else happened\n"); 86 } 87 } 88 89 int main(int argc, char** argv) { 90 if(argc <= 2) { 91 printf("usage: %s ip_address port_number\n", basename(argv[0])); 92 return 1; 93 } 94 95 const char* ip = argv[1]; 96 int port = atoi(argv[2]); 97 98 int ret = 0; 99 struct sockaddr_in address; 100 bzero(&address, sizeof(address)); 101 address.sin_family = AF_INET; 102 address.sin_port = htons(port); 103 inet_pton(AF_INET, ip, &address.sin_addr); 104 105 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 106 assert(listenfd >= 0); 107 108 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 109 assert(ret != -1); 110 111 ret = listen(listenfd, 5); 112 assert(ret != -1); 113 114 epoll_event events[MAX_EVENT_NUMBER]; 115 int epollfd = epoll_create(5); 116 assert(epollfd != -1); 117 addfd(epollfd, listenfd, true); 118 119 while(1) { 120 int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 121 if(ret < 0) { 122 printf("epoll failure\n"); 123 break; 124 } 125 //lt(events, ret, epollfd, listenfd); 126 et(events, ret, epollfd, listenfd); 127 } 128 close(listenfd); 129 return 0; 130 }
在上面的實驗中,同樣發送25個字符,第一個LT工作模式下epoll一共向應用程序通知了三次,而第二種的ET工作模式僅僅通知一次。
即使我們使用ET模式,一個socket上的某個事件也有可能被觸發多次,比如一個線程讀取socket上的數據后開始處理這些數據,而在數據的處理過程中該socket上又有新數據可讀(EPOLLIN再次被觸發),此時另外一個線程被喚醒來讀取這些數據,於是就出現了兩個線程同時操作一個socket的情況。而我們期望一個socket連接在任意時刻都只被一個線程處理,這一點我們可以用EPOLLONESHOT事件實現。對於注冊了EPOLLONESHOT的文件描述符,操作系統最多觸發其上注冊的一個可讀、可寫或異常事件,且只觸發一次,除非我們使用epoll_ctl重置該文件描述符上注冊的EPOLLONESHOT事件,這樣當一個線程在處理某個socket時,其他線程是不可能有機會操作這個socket的。所以當這個線程操作完畢,應該馬上用epoll_ctl(EPOLL_CTL_MOD)重置這個socket上的EPOLLONESHOT事件,以確保其下一次可讀時EPOLLIN事件能被再次觸發。用一段代碼來看一下:
1 /************************************************************************* 2 > File Name: 9-4.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sun 04 Feb 2018 12:24:13 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define MAX_EVENT_NUMBER 1024 12 #define BUFFER_SIZE 1024 13 struct fds { 14 int epollfd; 15 int sockfd; 16 }; 17 18 int setnonblocking(int fd) { 19 int old_option = fcntl(fd, F_GETFL); 20 int new_option = old_option | O_NONBLOCK; 21 fcntl(fd, F_SETFL, new_option); 22 return old_option; 23 } 24 25 void addfd(int epollfd, int fd, bool enable_et) { 26 epoll_event event; 27 event.data.fd = fd; 28 event.events = EPOLLIN | EPOLLET; 29 if(enable_et) event.events |= EPOLLONESHOT; 30 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 31 setnonblocking(fd); 32 } 33 34 void reset_oneshot(int epollfd, int fd) { 35 epoll_event event; 36 event.data.fd = fd; 37 event.events = EPOLLIN | EPOLLONESHOT | EPOLLET; 38 epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); 39 } 40 41 void* worker(void* arg) { 42 int sockfd = ((fds*)arg)->sockfd; 43 int epollfd = ((fds*)arg)->epollfd; 44 printf("start new thread to receive data on fd: %d\n", sockfd); 45 char buf[BUFFER_SIZE]; 46 memset(buf, 0, sizeof(buf)); 47 while(1) { 48 int ret = recv(sockfd, buf, BUFFER_SIZE, 0); 49 if(ret == 0) { 50 close(sockfd); 51 printf("foreiner closed the connection\n"); 52 break; 53 } 54 else if(ret < 0) { 55 if(errno == EAGAIN) { 56 reset_oneshot(epollfd, sockfd); 57 printf("read later\n"); 58 break; 59 } 60 } 61 else { 62 printf("get content: %s\n", buf); 63 memset(buf, 0, sizeof(buf)); 64 sleep(5); 65 } 66 } 67 printf("end thread receiving data on fd: %d\n", sockfd); 68 } 69 70 int main(int argc, char** argv) { 71 if(argc <= 2) { 72 printf("usage: %s ip_address port_number\n", basename(argv[0])); 73 return 1; 74 } 75 76 const char* ip = argv[1]; 77 int port = atoi(argv[2]); 78 79 int ret = 0; 
80 struct sockaddr_in address; 81 bzero(&address, sizeof(address)); 82 address.sin_family = AF_INET; 83 address.sin_port = htons(port); 84 inet_pton(AF_INET, ip, &address.sin_addr); 85 86 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 87 assert(listenfd >= 0); 88 89 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 90 assert(ret != -1); 91 92 ret = listen(listenfd, 5); 93 assert(ret != -1); 94 95 epoll_event events[MAX_EVENT_NUMBER]; 96 int epollfd = epoll_create(5); 97 assert(epollfd != -1); 98 addfd(epollfd, listenfd, false); 99 100 while(1) { 101 int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 102 if(ret < 0) { 103 printf("epoll failure\n"); 104 break; 105 } 106 for(int i = 0; i < ret; i ++) { 107 int sockfd = events[i].data.fd; 108 if(sockfd == listenfd) { 109 struct sockaddr_in client_address; 110 socklen_t client_addrlength = sizeof(client_address); 111 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 112 addfd(epollfd, connfd, true); 113 } 114 else if(events[i].events & EPOLLIN) { 115 pthread_t thread; 116 fds fds_for_new_worker; 117 fds_for_new_worker.epollfd = epollfd; 118 fds_for_new_worker.sockfd = sockfd; 119 pthread_create(&thread, NULL, worker, (void*)&fds_for_new_worker); 120 } 121 else printf("something else happened\n"); 122 } 123 } 124 close(listenfd); 125 return 0; 126 }
代碼中我們用sleep(5)來模擬數據處理的過程,當數據在處理時,我們又一次發送了數據,而此時服務器並沒有創建另一個線程來處理這些新數據,而是仍由原來的工作線程處理:我們看到兩次數據都是由負責文件描述符5的同一個線程讀取的。這樣保證了同一個連接在任意時刻只被一個線程處理,保持了連接的完整性,從而避免了很多可能的競態條件。
具體分析了三種I/O復用模式,我們來對比一下它們的異同與優缺點。這三個函數都能在一定時間內監聽一定數量的文件描述符,當一個或多個文件描述符上有事件就緒時,函數返回就緒的文件描述符的個數。select的參數類型fd_set沒有把文件描述符和事件綁定,它僅僅是一個文件描述符集合,因此我們需要提供可讀、可寫和異常這三個集合,而且select也只能監聽這三種事件,另一方面內核會直接(就地)修改傳入的fd_set集合,所以應用程序下次調用select前需要重置這三個集合。而poll將文件描述符和事件都放在pollfd結構中,使編程接口更加簡潔,且內核每次修改的是pollfd中的revents成員,events成員保持不變,因此下次調用poll時無須重置。最后epoll則完全不同,它在內核維護一個內核事件表,每次epoll_wait操作都是直接從該內核事件表中取得用戶注冊的事件,而無需從用戶空間重復讀入這些事件。
另外,poll和epoll可以監聽的最大文件描述符數目都能達到系統允許打開的最大文件描述符數目(原書以65535為例,實際上限取決於進程的RLIMIT_NOFILE和/proc/sys/fs/file-max等系統配置),而select由於fd_set的本質是一個整型數組,每一位代表一個文件描述符,所以其能監聽的最大數量有限制(FD_SETSIZE,通常為1024)。
從效率和原理上來講,select和poll都只能工作在相對低效的LT模式,而且采用輪詢的方式,每次調用都要掃描整個文件描述符集合,而epoll則不同,它支持高效的ET模式,而且可以支持EPOLLONESHOT事件以進一步減少可讀、可寫和異常事件的觸發次數,不僅如此,它采用了回調的方式,內核檢測到就緒的文件描述符就觸發回調函數,回調函數將該文件描述符上的對應事件插入內核的就緒隊列,最后epoll_wait將就緒隊列中的內容拷貝到用戶空間。但是當活動連接較多時,回調函數被觸發得過於頻繁,epoll的效率未必比select和poll高,所以epoll適用於連接數量多但是活動連接少的情況。
當我們對一個非阻塞的socket調用一個connect,而連接又沒有被馬上建立,這時,我們可以調用select、poll等函數來監聽socket上的可寫事件,當函數返回時利用getsockopt讀取錯誤碼並清除錯誤,如果錯誤碼是0則表示連接成功建立,否則失敗。我們來看一下這種情況怎么實現:
1 /************************************************************************* 2 > File Name: 9-5.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sun 04 Feb 2018 07:47:09 PM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define BUFFER_SIZE 1024 12 13 int setnonblocking(int fd) { 14 int old_option = fcntl(fd, F_GETFL); 15 int new_option = old_option | O_NONBLOCK; 16 fcntl(fd, F_SETFL, new_option); 17 return old_option; 18 } 19 20 //非阻塞連接,如果函數成功則返回連接的socket,不成功返回-1 21 int unblock_connect(const char* ip, int port, int time) { 22 int ret = 0; 23 struct sockaddr_in address; 24 bzero(&address, sizeof(address)); 25 address.sin_family = AF_INET; 26 inet_pton(AF_INET, ip, &address.sin_addr); 27 address.sin_port = htons(port); 28 29 int sockfd = socket(AF_INET, SOCK_STREAM, 0); 30 int fdopt = setnonblocking(sockfd); 31 ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address)); 32 if(ret == 0) { 33 printf("connect with server immediately\n"); 34 fcntl(sockfd, F_SETFL, fdopt); 35 return sockfd; 36 } 37 else if(errno != EINPROGRESS) { 38 printf("unblock connect not support\n"); 39 return -1; 40 } 41 fd_set readfds; 42 fd_set writefds; 43 struct timeval timeout; 44 45 FD_ZERO(&readfds); 46 FD_ZERO(&writefds); 47 48 timeout.tv_sec = time; 49 timeout.tv_usec = 0; 50 51 ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout); 52 if(ret <= 0) { 53 printf("connection time out\n"); 54 close(sockfd); 55 return -1; 56 } 57 58 if(!FD_ISSET(sockfd, &writefds)) { 59 printf("no events on sockfd found\n"); 60 close(sockfd); 61 return -1; 62 } 63 64 int error = 0; 65 socklen_t length = sizeof(error); 66 if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) { 67 printf("get socket option failed\n"); 68 close(sockfd); 69 return -1; 70 } 71 72 if(errno != 0) { 73 printf("connection failed after select with the error: %d\n", error); 74 close(sockfd); 75 return -1; 
76 } 77 78 printf("connection ready after select with the socket: %d\n", sockfd); 79 fcntl(sockfd, F_SETFL, fdopt); 80 return sockfd; 81 } 82 83 int main(int argc, char** argv) { 84 if(argc <= 2) { 85 printf("usage: %s ip_address port_number\n", basename(argv[0])); 86 return 1; 87 } 88 const char* ip = argv[1]; 89 int port = atoi(argv[2]); 90 int sockfd = unblock_connect(ip, port, 10); 91 if(sockfd < 0) return 1; 92 close(sockfd); 93 return 0; 94 }
很多服務器要一邊處理網絡連接一邊處理用戶輸入,比如聊天室程序,這樣的就可以用I/O復用來實現,我們以一個poll實現的聊天室程序來舉例說明一下:
首先看下客戶端:
1 /************************************************************************* 2 > File Name: 9-6.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sun 04 Feb 2018 08:27:40 PM PST 6 ************************************************************************/ 7 8 #define _GNU_SOURCE 1 9 #include"head.h" 10 using namespace std; 11 12 #define BUFFER_SIZE 64 13 14 int main(int argc, char** argv) { 15 if(argc <= 2) { 16 printf("usage: %s ip_address port_number\n", basename(argv[0])); 17 return 1; 18 } 19 const char* ip = argv[1]; 20 int port = atoi(argv[2]); 21 22 struct sockaddr_in server_address; 23 bzero(&server_address, sizeof(server_address)); 24 server_address.sin_family = AF_INET; 25 server_address.sin_port = htons(port); 26 inet_pton(AF_INET, ip, &server_address.sin_addr); 27 28 int sockfd = socket(AF_INET, SOCK_STREAM, 0); 29 assert(sockfd >= 0); 30 if(connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { 31 printf("connection failed\n"); 32 close(sockfd); 33 return 1; 34 } 35 pollfd fds[2]; 36 fds[0].fd = 0; 37 fds[0].events = POLLIN; 38 fds[0].revents = 0; 39 fds[1].fd = sockfd; 40 fds[1].events = POLLIN | POLLRDHUP; 41 fds[1].revents = 0; 42 char read_buf[BUFFER_SIZE]; 43 int pipefd[2]; 44 int ret = pipe(pipefd); 45 assert(ret != -1); 46 47 while(1) { 48 ret = poll(fds, 2, -1); 49 if(ret < 0) { 50 printf("poll failure\n"); 51 break; 52 } 53 54 if(fds[1].revents & POLLRDHUP) { 55 printf("server close the connection\n"); 56 break; 57 } 58 else if(fds[1].revents & POLLIN) { 59 memset(read_buf, 0, sizeof(read_buf)); 60 recv(fds[1].fd, read_buf, BUFFER_SIZE, 0); 61 printf("%s\n", read_buf); 62 } 63 64 if(fds[0].revents & POLLIN) { 65 ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); 66 ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); 67 } 68 } 69 close(sockfd); 70 return 0; 71 }
再看下服務器端:
1 /************************************************************************* 2 > File Name: 9-7.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sun 04 Feb 2018 10:35:16 PM PST 6 ************************************************************************/ 7 8 #define _GNU_SOURCE 1 9 #include"head.h" 10 using namespace std; 11 12 #define USER_LIMIT 5 13 #define BUFFER_SIZE 64 14 #define FD_LIMIT 65535 15 16 struct client_data { 17 sockaddr_in address; 18 char* write_buf; 19 char buf[BUFFER_SIZE]; 20 }; 21 22 int setnonblocking(int fd) { 23 int old_option = fcntl(fd, F_GETFL); 24 int new_option = old_option | O_NONBLOCK; 25 fcntl(fd, F_SETFL, new_option); 26 return old_option; 27 } 28 29 int main(int argc, char** argv) { 30 if(argc <= 2) { 31 printf("usage: %s ip_address port_number\n", basename(argv[0])); 32 return 1; 33 } 34 const char* ip = argv[1]; 35 int port = atoi(argv[2]); 36 37 int ret = 0; 38 struct sockaddr_in address; 39 bzero(&address, sizeof(address)); 40 address.sin_family = AF_INET; 41 address.sin_port = htons(port); 42 inet_pton(AF_INET, ip, &address.sin_addr); 43 44 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 45 assert(listenfd >= 0); 46 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 47 assert(ret != -1); 48 49 ret = listen(listenfd, 5); 50 assert(ret != -1); 51 52 client_data* users = new client_data[FD_LIMIT]; 53 pollfd fds[USER_LIMIT + 1]; 54 int user_counter = 0; 55 for(int i = 1; i <= USER_LIMIT; i ++) { 56 fds[i].fd = -1; 57 fds[i].events = 0; 58 } 59 fds[0].fd = listenfd; 60 fds[0].events = POLLIN | POLLERR; 61 fds[0].revents = 0; 62 63 while(1) { 64 ret = poll(fds, user_counter + 1, -1); 65 if(ret < 0) { 66 printf("poll failure\n"); 67 break; 68 } 69 70 for(int i = 0; i < user_counter + 1;i ++) { 71 if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) { 72 struct sockaddr_in client_address; 73 socklen_t client_addrlength = sizeof(client_address); 74 int connfd = accept(listenfd, (struct 
sockaddr*)&client_address, &client_addrlength); 75 76 if(connfd < 0) { 77 printf("errno is: %d\n", errno); 78 continue; 79 } 80 81 if(user_counter >= USER_LIMIT) { 82 const char* info = "too many users\n"; 83 printf("%s", info); 84 send(connfd, info, sizeof(info), 0); 85 close(connfd); 86 continue; 87 } 88 89 user_counter ++; 90 users[connfd].address = client_address; 91 setnonblocking(connfd); 92 fds[user_counter].fd = connfd; 93 fds[user_counter].events = POLLIN | POLLERR | POLLRDHUP; 94 fds[user_counter].revents = 0; 95 printf("comes a new user, now have %d users\n", user_counter); 96 } 97 else if(fds[i].revents & POLLERR) { 98 printf("get an error from %d\n", fds[i].fd); 99 char errors[100]; 100 memset(errors, 0, sizeof(errors)); 101 socklen_t length = sizeof(errors); 102 if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) { 103 printf("get socket option failed\n"); 104 } 105 continue; 106 } 107 else if(fds[i].revents &POLLRDHUP) { 108 users[fds[i].fd] = users[fds[user_counter].fd]; 109 close(fds[i].fd); 110 fds[i] = fds[user_counter]; 111 user_counter --; 112 printf("a client left\n"); 113 } 114 else if(fds[i].revents & POLLIN) { 115 int connfd = fds[i].fd; 116 memset(users[connfd].buf, '\0', BUFFER_SIZE); 117 ret = recv(connfd, users[connfd].buf, BUFFER_SIZE, 0); 118 if(ret < 0) { 119 if(errno != EAGAIN) { 120 close(connfd); 121 users[fds[i].fd] = users[fds[user_counter].fd]; 122 fds[i] = fds[user_counter]; 123 user_counter --; 124 i --; 125 } 126 } 127 else if(ret == 0) {} 128 else { 129 for(int j = 1; j <= user_counter; j ++) { 130 if(fds[j].fd == connfd) continue; 131 fds[j].events |= ~POLLIN; 132 fds[j].events |= POLLOUT; 133 users[fds[j].fd].write_buf = users[connfd].buf; 134 } 135 } 136 } 137 else if(fds[i].revents & POLLOUT) { 138 int connfd = fds[i].fd; 139 if(!users[connfd].write_buf) continue; 140 ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0); 141 users[connfd].write_buf = NULL; 142 143 
fds[i].events |= ~POLLOUT; 144 fds[i].events |= POLLIN; 145 } 146 } 147 } 148 delete []users; 149 close(listenfd); 150 return 0; 151 }
對於這個程序,其難點只有一個,就是當用戶退出的時候,我們將整個用戶列表的最后一項移動到退出用戶的這一項,可以保證前user_counter項都是有客戶的。但是對於執行完POLLIN事件后,我們取消注冊了其他客戶連接上的POLLIN,轉而注冊了POLLOUT,在我的理解下可能有些問題。我們考慮一個極端情況,如果取消注冊了POLLIN,注冊POLLOUT后,主線程轉而處理下一個poll事件,但此時剛剛處理過的文件描述符上又發來了數據,而由於此時POLLIN事件已經被取消注冊,那么就不會在下一輪輪詢中被觸發POLLIN。不過需要注意,poll是水平觸發的:未讀的數據會一直保留在內核接收緩沖區中,一旦該描述符在消息發送完畢后重新注冊POLLIN,下一次poll調用就會立即報告這一事件,因此數據並不會丟失,只是讀取被推遲了。 我將代碼中下一步的else if改為了if,這樣判斷完POLLIN事件后會再判斷一次POLLOUT事件,經測試沒有問題。但是仍有待商榷,暫留疑問待日后解決。
最后,I/O復用還可以用於同時監聽TCP和UDP服務:
1 /************************************************************************* 2 > File Name: 9-8.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Tue 06 Feb 2018 03:23:19 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define MAX_EVENT_NUMBER 1024 12 #define TCP_BUFFER_SIZE 512 13 #define UDP_BUFFER_SIZE 1024 14 15 int setnonblocking(int fd) { 16 int old_option = fcntl(fd, F_GETFL); 17 int new_option = old_option | O_NONBLOCK; 18 fcntl(fd, F_SETFL, new_option); 19 return old_option; 20 } 21 22 void addfd(int epollfd, int fd) { 23 epoll_event event; 24 event.data.fd = fd; 25 event.events = EPOLLIN | EPOLLET; 26 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 27 setnonblocking(fd); 28 } 29 30 int main(int argc, char** argv) { 31 if(argc <= 2) { 32 printf("usage: %s ip_address port_number\n", basename(argv[0])); 33 return 1; 34 } 35 const char* ip = argv[1]; 36 int port = atoi(argv[2]); 37 38 int ret = 0; 39 struct sockaddr_in address; 40 bzero(&address, sizeof(address)); 41 address.sin_family = AF_INET; 42 inet_pton(AF_INET, ip, &address.sin_addr); 43 address.sin_port = htons(port); 44 45 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 46 assert(listenfd >= 0); 47 48 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 49 assert(ret != -1); 50 51 ret = listen(listenfd, 5); 52 assert(ret != -1); 53 54 bzero(&address, sizeof(address)); 55 address.sin_family = AF_INET; 56 inet_pton(AF_INET, ip, &address.sin_addr); 57 address.sin_port = htons(port); 58 int udpfd = socket(AF_INET, SOCK_DGRAM, 0); 59 assert(udpfd >= 0); 60 61 ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address)); 62 assert(ret != -1); 63 epoll_event events[MAX_EVENT_NUMBER]; 64 int epollfd = epoll_create(5); 65 assert(epollfd != -1); 66 67 addfd(epollfd, listenfd); 68 addfd(epollfd, udpfd); 69 70 while(1) { 71 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 
72 if(number < 0) { 73 printf("epoll failure\n"); 74 break; 75 } 76 for(int i = 0; i < number; i ++) { 77 int sockfd = events[i].data.fd; 78 if(sockfd == listenfd) { 79 struct sockaddr_in client_address; 80 socklen_t client_addrlength = sizeof(client_address); 81 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 82 addfd(epollfd, connfd); 83 } 84 else if(sockfd == udpfd) { 85 char buf[UDP_BUFFER_SIZE]; 86 memset(buf, 0, UDP_BUFFER_SIZE); 87 struct sockaddr_in client_address; 88 socklen_t client_addrlength = sizeof(client_address); 89 90 ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE, 0, (struct sockaddr*)&client_address, &client_addrlength); 91 if(ret > 0) { 92 sendto(udpfd, buf, UDP_BUFFER_SIZE, 0, (struct sockaddr*)&client_address, client_addrlength); 93 } 94 } 95 else if(events[i].events & EPOLLIN) { 96 char buf[TCP_BUFFER_SIZE]; 97 while(1) { 98 memset(buf, 0, TCP_BUFFER_SIZE); 99 ret = recv(sockfd, buf, TCP_BUFFER_SIZE, 0); 100 if(ret < 0) { 101 if((errno == EAGAIN) || (errno == EWOULDBLOCK)) break; 102 close(sockfd); 103 break; 104 } 105 else if(ret == 0) { 106 close(sockfd); 107 } 108 else send(sockfd, buf, ret, 0); 109 } 110 } 111 } 112 } 113 close(listenfd); 114 return 0; 115 }