第15章 高並發服務器編程(2)_I/O多路復用


3. I/O多路復用:select函數

3.1 I/O多路復用簡介

(1)通信領域的時分多路復用

 

(2)I/O多路復用(I/O multiplexing)

  ①同一線程,通過“撥開關”方式,來同時處理多個I/O流,哪個IO准備就緒就把開關撥向它。(I/O多路復用類似於通信領域中的“時分復用”

  ②通過select/poll函數可以實現IO多路復用,他們采用輪詢的方式來監視I/O。而epoll是對select/poll的加強,它是基於事件驅動,epoll_ctl注冊事件並注冊callback回調函數,epoll_wait只返回發生的事件避免了像select/poll對事件的整個輪詢操作。

  ③I/O多路復用避免阻塞在I/O上,使原本為多進程或多線程來接收多個連接的消息變為單進程或單線程保存多個socket的狀態后輪詢處理。

(3)I/O復用模型

 

  ①進程阻塞於select調用,等待數據報socket可讀。當select返回socket可讀條件時,可以再調用recvfrom將數據報拷貝到應用緩沖區中。

  ②使用了系統調用select,要求兩次系統調用(另一次為recvfrom),好象使得比單純使用recvfrom效率更得更差。但實際上使用select給我們帶來的好處卻是讓我們可以同時等待多個socket准備好。

3.2 select函數

(1)select函數

頭文件

#include <sys/types.h>

#include <sys/time.h>

#include <unistd.h>

函數

int select(int maxfdp1, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);

參數

(1)maxfdp1:最大fd加1(max fd plus 1),在三個描述符集中找出最高描述符編號值,然后加1,這就是第一個參數值。

(2)readfds、writefds和exceptfds:是指向描述符集的指針。這三個描述符集說明了我們關心的可讀、可寫或處理異常條件的各個描述符。每個描述符集存放在一個fd_set數據類型中。

(3)timeval結構體:指定願意等待的時間。NULL:表示永遠等待,直到列表中的某個套接字就緒才返回。如果timeout中的時間設置為0,表示不等待,測試所有指定的描述符並立即返回其他具體值表示等待的時間

struct timeval{
    long tv_set;   //
    long tv_usec;  //微秒
};

返回值

准備就緒的描述符數,若超時則為0,若出錯則為-1

功能

確定一個或多個套接口的狀態,如果有必要,則會等待。

(2)select的作用

 

備注

傳給select的參數告訴內核

①我們所關心的socket

②對於每個socket,我們所關心的條件(是否可讀一個socket,是否可寫一個socket,是否關心一個socket的異常

③希望等待多長時間(可以永遠等待或等待一個固定量時間,或完全不等待)

從select返時時內核告訴我們

①己准好的socket的數量

哪一個socket己准備好讀、寫或異常條件

③使用這種返回值,就可調用相應的I/O函數(一般是read/write),並且確知該函數不會阻塞因為socket返回就說明要等待的條件己經滿足,就可以直接處理而不必阻塞等待了。

(3)處理文件描述符集(socket集)的四個宏

作用

FD_ZERO(fd_set* set)

清除一個文件描述符集

FD_SET(int fd, fd_set* set)

將一個文件描述符加入到fd_set中

FD_CLR(int fd, fd_set* set)

將一個fd從fd_set中清除

FD_ISSET(int fd, fd_set* set)

測試fd_set中的一個給定fd是否有變化

備注:

(1)在使用select函數之前,首先使用FD_ZERO和FD_SET來初始化fd_set,並在使用select函數時,可循環使用FD_ISSET測試fd_set。

【編程實驗】echo服務器(利用I/O多路復用方式實現)

 

//vector_fd.h(與上一例相同)

#ifndef __VECTOR_H__
#define __VECTOR_H__

#include <pthread.h>

//用於存放sock的動態數組(線程安全!)
typedef struct{
    int     *fd;
    int     counter;    //元素個數
    int     max_counter;//最多存數個數,會動態增長
    pthread_mutex_t mutex; 
}VectorFD, *PVectorFD;

//動態數組相關的操作函數
extern  VectorFD*  create_vector_fd(void);
extern  void       destroy_vector_fd(VectorFD* vfd);
extern  int        get_fd(VectorFD* vfd, int index);
extern  void       remove_fd(VectorFD* vfd, int fd);
extern  void       add_fd(VectorFD* vfd, int fd);

#endif

//vector_fd.c  //動態數組操作函數(與上一例相同)

#include "vector_fd.h"
#include <memory.h>
#include <malloc.h>
#include <assert.h>

//查找指定fd在數組中的索引值
static int indexof(VectorFD* vfd, int fd)
{
    int ret = -1;

    int i=0;
    for(; i<vfd->counter; i++){
        if(vfd->fd[i] == fd){
            ret = i;
            break;
        }
    }

    return ret;
}

//數組空間的動態增長
static void encapacity(VectorFD* vfd)
{
    if(vfd->counter >=vfd->max_counter){
        int* fds = (int*)calloc(vfd->counter + 5, sizeof(int));
        assert(fds != NULL);
        memcpy(fds, vfd->fd, sizeof(int) * vfd->counter);
        
        free(vfd->fd);
        vfd->fd = fds;
        vfd->max_counter += 5;
    }
}

//動態數組相關的操作
VectorFD*  create_vector_fd(void)
{
    VectorFD* vfd = (VectorFD*)calloc(1, sizeof(VectorFD));
    assert(vfd != NULL);
    
    //分配存放fd的數組空間
    vfd->fd = (int*)calloc(5, sizeof(int));
    assert(vfd->fd != NULL);

    vfd->counter = 0;
    vfd->max_counter = 0;

    //對互斥鎖進行初始化
    pthread_mutex_init(&vfd->mutex, NULL);

    return vfd;
}

void  destroy_vector_fd(VectorFD* vfd)
{
    assert(vfd != NULL);
    //銷毀互斥鎖
    pthread_mutex_destroy(&vfd->mutex);
    
    free(vfd->fd);
    free(vfd);
}

int  get_fd(VectorFD* vfd, int index)
{
    int ret = 0;
    assert(vfd != NULL);
    
    pthread_mutex_lock(&vfd->mutex);

    if((0 <= index) && (index < vfd->counter)){
        ret = vfd->fd[index];
    }

    pthread_mutex_unlock(&vfd->mutex);

    return ret;
}

void  remove_fd(VectorFD* vfd, int fd)
{
    assert(vfd != NULL);
    
    pthread_mutex_lock(&vfd->mutex);

    int index = indexof(vfd, fd);

    if(index >= 0){
        int i = index;
        for(; i<vfd->counter-1; i++){
             vfd->fd[i] = vfd->fd[i+1];   
        }
        
        vfd->counter--;
    }
   
    pthread_mutex_unlock(&vfd->mutex);
}

void  add_fd(VectorFD* vfd, int fd)
{
    assert(vfd != NULL);
    
    encapacity(vfd);
    vfd->fd[vfd->counter++] = fd;
}

//echo_tcp_server_select.c(與上一例相同)

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include "vector_fd.h"
#include <fcntl.h>

/*基於I/O多路復用的高並發服務器編程
測試:telnet 127.0.0.1 xxxx 
      http://xxx.xxx.xxx.xxx:端口號
注意:演示時可關閉服務器的防火牆,防火牆口被過濾
      #service iptables status     查看防火牆
      #service iptables stop       關閉防火牆
*/

VectorFD* vfd;
int sockfd;
int bStop = 0;

void sig_handler(int signo)
{
    if(signo == SIGINT){
        bStop = 1;
        printf("server close\n");
        
        close(sockfd);
        destroy_vector_fd(vfd);
        
        exit(1);
    }
}

void out_addr(struct sockaddr_in* clientAddr)
{
    char ip[16];
    memset(ip, 0, sizeof(ip));
    int port = ntohs(clientAddr->sin_port);
    inet_ntop(AF_INET, &clientAddr->sin_addr.s_addr, ip, sizeof(ip));

    printf("%s(%d) connnected!\n", ip, port);
}

/*服務程序
 *  fd對應於某個連接的客戶端,和某一個連接的客戶端進行雙向通信
 */
void do_service(int fd)
{
    /*服務端和客戶端進行讀寫操作(雙向通信)*/
    char buff[512];
    
    memset(buff, 0, sizeof(buff));
    size_t size = read(fd, buff, sizeof(buff));

    //讀取客戶端發送過來的消息
    //若讀不到數據直接返回了,直接服務於下一個客戶端
    //因此不需要判斷size小於0的情況。
    if(size == 0){  //客戶端己關閉連接
        char info[] = "client close\n";
        write(STDOUT_FILENO, info, sizeof(info));

        //將fd從動態數組中刪除
        remove_fd(vfd, fd);
        close(fd);
    }else if(size > 0){
        write(STDOUT_FILENO, buff, sizeof(buff));//顯示客戶端發送的消息
        //寫回客戶端(回顯功能)
        if(write(fd, buff, sizeof(buff)) != size){
            if(errno == EPIPE){
                //如果客戶端己被關閉(相當於管道的讀端關閉),會產生SIGPIPE信號
                //並將errno設置為EPIPE
                perror("write error");
                remove_fd(vfd, fd);
                close(fd);   
            }
        }
    }
}

//遍歷動態數組中所有的socket描述符,並將之加入到fd_set中。
//同時此函數返回動態數組中最大的那個描述符
static int add_set(fd_set* set)
{
    FD_ZERO(set);  //清空描述符集
    int max_fd = vfd->fd[0];
    
    int i=0;
    for(; i<vfd->counter; i++){
        int fd = get_fd(vfd, i);
        if(fd > max_fd)  
            max_fd = fd;
        FD_SET(fd, set); //將fd加入到fd_set中
    }

    return max_fd;
}

//線程函數
void* th_fn(void* arg)
{
    struct timeval t;
    t.tv_sec = 2;
    t.tv_usec = 0;
    int n = 0; //返回select返回的准備好的socket數量
    int maxfd; //所有socket描述符的最大值
    fd_set set;
    maxfd = add_set(&set);
    /*
     * 調用select函數會阻塞,委托內核去檢查傳入的描述符集是否有socket己准備好,
     * 若有,則返回准備好的socket數量,超時則返回0
     * 第1個參數為fd_set中socket的范圍(最大描述符+1)
     */
    while(((n = select(maxfd + 1, &set, NULL, NULL, &t)) >=0) && (!bStop)){
        if(n > 0){
            int i = 0;
            //檢測哪些socket准備好,並和這些准備好的socket對應的客戶端進行雙向通信
            for(; i<vfd->counter; i++){
                int fd = get_fd(vfd, i);
                if(FD_ISSET(fd, &set)){
                    do_service(fd);
                }
            }
        }

        //重新設置時間
        t.tv_sec = 2;
        t.tv_usec = 0;
        
        //清空描述符集
        //重新遍歷動態數組中最新的描述符,並放置到fd_set
        maxfd = add_set(&set);
    }

    return (void*)0;
}

int main(int argc, char* argv[])
{
    if(argc < 2){
        printf("usage: %s port\n", argv[0]);
        exit(1);
    }

    //按ctrl-c時中止服務端程序
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }

    /*步驟1:創建socket(套接字)
     *注:socket創建在內核中,是一個結構體
     *AF_INET:IPv4
     *SOCK_STREAM:tcp協議
     */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    /*步驟2:將sock和地址(包括ip、port)進行綁定*/
    struct sockaddr_in servAddr; //使用專用地址結構體
    memset(&servAddr, 0, sizeof(servAddr));
    //往地址中填入ip、port和Internet地址族類型
    servAddr.sin_family = AF_INET;//IPv4
    servAddr.sin_port = htons(atoi(argv[1])); //port
    servAddr.sin_addr.s_addr = INADDR_ANY; //任一可用的IP

    if(bind(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) <0 ){
        perror("bind error");
        exit(1);
    }

    /*步驟3:調用listen函數啟動監聽
     *       通知系統去接受來自客戶端的連接請求
     */
    if(listen(sockfd, 10) < 0){  //隊列中最多允許10個連接請求
        perror("listen error");
        exit(1);
    }

    //創建放置套接字描述符的動態數組
    vfd = create_vector_fd();

    //設置線程的分離屬性
    pthread_t  th;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //啟動子線程
    int err;
    if((err = pthread_create(&th, &attr, th_fn, (void*)0)) != 0){
        perror("pthread create error");
        exit(1);
    }
    pthread_attr_destroy(&attr);
    
    /*(1)主線程獲得客戶端連接,將新的socket描述符放置到動態數組中
     *(2)子線程的任務
         A.調用select委托內核去檢查傳入到select中的描述符是否准備好
     *   B.利用FD_ISSET來找出准備好的那些描述符並和對應的客戶端進行雙向通信
     */

    struct sockaddr_in clientAddr;
    socklen_t len = sizeof(clientAddr);

    while(!bStop){
        /*步驟4:調用accept函數,從請求隊列中獲取一個連接
         *       並返回新的socket描述符
         * */
        int fd = accept(sockfd, (struct sockaddr*)&clientAddr, &len);
       
        if(fd < 0){
            perror("accept error");
            continue;
        }
        

        //輸出客戶端信息
        out_addr(&clientAddr);
        
        //將返回的新socket描述符加入到動態數組中
        add_fd(vfd, fd);
    }

    close(sockfd);
    destroy_vector_fd(vfd);
    return 0;
}
/*輸出結果
 * [root@localhost 15.AdvNet]# gcc -o bin/echo_tcp_server_select -Iinclude bin/vector_fd.o src/echo_tcp_server_select.c -lpthread           
 * [root@localhost 15.AdvNet]# bin/echo_tcp_server_select 8888
 * 127.0.0.1(40695) connnected!
 * abcdefaaabbbcdef^Cserver close
 */

//echo_tcp_client.c(與上一例相同)

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>

int main(int argc, char* argv[])
{
    if(argc < 3){
        printf("usage: %s ip port\n", argv[0]);
        exit(1);
    }

    /*步驟1: 創建socket(套接字)*/
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        perror("socket error");
    }

    //往servAddr中填入ip、port和地址族類型
    struct sockaddr_in servAddr;
    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(atoi(argv[2]));
    //將ip地址轉換成網絡字節序后填入servAdd中
    inet_pton(AF_INET, argv[1], &servAddr.sin_addr.s_addr);

    /*步驟2: 客戶端調用connect函數連接到服務器端*/
    if(connect(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0){
        perror("connect error");
        exit(1);
    }

    /*步驟3: 調用自定義的協議處理函數和服務端進行雙向通信*/
    char buff[512];
    size_t size;
    char* prompt = ">";

    while(1){
        memset(buff, 0, sizeof(buff));
        write(STDOUT_FILENO, prompt, 1);
        size = read(STDIN_FILENO, buff, sizeof(buff));
        if(size < 0) continue;

        buff[size-1] = '\0';
        //將鍵盤輸入的內容發送到服務端
        if(write(sockfd, buff, sizeof(buff)) < 0){
            perror("write error");
            continue;
        }else{
            memset(buff, 0, sizeof(buff));
            //讀取來自服務端的消息
            if(read(sockfd, buff, sizeof(buff)) < 0){
                perror("read error");
                continue;
            }else{
                printf("%s\n", buff);
            }
        }
    }

    /*關閉套接字*/
    close(sockfd);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM