端口復用與驚群效應


端口復用與驚群效應

 

 

REUSEADDR

假設同一個機器上有2個套接字,分別bind到 ip1:port1、ip2:port2,如果 port1 == port2,則第二個bind的套接字會有"Address already in use"的錯誤。

為了允許多個套接字綁定到同一個port上,可以打開SO_REUSEADDR選項,如下例子

#include "stdio.h"
#include "stdlib.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <string>
#include <iostream>

int bindSocket(char* ip, short port) {
    int nfd = socket(AF_INET, SOCK_STREAM, 0);
    if (nfd < 0) {
        perror("socket error ");
        return -1;
    }

    const int one = 1;
    setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind error ");
        return -1;
    }

    listen(nfd, 1024);struct sockaddr_in addr2;
    socklen_t addrlen = 0;
    memset(&addr2, 0, sizeof(addr2));
    if (accept(nfd, (struct sockaddr*)&addr2, &addrlen) < 0) {
        perror("accept error ");
    }

    return nfd;
}

int main() {

    int pid = fork();
    if (pid == 0) { // child
        bindSocket("127.0.0.1", 7801);
        return 0;

    } else if (pid < 0) {
        perror("fork error");
    }

    bindSocket("0.0.0.0", 7801);

    int res = 0;
    wait(&res);

    return 0;
}

例子中,父進程bind到0.0.0.0:7801,子進程bind到127.0.0.1:7801,它們都可以bind成功。

注意,這里兩個套接字bind的ip不一樣(一個是127.0.0.1,一個是0.0.0.0),如果ip和port都一樣,即使打開SO_REUSEADDR選項也會有沖突。

SO_REUSEADDR 還有一個作用,根據TCP協議,服務端如果主動關閉連接,會進入TIME_WAIT狀態,在該狀態下,如果又有套接字要bind到同一個IP:Port,也會有錯誤,但在開啟SO_REUSEADDR時,就可以bind成功。

 

 

REUSEPORT

前面提到的reuseaddr 允許多個套接字綁定到同一個port(但ip不能相同),而reuseport允許將多個套接字bind在同一個IP+Port 對上。

那么當來了一個連接時,要由哪個套接字來處理它呢?reuseport分為兩種模式:

  • 熱備份模式,在Linux 3.9內核引入,實際工作的套接字只有一個,其它的作為備份,只有當前一個套接字不再可用的時候,才會由后一個來取代,其投入工作的順序取決於實現;
  • 負載均衡模式,(在3.9內核之后),用數據包的源IP/源端口作為一個HASH函數的輸入計算由哪個套接字來處理,所以同一個客戶端的連接總是被分發到同一個套接字;

對於負載均衡模式,考慮是否存在這樣的問題,對於UDP而言,比如一個事務中需要交互4個數據包,第1個數據包的元組HASH結果索引到了線程1的套接字,它理所當然被線程1處理,在第2個數據包到達之前,線程1掛了,那么該線程的套接字的位置將會被別的線程,比如線程2的套接字取代!在第2個數據包到達的時候,將會由線程2的套接字來處理之,然而線程2並不知道線程1保存的關於此連接的事務狀態。

 

再討論下reuseport的實現原理,對於 3.9 <= linux < 4.5 版本的內核,共享同一個port的套接字以鏈表的形式組織起來,如下圖所示,

假設服務端建立了4個Server(A、B、C、D),監聽的IP和port如圖;

其中A和B使用了reuseport,比如說A有4個線程監聽了0.0.0.0:21,而B有2個線程監聽了192.168.10.1:21;

沖突鏈以port為key,因此A、B、D掛在同一條沖突鏈上;

如果此時客戶端請求了192.168.10.1:21,那么內核會遍歷listening_hash[0],為上面7個套接字打分,由於B監聽的精准地址,所以得分會更高,內核會在sk_B0和sk_B1之間做選擇。

從上面的例子可以看出,內核需要遍歷沖突鏈,給監聽該port上的所有socket打分,性能會有不足之處。

在 linux >= 4.5 版本的內核中進一步優化了這個問題,引入了reuseport group,它將bind到同一個ip:port的套接字進行分組,

這樣當要查找目標地址為192.168.10.1:21的套接字時,可以直接在socketB  reuseport group中查找,而不用再遍歷整個沖突鏈。

 

注意,這個group特性在4.5版本中只支持了UDP協議,TCP要到4.6版本才支持。

 

 

再來看 reuseport 使用的一個例子,父、子進程都監聽127.0.0.1:7801端口,

#include <string.h>
#include "stdio.h"
#include "stdlib.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <string>
#include <iostream>


//using namespace std;

int bindSocket(char* ip, short port) {
    int nfd = socket(AF_INET, SOCK_STREAM, 0);
    if (nfd < 0) {
        perror("socket error ");
        return -1;
    }

    const int one = 1;
    setsockopt(nfd, SOL_SOCKET, SO_REUSEPORT, (char *)&one, sizeof(int));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind error ");
        return -1;
    }

    listen(nfd, 1024);struct sockaddr_in addr2;
    socklen_t addrlen = 0;

    while (true) {
        memset(&addr2, 0, sizeof(addr2));
        int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen);
        if (fd < 0) {
            perror("accept error ");
            break;
        }

        write(fd, "hello\n", sizeof("hello") );

        printf("pid=%d, receive request from %s:%d\n",
               getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port);
        close(fd);
    }
    return nfd;
}

int main() {

    int pid = fork();
    if (pid == 0) { // child
        printf("child pid=%d\n", getpid());
        bindSocket("127.0.0.1", 7801);
        return 0;

    } else if (pid < 0) {
        perror("fork error");
    }

    // parent
    printf("parent pid=%d\n", getpid());
    bindSocket("127.0.0.1", 7801);

    wait(0);

    return 0;
}

我們用nc命令模擬發起請求,

nc 127.0.0.1 7801

多次執行如上命令的結果:

parent pid=16922
child pid=16923
pid=16922, receive request from 0.0.0.0:0
pid=16923, receive request from 0.0.0.0:0
pid=16922, receive request from 127.0.0.1:53343
pid=16923, receive request from 127.0.0.1:19552
pid=16922, receive request from 127.0.0.1:36960
pid=16922, receive request from 127.0.0.1:54880

可見,accept請求的監聽套接字可能發生變化!這里是負載均衡模式。

 

 

 

驚群效應

上面說到的 reuseaddr、reuseport 都是不同套接字bind到同一個port上,套接字本身是不同的,每個套接字都有自己的accept隊列。

但在有些場景下,是多個進程(一般是父子關系)或者多線程監聽同一個套接字,因此這些父子進程(或多線程)共享同一個accept隊列。

接下來我們以多進程為例說明,

當一個請求進來,accept同時喚醒等待socket的多個進程,但是只有一個進程能accept到新的socket,其他進程accept不到任何東西,只好繼續回到accept流程,這就是驚群效應。

如果使用的是select/epoll + accept,則把驚群提前到了select/epoll這一步,多個進程只有一個進程能accept到連接,因為是非阻塞socket,其他進程返回EAGAIN。

 

accept 阻塞調用方式

看下面的例子,父進程創建套接字后先bind到127.0.0.1:7801,然后調用listen開始監聽請求;

之后fork出5個子進程,每個子進程都會繼承父進程的監聽套接字,接着每個子進程去accept請求。

#include <string.h>
#include "stdio.h"
#include "stdlib.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <string>
#include <iostream>


int bindSocket(char* ip, short port) {
    int nfd = socket(AF_INET, SOCK_STREAM, 0);
    if (nfd < 0) {
        perror("socket error ");
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind error ");
        return -1;
    }

    listen(nfd, 1024);

    return nfd;
}

void acceptSocket(int nfd) {
    struct sockaddr_in addr2;
    socklen_t addrlen = 0;

    while (true) {
        memset(&addr2, 0, sizeof(addr2));
        int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen);
        if (fd < 0) {
            perror("accept error ");
            break;
        }

        write(fd, "hello\n", sizeof("hello") );

        printf("pid=%d, receive request from %s:%d\n",
               getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port);
        close(fd);
    }
}


int main() {

    int nfd = bindSocket("127.0.0.1", 7801);

    for (int n = 0; n < 5; n++) {
         int pid = fork();
         if (pid == 0) { // child
              printf("child pid=%d\n", getpid());
              acceptSocket(nfd);
              return 0;
         } else if (pid < 0) {
              perror("fork error");
         }
    }

    int res = 0;
    wait(&res);

    return 0;
}

這時,通用用nc命令模擬請求:

nc 127.0.0.1 7801

運行結果

child pid=7478
child pid=7479
child pid=7480
child pid=7481
child pid=7482
pid=7478, receive request from 0.0.0.0:0

看起來只有一個子進程的accept調用返回了,難道驚群現象不存在嗎?這時因為在Linux 2.6 版本以后,內核內核已經解決了accept()函數的“驚群”問題,大概的處理方式就是,當內核接收到一個客戶連接后,只會喚醒等待隊列上的第一個進程或線程。所以,如果服務器采用accept阻塞調用方式,在最新的Linux系統上,已經沒有“驚群”的問題了。

 

 

 

epoll 方式

實際工程中常見的服務器程序,大都使用select、poll或epoll機制,此時,服務器不是阻塞在accept,而是阻塞在select、poll或epoll_wait,這種情況下的“驚群”仍然需要考慮。 

看下面的例子,父進程創建套接字后先bind到127.0.0.1:7801,然后調用listen開始監聽請求(這里會將監聽套接字設置為非阻塞);

之后fork出5個子進程,每個子進程都會繼承父進程的監聽套接字,接着每個子進程創建一個epoll句柄,並將監聽套接字的讀事件注冊到epoll中;

#include <string.h>
#include "stdio.h"
#include "stdlib.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <string>
#include <iostream>
#include <sys/epoll.h>

int bindSocket(char* ip, short port) {
    int nfd = socket(AF_INET, SOCK_STREAM, 0);
    if (nfd < 0) {
        perror("socket error ");
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind error ");
        return -1;
    }

    listen(nfd, 1024);

    int flags;
    if ((flags = fcntl(nfd, F_GETFL, 0)) < 0 ||
            fcntl(nfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl error ");
        return -1;
    }

    return nfd;
}

void acceptSocket(int nfd) {
    struct sockaddr_in addr2;
    socklen_t addrlen = 0;

    memset(&addr2, 0, sizeof(addr2));
    int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen);
    if (fd < 0) {
        perror("accept error ");
        return;
    }

    write(fd, "hello\n", sizeof("hello") );

    printf("pid=%d, receive request from %s:%d\n",
            getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port);
    close(fd);
}

void pollSocket(int nfd) {

    const int MAX_EPOLL_EVENTS = 128;

    int epfd = epoll_create(MAX_EPOLL_EVENTS);
    if (epfd < 0) {
        perror("epoll_create error");
        return;
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = EPOLLET | EPOLLIN;
    event.data.fd = nfd;

    if (epoll_ctl(epfd, EPOLL_CTL_ADD, nfd, &event) == -1) {
        perror("epoll_ctl error");
        return;
    }

    while (true) {

        struct epoll_event events[MAX_EPOLL_EVENTS];
        memset(events, 0, sizeof(events));

        int events_cnt = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (events_cnt == 0) {
            //printf("epoll_wait timeout\n");

        } else if (events_cnt < 0) {
            perror("epoll_wait error");

        } else {
            for (int i = 0; i < events_cnt; i++) {
                if (events[i].events & EPOLLIN) {
                    acceptSocket(events[i].data.fd);

                } else if (events[i].events & EPOLLERR) {
                    printf("epoll_wait EPOLLERR\n");
                }
            }
        }
    }
}

int main() {

    int nfd = bindSocket("127.0.0.1", 7801);

        for (int n = 0; n < 5; n++) {
                int pid = fork();
                if (pid == 0) { // child
                        printf("child pid=%d\n", getpid());
                        pollSocket(nfd);
                        return 0;

                } else if (pid < 0) {
                        perror("fork error");
                }
        }

    int res = 0;
    wait(&res);

    return 0;
}

這時,通用用nc命令模擬請求:

nc 127.0.0.1 7801

運行結果

child pid=25879
child pid=25880
child pid=25881
child pid=25882
child pid=25883


accept error : Resource temporarily unavailable
accept error : Resource temporarily unavailable
accept error : Resource temporarily unavailable
pid=25881, receive request from 0.0.0.0:0
accept error : Resource temporarily unavailable

可見,當請求來臨時,所有的子進程epoll_wait 均返回了一個可讀事件,然后大家都去調用accept,但這個時候只有一個子進程能accept成功,其它子進程會報錯,這就是驚群問題!

如何解決這個問題呢?Nginx中使用mutex互斥鎖解決這個問題,具體措施有使用全局互斥鎖,每個子進程在epoll_wait()之前先去申請鎖,申請到則繼續處理,獲取不到則等待,並設置了一個負載均衡的算法(當某一個子進程的任務量達到總設置量的7/8時,則不會再嘗試去申請鎖)來均衡各個進程的任務量。

在上面的例子基礎上,增加信號量對epoll_wait進行同步,代碼如下

#include <string.h>
#include "stdio.h"
#include "stdlib.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <string>
#include <iostream>
#include <sys/epoll.h>
#include <sys/ipc.h>
#include <sys/sem.h>

union semun {
    int              val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
                                (Linux-specific) */
};

int sem_init() {
    int semid = semget(IPC_PRIVATE, 1, 0666);
    if (semid == -1) {
        perror("semget error"); 
        return -1;
    }

    union semun sem_union;
    sem_union.val = 1;
    if (semctl(semid, 0, SETVAL, sem_union) == -1) {
        perror("semctl error"); 
        return -1;
    }

        return semid;
}

void sem_lock(int sem_id) {
        struct sembuf sem_b;
        sem_b.sem_num = 0;
        sem_b.sem_op = -1;//P()
        sem_b.sem_flg = SEM_UNDO;
        if (semop(sem_id, &sem_b, 1) == -1) {
                perror("sem_lock error");
        }
}

void sem_unlock(int sem_id) {
        struct sembuf sem_b;
        sem_b.sem_num = 0;
        sem_b.sem_op = 1;//V()
        sem_b.sem_flg = SEM_UNDO;
        if (semop(sem_id, &sem_b, 1) == -1) {
                perror("sem_unlock error");
        }
}

int bindSocket(char* ip, short port) {
    int nfd = socket(AF_INET, SOCK_STREAM, 0);
    if (nfd < 0) {
        perror("socket error ");
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind error ");
        return -1;
    }

    listen(nfd, 1024);

    int flags;
    if ((flags = fcntl(nfd, F_GETFL, 0)) < 0 ||
            fcntl(nfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl error ");
        return -1;
    }

    return nfd;
}

void acceptSocket(int nfd) {
    struct sockaddr_in addr2;
    socklen_t addrlen = 0;

    memset(&addr2, 0, sizeof(addr2));
    int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen);
    if (fd < 0) {
        perror("accept error ");
        return;
    }

    write(fd, "hello\n", sizeof("hello") );

    printf("pid=%d, receive request from %s:%d\n",
            getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port);
    close(fd);
}

void pollSocket(int nfd, int semid) {

    const int MAX_EPOLL_EVENTS = 128;

    int epfd = epoll_create(MAX_EPOLL_EVENTS);
    if (epfd < 0) {
        perror("epoll_create error");
        return;
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = EPOLLET | EPOLLIN;
    event.data.fd = nfd;

    if (epoll_ctl(epfd, EPOLL_CTL_ADD, nfd, &event) == -1) {
        perror("epoll_ctl error");
        return;
    }

    while (true) {

        struct epoll_event events[MAX_EPOLL_EVENTS];
        memset(events, 0, sizeof(events));

                sem_lock(semid);

        int events_cnt = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000);

                sem_unlock(semid);

        if (events_cnt == 0) {
            //printf("epoll_wait timeout\n");

        } else if (events_cnt < 0) {
            perror("epoll_wait error");

        } else {
            for (int i = 0; i < events_cnt; i++) {
                if (events[i].events & EPOLLIN) {
                    acceptSocket(events[i].data.fd);

                } else if (events[i].events & EPOLLERR) {
                    printf("epoll_wait EPOLLERR\n");
                }
            }
        }

    }
}



int main() {

    int nfd = bindSocket("127.0.0.1", 7801);
        int semid = sem_init();

        for (int n = 0; n < 5; n++) {
                int pid = fork();
                if (pid == 0) { // child
                        printf("child pid=%d\n", getpid());
                        pollSocket(nfd, semid);
                        return 0;

                } else if (pid < 0) {
                        perror("fork error");
                }
        }

    int res = 0;
    wait(&res);

    return 0;
}
View Code

 

除了加鎖的解決方法外,還有其他2個辦法:

  1. 利用reuseport機制(需要3.9以后版本),但這需要在每個子進程去創建監聽端口(而不是繼承父進程的),這樣就可以保證每個子進程的套接字都是獨立的,它們都有自己的accept隊列,由內核來做負載均衡;
  2. liunx 4.5內核在epoll已經新增了EPOLL_EXCLUSIVE選項,在多個進程同時監聽同一個socket,只有一個被喚醒。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM