Linux網絡編程-IO復用技術


IO復用是Linux中的IO模型之一,IO復用就是進程預先告訴內核需要監視的IO條件,使得內核一旦發現進程指定的一個或多個IO條件就緒,就通過進程進程處理,從而不會在單個IO上阻塞了。Linux中,提供了select、poll、epoll三種接口函數來實現IO復用。

 

1、select函數

#include <sys/select.h>  
#include <sys/time.h>  
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);  
    // 返回:若有就緒描述符則為其個數,超時為0,出錯-1  

nfds參數指定了被監聽文件描述符的個數,通常設置為監聽的所有描述符最大值加1,因為文件描述符是從0開始的。readfs、writefds和exceptfds分別對應可讀、可寫和異常等事件文件描述符集合,當調用select時,通過這3個參數傳入自己感興趣的文件描述符,select函數返回后,內核通過修改他們來通知應用程序那些文件描述符已經就緒。

fd_set結構體包含一個整形數組,該數組中每一個元素的每一位標記一個文件描述符,fd_set容納的文件描述符數量由FD_SETSIZE指定,這就限制了select能同時處理的文件描述符最大個數。通過一些宏來操作fd_set結構體中的位:

#include <sys/select.h>  
FD_ZERO(fd_set *fdset);     /* 清除fdset所有標志位 */  
FD_SET(int fd, fd_set fdset);       /* 設置fdset標志位fd */  
FD_CLR(int fd, fd_set fdset);       /* 清除fdset標志位fd */  
int FD_ISSET(int fd, fd_set *fdset);    /* 測試fdset的位fd是否被設置 */  

timeout參數用來設置select的超時時間,它是一個timeval結構類型指針,采用指針參數是應為內核將修改它以告訴應用程序select等待了多久。不過我們不能完全信任select調用返回的timeout值,比如調用失敗后timeout的值是不確定的。

struct timeval  
{  
    long tv_sec;    //秒數  
    long tv_usec;   //微秒數  
};  

select提供了一個微妙的定時方案,如果給timeval的成員都賦值0,則select將立即返回;如果timeout為NULL,則select將一直阻塞,直到某個文件描述符就緒。select成功時返回就緒的文件描述符的總數,如果在超時時間內沒有任何描述符就緒,select返回0,select失敗返回-1並設置errno。如果在select等待期間,程序收到信號,則select立即返回-1,並設置errno為EINTR。

select缺點:

  • 每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大
  • 每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
  • select支持的文件描述符數量太小了,默認是1024

select測試用例:

#include <iostream>
#include <vector>

#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>

using namespace std;

int getMaxNumOfVector(vector<int> &fds);
vector<int> flipVector(vector<int> &fds);

int main(int argc, char **argv)
{
    vector<int> fds;
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    fd_set read_fd;
    fd_set write_fd;
    fd_set except_fd;
    char buff[1024];
    
    FD_ZERO(&read_fd);
    FD_ZERO(&write_fd);
    FD_ZERO(&except_fd);
    
    fds.push_back(STDIN_FILENO);
    fds.push_back(listenfd);
    
    bool running = true;
    while (running) {
        buff[0] = '\0';
        
        /**
         * 每次調用select都要重新初始化read_fd和except_fd中的文件描述符集
         */
        for (int i = 0; i < fds.size(); i++) {
            FD_SET(fds[i], &read_fd);
            if ((fds[i] != STDIN_FILENO) && (fds[i] != listenfd)) {
                //FD_SET(fds[i], &write_fd);
                FD_SET(fds[i], &except_fd);
            }
        }
        
        int event_num = select(getMaxNumOfVector(fds) + 1, &read_fd, &write_fd, &except_fd, NULL);
        if (event_num < 0) {
            cerr << "select error" << endl;
            break;
        }
        
        for (int i = 0; i < fds.size(); i++) {
            if (fds[i] == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (FD_ISSET(STDIN_FILENO, &read_fd)) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (fds[i] == listenfd) {
                if (FD_ISSET(listenfd, &read_fd)) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    fds.push_back(connfd);
                    cout << "往fds添加 " << connfd << ", fds.size: " << fds.size() << endl;
                }
            }
            else {
                if (FD_ISSET(fds[i], &read_fd)) {
                    int len = recv(fds[i], buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從fds刪除 " << fds[i] << endl;
                        // 客戶端斷開了連接
                        fds[i] = -1;
                    }
                    else {
                        buff[len] = '\0';
                        cout << fds[i] << " recv: " << buff << endl;
                    }
                }
                else if (FD_ISSET(fds[i], &write_fd)) {
                    
                }
                else if (FD_ISSET(fds[i], &except_fd)) {
                    
                }
            }
        }
        
        fds = flipVector(fds);
    }
    
    // 關閉文件描述符
    for (int i = 0; i < fds.size(); i++) {
        close(fds[i]);
    }
    close(listenfd);

    return 0;
}

int getMaxNumOfVector(vector<int> &fds)
{
    int result = 0;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i] > result) {
            result = fds[i];
        }
    }
    
    return result;
}

vector<int> flipVector(vector<int> &fds) {
    vector<int> fdsnew;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i] != -1) {
            fdsnew.push_back(fds[i]);
        }
    }
    
    return fdsnew;
}

 

poll函數

#include <poll.h>  
int poll(struct pollfd *fds, nfds_t nfds, int timeout);  
    // 返回:若有就緒描述符則為其數目,超時為0,出錯-1 

poll系統調用和select類似,也是在一定時間內輪詢一定數量的文件描述符,測試是否有就緒者。nfds參數指定被監聽事件集合fds的大小,timeout指定poll的超時值,單位為毫秒,當timeout為-1時,poll調用將一直阻塞,直到某個事件發生;當timeout為0時,poll調用馬上返回。

pollfd結構體

struct pollfd  
{  
    int fd;         /* 文件描述符 */  
    short events;       /* 注冊的事件 */  
    short revents;      /* 實際發生的事件,有內核填充 */  
};  

poll支持的事件類型:

poll函數測試用例,監聽多個socket連接和終端輸入,當在終端中輸入quit時退出程序:

#include <iostream>
#include <vector>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>

using namespace std;

vector<pollfd> flipVector(vector<pollfd> &fds);
struct pollfd *getPollfd(vector<pollfd> &fds, int *ppoll_size);

int main(int argc, char **argv)
{
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    struct pollfd poll_fd;
    vector<struct pollfd> fds;
    
    poll_fd.fd = STDIN_FILENO;
    poll_fd.events = POLLIN;
    fds.push_back(poll_fd);
    
    poll_fd.fd = listenfd;
    poll_fd.events = POLLIN;
    fds.push_back(poll_fd);
    
    char buff[1024];
    struct pollfd *ppoll = NULL;
    int poll_size = 0;
    
    ppoll = getPollfd(fds, &poll_size);
    
    bool running = true;
    while (running) {
        int oldSize = fds.size();
        buff[0] = '\0';
        
        int event_num = poll(ppoll, poll_size, -1);
        if (event_num < 0) {
            cerr << "select error" << endl;
            break;
        }
        
        int fds_size = fds.size();
        for (int i = 0; i < fds_size; i++) {
            if (ppoll[i].fd == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (ppoll[i].revents & POLLIN) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (ppoll[i].fd == listenfd) {
                if (ppoll[i].revents & POLLIN) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    poll_fd.fd = connfd;
                    poll_fd.events = POLLIN;
                    fds.push_back(poll_fd);
                    cout << "往fds添加 " << connfd << ", fds.size: " << fds.size() << endl;
                }
            }
            else {
                if (ppoll[i].revents & POLLIN) {
                    int len = recv(ppoll[i].fd, buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從fds刪除 " << fds[i].fd << endl;
                        // 客戶端斷開了連接
                        fds[i].events = 0;
                        fds[i].fd = -1;
                    }
                    else {
                        buff[len] = '\0';
                        cout << fds[i].fd << " recv: " << buff << endl;
                    }
                }
            }
        }
        
        fds = flipVector(fds);
        if (oldSize != fds.size()) {
            free(ppoll);
            ppoll = getPollfd(fds, &poll_size);
        }
    }
    
    // 關閉文件描述符
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i].fd != -1) {
            close(fds[i].fd);
        }
    }
    close(listenfd);

    return 0;
}

struct pollfd *getPollfd(vector<pollfd> &fds, int *ppoll_size)
{
    struct pollfd *poll = (struct pollfd *) malloc(fds.size() * sizeof(struct pollfd));
    for (int i = 0; i < fds.size(); i++) {
        poll[i].fd = fds[i].fd;
        poll[i].events = fds[i].events;
    }
    
    *ppoll_size = fds.size();
    return poll;
}

vector<pollfd> flipVector(vector<pollfd> &fds) {
    vector<pollfd> fdsnew;
    
    for (int i = 0; i < fds.size(); i++) {
        if (fds[i].fd != -1) {
            fdsnew.push_back(fds[i]);
        }
    }
    
    return fdsnew;
}

 

epoll系列函數

epoll是Linux特有的IO復用函數,它在實現和使用上與select和poll有很大差異,首先,epoll使用一組函數來完成操作,而不是單個函數。其次,epoll把用戶關心的文件描述符上的事件放在內核上的一個事件表中,從而無須像select和poll那樣每次調用都要重復傳入文件描述符集合事件表。但epoll需要使用一個額外的文件描述符,來唯一標識內核中這個事件表,這個文件描述符使用如下epoll_create函數創建:

#include <sys/epoll.h>  
int epoll_create(int size);  
    // 返回:成功返回創建的內核事件表對應的描述符,出錯-1  

size參數現在並不起作用,只是給內核一個提示,告訴它內核表需要多大,該函數返回的文件描述符將用作其他所有epoll函數的第一個參數,以指定要訪問的內核事件表。用epoll_ctl函數操作內核事件表

#include <sys/epoll.h>  
int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event);  

返回:成功返回0,出錯-1

         fd參數是要操作的文件描述符,op指定操作類型,操作類型有3種

  • EPOLL_CTL_ADD:往事件表中注冊fd上的事件
  • EPOLL_CTL_MOD:修改fd上的注冊事件
  • EPOLL_CTL_DEL:刪除fd上的注冊時間

event指定事件類型,它是epoll_event結構指針類型

struct epoll_event  
{  
    __uint32_t events;  /* epoll事件 */  
    epoll_data_t data;  /* 用戶數據 */  
};  

其中events描述事件類型,epoll支持的事件類型和poll基本相同,表示epoll事件類型的宏是在poll對應的宏加上”E”,比如epoll的數據可讀事件是EPOLLIN,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT,它們對於高效運作非常關鍵,data用於存儲用戶數據,其類型epoll_data_t定義如下:

typedef union epoll_data  
{  
    void *ptr;  
    int fd;  
    uint32_t u32;  
    uint64_t u64;  
}epoll_data_t;  

epoll_data_t是一個聯合體,其4個成員最多使用的是fd,它指定事件所從屬的目標文件描述符,ptr成員可用來指定fd相關的用戶數據,但由於opoll_data_t是一個聯合體,我們不能同時使用fd和ptr,如果要將文件描述符嗯哼用戶數據關聯起來,以實現快速的數據訪問,則只能使用其他手段,比如放棄使用fd成員,而在ptr指針指向的用戶數據中包含fd。

#include <sys/epoll.h>  
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  
    // 返回:成功返回就緒的文件描述符個數,出錯-1  

timeout參數的含義與poll接口的timeout參數相同,maxevents參數指定最多監聽多少個事件,它必須大於0。

epoll_wait如果檢測到事件,就將所有就緒的事件從內核事件表(由epfd指定)中復制到events指定的數組中,這個數組只用來輸epoll_wait檢測到的就緒事件,而不像select和poll的參數數組既傳遞用於用戶注冊的事件,有用於輸出內核檢測到就緒事件,這樣極大提高了應用程序索引就緒文件描述符的效率。

epoll函數測試用例,監聽多個socket連接和終端輸入,當在終端中輸入quit時退出程序:

#include <iostream>

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <fcntl.h>  
#include <string.h> 

using namespace std;

void addfd(int epollfd, int fd)
{
    epoll_event event;
    
    event.data.fd = fd;
    event.events = EPOLLIN;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

void delfd(int epollfd, int fd)
{
    epoll_event event;
    
    event.data.fd = fd;
    event.events = EPOLLIN;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event);
}

int main(int argc, char **argv)
{
    int listenfd, connfd;
    struct sockaddr_in servaddr;
    
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 5);
    
    int epollfd = epoll_create(32);
    if (epollfd < 0) {
        cerr << "epoll_create error" << endl;
        exit(-1);
    }
    
    addfd(epollfd, STDIN_FILENO);
    addfd(epollfd, listenfd);
    
    epoll_event events[32];
    
    char buff[1024];
    bool running = true;
    while (running) {
        buff[0] = '\0';
        
        int event_num = epoll_wait(epollfd, events, 32, -1);
        if (event_num < 0) {
            cerr << "epoll_wait error" << endl;
            break;
        }
        
        for (int i = 0; i < event_num; i++) {
            int fd = events[i].data.fd;
            int event = events[i].events;
            
            if (fd == STDIN_FILENO) {
                // 從STDIN_FILENO中讀取數據
                if (event & EPOLLIN) {
                    cin >> buff;
                    if (strcmp(buff, "quit") == 0) {
                        running = false;
                        break;
                    }
                    else {
                        cout << buff << endl;
                    }
                }
            }
            else if (fd == listenfd) {
                if (event & EPOLLIN) {
                    connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0) {
                        running = false;
                        break;
                    }
                    
                    addfd(epollfd, connfd);
                    cout << "往epoll添加 " << connfd << endl;
                }
            }
            else {
                if (event & EPOLLIN) {
                    int len = recv(fd, buff, sizeof(buff) - 1, 0);
                    if (len < 0) {
                        cerr << "recv error" << endl;
                    }
                    else if (len == 0) {
                        cout << "從epoll刪除 " << fd << endl;
                        // 客戶端斷開了連接
                        delfd(epollfd, fd);
                    }
                    else {
                        buff[len] = '\0';
                        cout << fd << " recv: " << buff << endl;
                    }
                }
            }
        }
    }
    
    // 關閉文件描述符
    close(listenfd);

    return 0;
}

 

參考:

  1、《UNIX網絡編程》IO復用章節

  2、 網絡編程API-下 (I/O復用函數)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM