端口復用與驚群效應
REUSEADDR
假設同一個機器上有2個套接字,分別bind到 ip1:port1、ip2:port2,如果 port1 == port2,則第二個bind的套接字會有"Address already in use"的錯誤。
為了允許多個套接字綁定到同一個port上,可以打開SO_REUSEADDR選項,如下例子
#include "stdio.h" #include "stdlib.h" #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <string> #include <iostream> int bindSocket(char* ip, short port) { int nfd = socket(AF_INET, SOCK_STREAM, 0); if (nfd < 0) { perror("socket error "); return -1; } const int one = 1; setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int)); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY; addr.sin_port = htons(port); if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind error "); return -1; } listen(nfd, 1024);struct sockaddr_in addr2; socklen_t addrlen = 0; memset(&addr2, 0, sizeof(addr2)); if (accept(nfd, (struct sockaddr*)&addr2, &addrlen) < 0) { perror("accept error "); } return nfd; } int main() { int pid = fork(); if (pid == 0) { // child bindSocket("127.0.0.1", 7801); return 0; } else if (pid < 0) { perror("fork error"); } bindSocket("0.0.0.0", 7801); int res = 0; wait(&res); return 0; }
例子中,父進程bind到0.0.0.0:7801,子進程bind到127.0.0.1:7801,它們都可以bind成功。
注意,這里兩個套接字bind的ip不一樣(一個是127.0.0.1,一個是0.0.0.0),如果ip和port都一樣,即使打開SO_REUSEADDR選項也會有沖突。
SO_REUSEADDR 還有一個作用,根據TCP協議,服務端如果主動關閉連接,會進入TIME_WAIT狀態,在該狀態下,如果又有套接字要bind到同一個IP:Port,也會有錯誤,但在開啟SO_REUSEADDR時,就可以bind成功。
REUSEPORT
前面提到的reuseaddr 允許多個套接字綁定到同一個port(但ip不能相同),而reuseport允許將多個套接字bind在同一個IP+Port 對上。
那么當來了一個連接時,要由哪個套接字來處理它呢?reuseport分為兩種模式:
- 熱備份模式,在Linux 3.9內核引入,實際工作的套接字只有一個,其它的作為備份,只有當前一個套接字不再可用的時候,才會由后一個來取代,其投入工作的順序取決於實現;
- 負載均衡模式,(在3.9內核之后),用數據包的源IP/源端口作為一個HASH函數的輸入計算由哪個套接字來處理,所以同一個客戶端的連接總是被分發到同一個套接字;
對於負載均衡模式,考慮是否存在這樣的問題,對於UDP而言,比如一個事務中需要交互4個數據包,第1個數據包的元組HASH結果索引到了線程1的套接字,它理所當然被線程1處理,在第2個數據包到達之前,線程1掛了,那么該線程的套接字的位置將會被別的線程,比如線程2的套接字取代!在第2個數據包到達的時候,將會由線程2的套接字來處理之,然而線程2並不知道線程1保存的關於此連接的事務狀態。
再討論下reuseport的實現原理,對於 3.9 <= linux < 4.5 版本的內核,共享同一個port的套接字以鏈表的形式組織起來,如下圖所示,
假設服務端建立了4個Server(A、B、C、D),監聽的IP和port如圖;
其中A和B使用了reuseport,比如說A有4個線程監聽了0.0.0.0:21,而B有2個線程監聽了192.168.10.1:21;
沖突鏈以port為key,因此A、B、D掛在同一條沖突鏈上;
如果此時客戶端請求了192.168.10.1:21,那么內核會遍歷listening_hash[0],為上面7個套接字打分,由於B監聽的精准地址,所以得分會更高,內核會在sk_B0和sk_B1之間做選擇。
從上面的例子可以看出,內核需要遍歷沖突鏈,給監聽該port上的所有socket打分,性能會有不足之處。
在 linux >= 4.5 版本的內核中進一步優化了這個問題,引入了reuseport group,它將bind到同一個ip:port的套接字進行分組,
這樣當要查找目標地址為192.168.10.1:21的套接字時,可以直接在socketB reuseport group中查找,而不用再遍歷整個沖突鏈。
注意,這個group特性在4.5版本中只支持了UDP協議,TCP要到4.6版本才支持。
再來看 reuseport 使用的一個例子,父、子進程都監聽127.0.0.1:7801端口,
#include <string.h> #include "stdio.h" #include "stdlib.h" #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <string> #include <iostream> //using namespace std; int bindSocket(char* ip, short port) { int nfd = socket(AF_INET, SOCK_STREAM, 0); if (nfd < 0) { perror("socket error "); return -1; } const int one = 1; setsockopt(nfd, SOL_SOCKET, SO_REUSEPORT, (char *)&one, sizeof(int)); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY; addr.sin_port = htons(port); if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind error "); return -1; } listen(nfd, 1024);struct sockaddr_in addr2; socklen_t addrlen = 0; while (true) { memset(&addr2, 0, sizeof(addr2)); int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen); if (fd < 0) { perror("accept error "); break; } write(fd, "hello\n", sizeof("hello") ); printf("pid=%d, receive request from %s:%d\n", getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port); close(fd); } return nfd; } int main() { int pid = fork(); if (pid == 0) { // child printf("child pid=%d\n", getpid()); bindSocket("127.0.0.1", 7801); return 0; } else if (pid < 0) { perror("fork error"); } // parent printf("parent pid=%d\n", getpid()); bindSocket("127.0.0.1", 7801); wait(0); return 0; }
我們用nc命令模擬發起請求,
nc 127.0.0.1 7801
多次執行如上命令的結果:
parent pid=16922 child pid=16923 pid=16922, receive request from 0.0.0.0:0 pid=16923, receive request from 0.0.0.0:0 pid=16922, receive request from 127.0.0.1:53343 pid=16923, receive request from 127.0.0.1:19552 pid=16922, receive request from 127.0.0.1:36960 pid=16922, receive request from 127.0.0.1:54880
可見,accept請求的監聽套接字可能發生變化!這里是負載均衡模式。
驚群效應
上面說到的 reuseaddr、reuseport 都是不同套接字bind到同一個port上,套接字本身是不同的,每個套接字都有自己的accept隊列。
但在有些場景下,是多個進程(一般是父子關系)或者多線程監聽同一個套接字,因此這些父子進程(或多線程)共享同一個accept隊列。
接下來我們以多進程為例說明,
當一個請求進來,accept同時喚醒等待socket的多個進程,但是只有一個進程能accept到新的socket,其他進程accept不到任何東西,只好繼續回到accept流程,這就是驚群效應。
如果使用的是select/epoll + accept,則把驚群提前到了select/epoll這一步,多個進程只有一個進程能accept到連接,因為是非阻塞socket,其他進程返回EAGAIN。
accept 阻塞調用方式
看下面的例子,父進程創建套接字后先bind到127.0.0.1:7801,然后調用listen開始監聽請求;
之后fork出5個子進程,每個子進程都會繼承父進程的監聽套接字,接着每個子進程去accept請求。
#include <string.h> #include "stdio.h" #include "stdlib.h" #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <string> #include <iostream> int bindSocket(char* ip, short port) { int nfd = socket(AF_INET, SOCK_STREAM, 0); if (nfd < 0) { perror("socket error "); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY; addr.sin_port = htons(port); if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind error "); return -1; } listen(nfd, 1024); return nfd; } void acceptSocket(int nfd) { struct sockaddr_in addr2; socklen_t addrlen = 0; while (true) { memset(&addr2, 0, sizeof(addr2)); int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen); if (fd < 0) { perror("accept error "); break; } write(fd, "hello\n", sizeof("hello") ); printf("pid=%d, receive request from %s:%d\n", getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port); close(fd); } } int main() { int nfd = bindSocket("127.0.0.1", 7801); for (int n = 0; n < 5; n++) { int pid = fork(); if (pid == 0) { // child printf("child pid=%d\n", getpid()); acceptSocket(nfd); return 0; } else if (pid < 0) { perror("fork error"); } } int res = 0; wait(&res); return 0; }
這時,通用用nc命令模擬請求:
nc 127.0.0.1 7801
運行結果
child pid=7478 child pid=7479 child pid=7480 child pid=7481 child pid=7482 pid=7478, receive request from 0.0.0.0:0
看起來只有一個子進程的accept調用返回了,難道驚群現象不存在嗎?這時因為在Linux 2.6 版本以后,內核內核已經解決了accept()函數的“驚群”問題,大概的處理方式就是,當內核接收到一個客戶連接后,只會喚醒等待隊列上的第一個進程或線程。所以,如果服務器采用accept阻塞調用方式,在最新的Linux系統上,已經沒有“驚群”的問題了。
epoll 方式
實際工程中常見的服務器程序,大都使用select、poll或epoll機制,此時,服務器不是阻塞在accept,而是阻塞在select、poll或epoll_wait,這種情況下的“驚群”仍然需要考慮。
看下面的例子,父進程創建套接字后先bind到127.0.0.1:7801,然后調用listen開始監聽請求(這里會將監聽套接字設置為非阻塞);
之后fork出5個子進程,每個子進程都會繼承父進程的監聽套接字,接着每個子進程創建一個epoll句柄,並將監聽套接字的讀事件注冊到epoll中;
#include <string.h> #include "stdio.h" #include "stdlib.h" #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <string> #include <iostream> #include <sys/epoll.h> int bindSocket(char* ip, short port) { int nfd = socket(AF_INET, SOCK_STREAM, 0); if (nfd < 0) { perror("socket error "); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY; addr.sin_port = htons(port); if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind error "); return -1; } listen(nfd, 1024); int flags; if ((flags = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("fcntl error "); return -1; } return nfd; } void acceptSocket(int nfd) { struct sockaddr_in addr2; socklen_t addrlen = 0; memset(&addr2, 0, sizeof(addr2)); int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen); if (fd < 0) { perror("accept error "); return; } write(fd, "hello\n", sizeof("hello") ); printf("pid=%d, receive request from %s:%d\n", getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port); close(fd); } void pollSocket(int nfd) { const int MAX_EPOLL_EVENTS = 128; int epfd = epoll_create(MAX_EPOLL_EVENTS); if (epfd < 0) { perror("epoll_create error"); return; } struct epoll_event event; memset(&event, 0, sizeof(event)); event.events = EPOLLET | EPOLLIN; event.data.fd = nfd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, nfd, &event) == -1) { perror("epoll_ctl error"); return; } while (true) { struct epoll_event events[MAX_EPOLL_EVENTS]; memset(events, 0, sizeof(events)); int events_cnt = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000); if (events_cnt == 0) { //printf("epoll_wait timeout\n"); } else if (events_cnt < 0) { perror("epoll_wait error"); } else { for (int i = 0; i < events_cnt; i++) { if (events[i].events & EPOLLIN) { acceptSocket(events[i].data.fd); } else if (events[i].events & EPOLLERR) { printf("epoll_wait EPOLLERR\n"); } } } } } int main() { int nfd = bindSocket("127.0.0.1", 7801); for (int n = 0; n < 5; n++) { int pid = fork(); if (pid == 0) { // child printf("child pid=%d\n", getpid()); pollSocket(nfd); return 0; } else if (pid < 0) { perror("fork error"); } } int res = 0; wait(&res); return 0; }
這時,通用用nc命令模擬請求:
nc 127.0.0.1 7801
運行結果
child pid=25879 child pid=25880 child pid=25881 child pid=25882 child pid=25883 accept error : Resource temporarily unavailable accept error : Resource temporarily unavailable accept error : Resource temporarily unavailable pid=25881, receive request from 0.0.0.0:0 accept error : Resource temporarily unavailable
可見,當請求來臨時,所有的子進程epoll_wait 均返回了一個可讀事件,然后大家都去調用accept,但這個時候只有一個子進程能accept成功,其它子進程會報錯,這就是驚群問題!
如何解決這個問題呢?Nginx中使用mutex互斥鎖解決這個問題,具體措施有使用全局互斥鎖,每個子進程在epoll_wait()之前先去申請鎖,申請到則繼續處理,獲取不到則等待,並設置了一個負載均衡的算法(當某一個子進程的任務量達到總設置量的7/8時,則不會再嘗試去申請鎖)來均衡各個進程的任務量。
在上面的例子基礎上,增加信號量對epoll_wait進行同步,代碼如下

#include <string.h> #include "stdio.h" #include "stdlib.h" #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <string> #include <iostream> #include <sys/epoll.h> #include <sys/ipc.h> #include <sys/sem.h> union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */ }; int sem_init() { int semid = semget(IPC_PRIVATE, 1, 0666); if (semid == -1) { perror("semget error"); return -1; } union semun sem_union; sem_union.val = 1; if (semctl(semid, 0, SETVAL, sem_union) == -1) { perror("semctl error"); return -1; } return semid; } void sem_lock(int sem_id) { struct sembuf sem_b; sem_b.sem_num = 0; sem_b.sem_op = -1;//P() sem_b.sem_flg = SEM_UNDO; if (semop(sem_id, &sem_b, 1) == -1) { perror("sem_lock error"); } } void sem_unlock(int sem_id) { struct sembuf sem_b; sem_b.sem_num = 0; sem_b.sem_op = 1;//V() sem_b.sem_flg = SEM_UNDO; if (semop(sem_id, &sem_b, 1) == -1) { perror("sem_unlock error"); } } int bindSocket(char* ip, short port) { int nfd = socket(AF_INET, SOCK_STREAM, 0); if (nfd < 0) { perror("socket error "); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip ? inet_addr(ip) : INADDR_ANY; addr.sin_port = htons(port); if (bind(nfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind error "); return -1; } listen(nfd, 1024); int flags; if ((flags = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("fcntl error "); return -1; } return nfd; } void acceptSocket(int nfd) { struct sockaddr_in addr2; socklen_t addrlen = 0; memset(&addr2, 0, sizeof(addr2)); int fd = accept(nfd, (struct sockaddr *) &addr2, &addrlen); if (fd < 0) { perror("accept error "); return; } write(fd, "hello\n", sizeof("hello") ); printf("pid=%d, receive request from %s:%d\n", getpid(), inet_ntoa(addr2.sin_addr), addr2.sin_port); close(fd); } void pollSocket(int nfd, int semid) { const int MAX_EPOLL_EVENTS = 128; int epfd = epoll_create(MAX_EPOLL_EVENTS); if (epfd < 0) { perror("epoll_create error"); return; } struct epoll_event event; memset(&event, 0, sizeof(event)); event.events = EPOLLET | EPOLLIN; event.data.fd = nfd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, nfd, &event) == -1) { perror("epoll_ctl error"); return; } while (true) { struct epoll_event events[MAX_EPOLL_EVENTS]; memset(events, 0, sizeof(events)); sem_lock(semid); int events_cnt = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000); sem_unlock(semid); if (events_cnt == 0) { //printf("epoll_wait timeout\n"); } else if (events_cnt < 0) { perror("epoll_wait error"); } else { for (int i = 0; i < events_cnt; i++) { if (events[i].events & EPOLLIN) { acceptSocket(events[i].data.fd); } else if (events[i].events & EPOLLERR) { printf("epoll_wait EPOLLERR\n"); } } } } } int main() { int nfd = bindSocket("127.0.0.1", 7801); int semid = sem_init(); for (int n = 0; n < 5; n++) { int pid = fork(); if (pid == 0) { // child printf("child pid=%d\n", getpid()); pollSocket(nfd, semid); return 0; } else if (pid < 0) { perror("fork error"); } } int res = 0; wait(&res); return 0; }
除了加鎖的解決方法外,還有其他2個辦法:
- 利用reuseport機制(需要3.9以后版本),但這需要在每個子進程去創建監聽端口(而不是繼承父進程的),這樣就可以保證每個子進程的套接字都是獨立的,它們都有自己的accept隊列,由內核來做負載均衡;
- liunx 4.5內核在epoll已經新增了EPOLL_EXCLUSIVE選項,在多個進程同時監聽同一個socket,只有一個被喚醒。