在使用epoll的時候,我們上篇文章epoll的陷阱大體介紹了epoll中會有哪些問題。這篇文章我們就針對必須要了解,也是繞不過去的陷阱進行實驗,看看現象是什么,並且如何編寫才能達到我們想要的效果。
https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/
https://juejin.cn/post/6844904122018168845
系統環境
Linux version 4.19.0-6-amd64 (debian-kernel@lists.debian.org) (gcc version 8.3.0 (Debian 8.3.0-6)) #1 SMP Debian 4.19.67-2+deb10u1 (2019-09-20)
監聽套接字的驚群現象
問題描述
創建一個epoll實例,多個線程調用epoll_wait監聽,一個客戶端請求連接,所有調用epoll_wait的線程都會返回。因為我們的listen socket是非阻塞的,所以只有一個線程的accept返回成功,其他的線程返回EAGAIN。
性能影響
https://www.ichenfu.com/2017/05/03/proxy-epoll-thundering-herd/
我們知道系統中影響性能最大的就是上下文切換,IO操作和輪詢。accept的驚群現象至少滿足了兩種:
-
頻繁的喚起線程,上下文切換
-
頻繁的操作socket,IO操作
重現問題
#ifndef EPOLL1_H
#define EPOLL1_H
#include <pthread.h>
class epoll1;
struct THREADDATA
{
epoll1* pepoll;
pthread_t threadid;
int num;
THREADDATA()
{
pepoll = NULL;
threadid = 0;
num = 0;
}
};
class epoll1
{
public:
epoll1();
~epoll1();
void start();
void setnonblocking(int fd);
static void* start_routine(void * parg);
int m_epollfd;
int m_listenfd;
THREADDATA m_threaddata[5];
};
#endif // EPOLL1_H
#include "epoll1.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAX_EVENTS 100
#define MAX_BUF 1024
using namespace std;
epoll1::epoll1()
{
m_epollfd = -1;
m_listenfd = -1;
for(auto& iter : m_threaddata)
{
iter.pepoll = this;
}
}
epoll1::~epoll1()
{
close(m_listenfd);
close(m_epollfd);
}
void epoll1::start()
{
struct sockaddr_in servaddr;
struct epoll_event ev;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9000);
if((m_listenfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) == -1)
{
cout << "create listen socket err" << endl;
exit(EXIT_FAILURE);
}
if(bind(m_listenfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)) == -1)
{
cout << "bind listen socket err" << endl;
cout << strerror(errno) << endl;
exit(EXIT_FAILURE);
}
if(listen(m_listenfd, 5) == -1)
{
cout << "listen listen socket err" << endl;
exit(EXIT_FAILURE);
}
m_epollfd = epoll_create1(0);
ev.events = EPOLLIN;
ev.data.fd = m_listenfd;
if(epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
exit(EXIT_FAILURE);
}
for(int i = 0; i < 5; i++)
{
m_threaddata[i].num = i;
}
pthread_create(&(m_threaddata[0].threadid), NULL, start_routine, (void*)(&m_threaddata[0]));
pthread_create(&(m_threaddata[1].threadid), NULL, start_routine, (void*)(&m_threaddata[1]));
pthread_create(&(m_threaddata[2].threadid), NULL, start_routine, (void*)(&m_threaddata[2]));
pthread_create(&(m_threaddata[3].threadid), NULL, start_routine, (void*)(&m_threaddata[3]));
pthread_create(&(m_threaddata[4].threadid), NULL, start_routine, (void*)(&m_threaddata[4]));
for(auto& iter : m_threaddata)
{
pthread_join(iter.threadid, NULL);
}
}
void epoll1::setnonblocking(int fd)
{
fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK));
}
void* epoll1::start_routine(void *parg)
{
epoll1 * pepoll = ((THREADDATA*)parg)->pepoll;
struct epoll_event events[MAX_EVENTS];
struct sockaddr_in cliaddr;
char buf[MAX_BUF] = {0};
struct epoll_event ev;
socklen_t len = sizeof(struct sockaddr_in);
while(true)
{
int nfds = epoll_wait(pepoll->m_epollfd, events, MAX_EVENTS, -1);
if(nfds == -1)
{
break;
}
cout << "thread id: " << ((THREADDATA*)parg)->num << " wake up" << endl;
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd == pepoll->m_listenfd)
{
int connfd = accept(pepoll->m_listenfd, (struct sockaddr *)&cliaddr, &len);
if(connfd == -1)
{
if(errno == EAGAIN)
{
cout << "thread id: " << ((THREADDATA*)parg)->num << " err eagain" << endl;
continue;
}
else
{
cout << "thread id: " << ((THREADDATA*)parg)->num << " err return" << endl;
break;
}
}
pepoll->setnonblocking(connfd);
cout << "thread id: " << ((THREADDATA*)parg)->num << " accept" << endl;
ev.events = EPOLLIN;
ev.data.fd = connfd;
if( epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0 )
{
break;
}
}
else
{
int npro = read(events[i].data.fd, buf, sizeof(buf));
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
else
{
cout << buf << endl;
npro = write(events[i].data.fd, buf, npro);
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
}
}
}
}
return NULL;
}
當有一個客戶端請求連接時,輸出日志如下
thread id: thread id: thread id: 4 wake up0 wake up3 wake up
thread id:
thread id: 1 wake up
thread id: 0 accept
2 wake up
thread id: 3 err eagain
thread id: thread id: 4 err eagain
2 err eagain
thread id: 1 err eagain
因為是多線程,終端輸出的時候不能保證順序,所以出現了亂序,不過從日志中也能看出,5個空閑的線程都喚醒了,但是呢,只有0序號的線程成功接收了請求,其他的線程都報了錯。
解決方案一
EPOLLEXCLUSIVE (since Linux 4.5)
https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
https://github.com/torvalds/linux/commit/df0108c5da561c66c333bb46bfe3c1fc65905898
為epoll文件描述符設置獨占喚醒模式也會被附加到目標文件描述符fd上。當需要喚醒事件的時候,如果有多個epoll文件描述符被附加到同一個目標文件,使用的是EPOLLEXCLUSIVE標識,一個或者更多epoll文件描述符會通過epoll_wait收到事件。默認情況下(沒有設置EPOLLEXCLUSIVE),所有的epoll文件描述符都會收到事件。EPOLLEXCLUSIVE在這種情形下,對於避免驚群問題非常有效。
如果同一個文件描述符是在多個epoll實例中,一些有EPOLLEXCLUSIVE標識,一些沒有,事件會喚醒所有沒有定義EPOLLEXCLUSIVE標識的實例,至少喚醒一個定義了EPOLLEXCLUSIVE標識的epoll實例。
如下的一些值可以和EPOLLEXCLUSIVE同時使用:EPOLLIN EPOLLOUT EPOLLWAKEUP EPOLLET。EPOLLHUP和EPOLLERR也可以被定義,但是沒必要:正常情況下,這些事件只會在發生的時候才會通知,不管有沒有定義在事件中。如果設定了其他的值,會觸發EINVAL的錯誤。
EPOLLEXCLUSIVE可能會用在EPOLL_CTL_ADD的操作中;如果想在EPOLL_CTL_MOD使用,會報錯。如果EPOLLEXCLUSIVE已經經過epoll_ctl()設置了,那么在后續EPOLL_CTL_MOD中操作同一個epfd和fd對也會報錯。調用epoll_ctl()為事件設定EPOLLEXCLUSIVE,然后設定這個目標文件描述符為epoll的實例也會報錯。所有這些操作的錯誤代碼都是EINVAL。
EPOLLEXCLUSIVE在調用epoll_ctl時為傳入事件設置的一個輸入標識;永遠也不會通過epoll_wait返回給用戶。
我們從官方的文檔可以看出,這個標識是用來避免,一個事件到達時,喚醒所有epoll_wait的問題。但是官方也只說,會喚起一個或者多個,也就是這個標識是減少了驚群問題的觸發概率,但是沒有完全避免。
那我們為監聽socket設置EPOLLEXCLUSIVE標識進行測試
m_epollfd = epoll_create1(0);
ev.events = EPOLLIN|EPOLLEXCLUSIVE;
ev.data.fd = m_listenfd;
if(epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
exit(EXIT_FAILURE);
}
在把監聽socket添加到epoll中的時候,增加EPOLLEXCLUSIVE標識,測試結果如下
thread id: thread id: thread id: 4 wake up1 wake up0 wake up
thread id: thread id: 0 err eagain
thread id: 3 wake up
4 accept
thread id: 3 err eagain
thread id: 2 wake up
thread id: 2 err eagain
thread id: 1 err eagain
非常奇怪,按照官方文檔和github上的這個flag的patch來講,應該是只有一個線程喚醒,或者會有多個喚醒,但是不管如何測試,結果都是5個線程全部喚醒。針對這個問題,我也沒找到對應的解釋和解決方法,也沒有查找到在LT模式下EPOLLEXCLUSIVE的用法。目前得到的結論就是
-
這個標識肯定是有作用的。因為源碼是公開的,並且官方文檔中也寫明了。
-
導致所有線程都被喚醒的原因可能是因為LT模式。LT模式的特點是,如果epoll中監聽的socket有事件,那么不管什么時候調用,epoll_wait都會返回。我們設置了EPOLLEXCLUSIVE標識,可以保證所有的epoll_wait同時調用的時候只有一個返回,但是這個線程返回之后,在調用accept把監聽事件從epoll中取出來之前,別的線程的epoll_wait也在阻塞監聽,就相當於這個時候又有epoll_wait調用,基於LT的特點,所以又返回了。
最后的結果就是,LT模式下的EPOLLEXCLUSIVE並不能解決監聽套接字的驚群問題。
感謝 @笨拙的菜鳥 提供的信息,新的結論請參考文章最后
進一步驗證
如果說EPOLLEXCLUSIVE對於監聽套接字的驚群問題沒有任何作用,根據官方所說,驚群的現象是所有等待線程都會被喚醒,那我們把線程增加到50個,再看看驚群的現象。50個線程去掉EPOLLEXCLUSIVE標識,測試結果如下
thread id: 49 wake up
thread id: 47 wake up
thread id: 42 wake up
thread id: 48 wake up
thread id: 42 err eagain
thread id: thread id: 49 accept
thread id: thread id: 4446 wake up
wake up
thread id: 46 err eagain
thread id: thread id: 47 err eagain
4845 wake up
err eagainthread id: 45 err eagain
thread id:
43 wake up
thread id: 44 err eagain
thread id: 43 err eagain
奇怪的問題又來了,並沒有喚起所有線程,而是喚起了部分線程。根據官方文檔的描述和理解(https://man7.org/linux/man-pages/man7/epoll.7.html),導致這個的原因有可能是,epoll在遍歷的時候,一個個的喚起線程,但是因為每個線程互不影響,導致某一個系統時鍾切片,有一個線程運行到了accept並且成功了,這個時候把事件從epoll中讀取出來,所以切換回epoll繼續遍歷喚醒的時候,發現沒有事件了,也就不會喚起其他線程了。
具體的證明還需要后續查看epoll的源碼。
解決方案二
如果新的標識不能解決我們的問題,那么我們就使用另一個
EPOLLONESHOT (since Linux 2.6.2)
在相關的文件描述符上請求只有一次的通知。也就是,當一個事件在epoll_wait等待的文件描述符上觸發時,這個文件描述符就會從通知列表中禁用,不會再通知到其他的epoll實例。用戶必須調用epoll_ctl()和EPOLL_CTL_MOD把這個文件描述符用新的事件標識重新啟用。
這個標識是event.events的輸入標識,在調用epoll_ctl()的時候使用;不可能在epoll_wait返回。
使用EPOLLONESHOT的話,我們只需要改兩個地方:一個是第一次把監聽套接字添加到epoll;另一個是在epoll_wait返回的時候,需要通過EPOLL_CTL_MOD再次重置。
m_epollfd = epoll_create1(0);
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = m_listenfd;
if(epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
exit(EXIT_FAILURE);
}
while(true)
{
int nfds = epoll_wait(pepoll->m_epollfd, events, MAX_EVENTS, -1);
if(nfds == -1)
{
break;
}
cout << "thread id: " << ((THREADDATA*)parg)->num << " wake up " << nfds << endl;
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd == pepoll->m_listenfd)
{
int connfd = accept(pepoll->m_listenfd, (struct sockaddr *)&cliaddr, &len);
if(connfd == -1)
{
if(errno == EAGAIN)
{
cout << "thread id: " << ((THREADDATA*)parg)->num << " err eagain" << endl;
continue;
}
else
{
cout << "thread id: " << ((THREADDATA*)parg)->num << " err return" << endl;
break;
}
}
pepoll->setnonblocking(connfd);
cout << "thread id: " << ((THREADDATA*)parg)->num << " accept" << endl;
ev.events = EPOLLIN;
ev.data.fd = connfd;
if(epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
break;
}
//在這里重新把監聽socket添加到epoll
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = pepoll->m_listenfd;
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_MOD, pepoll->m_listenfd, &ev);
}
else
{
...
}
這種方法是否可行呢,我們測試一下
thread id: 49 wake up 1
thread id: 49 accept
thread id: 49 wake up 1
thread id: 49 accept
thread id: 49 wake up 1
thread id: 49 accept
thread id: 49 wake up 1
thread id: 49 accept
從輸出的日志可以看出,同時來了4個鏈接,每一個鏈接都是單獨喚醒一個線程,並且沒有驚群的問題。算是解決了。
注意
上面代碼可能會有一個問題,就是epoll_wait返回的時候,有多個監聽的事件,那么應該在所有監聽事件都處理完成后再激活監聽socket。不然有可能在處理完第一個監聽事件的時候,激活了套接字,在處理第二個監聽事件之前,另一個線程的epoll_wait也返回了,就會出現多個線程同時accept同一個鏈接,浪費一次調用。
雖然多次測試發現都是只有一個監聽事件返回,建議還是修改一下,做一個標識,如果有監聽事件,那么在這些事件都處理完成后,再次把監聽socket激活。但是這樣做呢,又有另一個問題,就是如果這次返回的事件中,有一個處理起來非常慢,就會導致監聽遲遲無法響應。
進一步完善的話,就是,先處理epoll_wait返回的所有監聽事件,如果有,把監聽套接字重新激活,然后再處理剩下的事件。
可能的影響
從日志中可以看出,這種方式不能很好的利用多線程/多進程/CPU多內核的優勢。因為當有一個連接請求進來時,epoll_wait就會返回,並且把監聽事件禁止掉,其他進來的連接只能等當前的處理完再次激活監聽事件后才能處理,也就是強制變成了串行的。我們理想的模型是不管進來幾個連接,都會對每個連接喚起一個空閑的線程進行處理;不管是一次處理一個請求還是多個請求,都不會出現一個請求多個線程喚起最后只有一個成功(驚群問題)或者多個線程同時處理一個請求(比如接收數據的亂序問題,下面會介紹)。很顯然epoll並不能很容易的滿足我們的要求。
解決方案三
上面的解決方案是一個epoll實例,然后多個線程同時監聽,也可以寫成每個線程/進程創建一個自己的epoll實例,監聽同一個端口。
SO_REUSEPORT (since Linux 3.9)
允許多個AF_INET或者AF_INET6套接字綁定到一個制定的套接字地址上。必須為每一個套接字設置這個屬性,包括第一個套接字,在調用bind前設置這個屬性。為了防止端口劫持,所有的綁定到同一個地址的進程必須有着同一個有效的UID。這個選項對與TCP和UDP都有效。
對於TCP套接字,這個選項允許accept分布式的加載在多線程服務器,對於每一個線程用不同的監聽套接字,用於改善性能。對比傳統的技術這種方法提供了改進的負載分配,比如使用單獨的一個accept線程,或者有多個線程對於同一個socket競爭的accept。
對於UDP套接字,這個選項對比傳統技術多個進程競爭的獲取同一個套接字上的數據包可以在多進程(或者多線程)提供更好的負載對於將要到達的數據包。
這個方案是針對每一個線程/進程,都開啟一個獨立的監聽套接字,每一個線程/進程都創建一個單獨的epoll實例。這樣的話,每個線程/進程內部,相當於是單線程的,每個線程/進程內部的監聽套接字是唯一的,不會有其他的線程/進程在監聽當前線程的套接字,所以就可以避免多個線程喚起了。
Address already in use
當我們啟用SO_REUSEPORT的時候,開啟多個線程監聽,結果提示Address already in use。這是因為SO_REUSEPORT是解決多個進程綁定同一個端口的問題,而我們的是多線程,屬於同一個線程綁定同一個端口,那么還需要增加一個參數SO_REUSEADDR
SO_REUSEADDR
在調用bind前使用的指定地址可以在本地重復利用。對於AF_INET套接字,意思就是一個套接字可以綁定,就算已經有一個激活的監聽套接字已經綁定到這個地址上了。如果一個監聽套接字已經通過INADDR_ANY標識綁定到了一個指定的端口上,那么就不能把這個端口綁定到本地其他地址上了。參數是整數的布爾類型的標識。
SO_REUSEADDR和SO_REUSEPORT的作用分別是指定地址和端口是否允許多次綁定。
代碼
#include "epoll1.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <arpa/inet.h>
#define MAX_EVENTS 100
#define MAX_BUF 1024
using namespace std;
epoll1::epoll1()
{
for(auto& iter : m_threaddata)
{
iter.pepoll = this;
}
}
epoll1::~epoll1()
{
}
void epoll1::start()
{
int i = 0;
for(auto& iter : m_threaddata)
{
iter.num = i;
pthread_create(&(iter.threadid), NULL, start_routine, (void*)(&iter));
i++;
}
for(auto& iter : m_threaddata)
{
pthread_join(iter.threadid, NULL);
}
}
void epoll1::setnonblocking(int fd)
{
fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK));
}
void* epoll1::start_routine(void *parg)
{
int epollfd = epoll_create1(0);
bool bcontinue = true;
//cout << "enter thread " << ((THREADDATA1*)parg)->num << endl;
epoll1 * pepoll = ((THREADDATA1*)parg)->pepoll;
struct epoll_event events[MAX_EVENTS];
struct sockaddr_in cliaddr;
char buf[MAX_BUF] = {0};
struct epoll_event ev;
socklen_t len = sizeof(struct sockaddr_in);
int listenfd = 0;
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9000);
if((listenfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) == -1)
{
cout << "create listen socket err" << endl;
bcontinue = false;
}
int val = 1;
if(bcontinue && setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (val)) == -1)
{
cout << "set listen socket reuseport err" << endl;
bcontinue = false;
}
if(bcontinue && setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof (val)) == -1)
{
cout << "set listen socket reuseport err" << endl;
bcontinue = false;
}
if(bcontinue && bind(listenfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)) == -1)
{
cout << "bind listen socket err" << endl;
cout << strerror(errno) << endl;
bcontinue = false;
}
if(bcontinue && listen(listenfd, 5) == -1)
{
cout << "listen listen socket err" << endl;
bcontinue = false;
}
ev.events = EPOLLIN;
ev.data.fd = listenfd;
if(bcontinue && epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
bcontinue = false;
}
while(bcontinue)
{
int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if(nfds == -1)
{
break;
}
cout << "thread id: " << ((THREADDATA1*)parg)->num << " wake up " << nfds << endl;
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd == listenfd)
{
int connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &len);
if(connfd == -1)
{
if(errno == EAGAIN)
{
cout << "thread id: " << ((THREADDATA1*)parg)->num << " err eagain" << endl;
continue;
}
else
{
cout << "thread id: " << ((THREADDATA1*)parg)->num << " err return" << endl;
break;
}
}
pepoll->setnonblocking(connfd);
cout << "thread id: " << ((THREADDATA1*)parg)->num << " accept" << endl;
ev.events = EPOLLIN;
ev.data.fd = connfd;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
break;
}
}
else
{
int npro = read(events[i].data.fd, buf, sizeof(buf));
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
else
{
cout << buf << endl;
npro = write(events[i].data.fd, buf, npro);
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
}
}
}
}
cout << "quit thread " << ((THREADDATA1*)parg)->num << endl;
close(listenfd);
close(epollfd);
return NULL;
}
我們把創建監聽socket放到了線程內,並且在bind之前,先調用setsockopt設置了SO_REUSEADDR,再設置了SO_REUSEPORT。記住,這個順序不能顛倒,不然還是會提示Address already in use。我們同時連接4個請求,輸出日志如下
thread id: 13 wake up 1
thread id: 13 accept
thread id: 0 wake up 1
thread id: 0 accept
thread id: 26 wake up 1
thread id: 15 wake up 1
thread id: 26 accept
thread id: 15 accept
從日志可以看出,解決了監聽socket的驚群問題。方法三比方法二的優勢是,多個監聽套接字,如果其中一個監聽套接字出現問題,不影響程序運行。這里需要注意的是需要為每一個線程創建一個epoll實例,在這個線程中創建監聽套接字,並且綁定,在當前線程監聽當前的epoll實例。
為什么不能創建一個epoll實例,然后多個線程使用多個監聽套接字呢?是因為,相當於前面的一個epoll實例,一個監聽套接字,多個線程監聽的情形,只不過多添加了幾個監聽套接字。epoll_wait返回的時候,並不會因為你是在A線程通過epoll_ctl添加的監聽套接字,就返回給A線程。還會出現驚群問題,相當於我們什么都沒做,僅僅多綁定了幾個監聽套接字,變得更亂。所以必須每個線程一個epoll實例,一個監聽套接字。
可能的影響
這種方式是可以了,不過因為創建了多個epoll實例,一個連接進來時,需要把它綁定到對應的epoll實例上,這里必須要把這個關系對應好。如果把一個套接字綁定到多個epoll實例上,當有事件來臨時,與上面的問題一樣,可能喚起多個線程處理。
解決方案四
與方案三類似,只不過是創建進程。這種方案適合每個連接關聯比較弱的情形,不然的話,就需要進程間的通信,實現起來更復雜。目前我沒有使用這種方式,就沒有做更詳細的測試。
解決方案五
使用ET模式。邊緣觸發模式,在狀態觸發的時候通知。比如當前監聽套接字沒有事件,這時來了一個連接,那么epoll就會通知一次,只有一個epoll_wait返回。不管我們沒有處理,對於當前連接的事件,epoll都不會再次通知。如果包含沒有處理的連接,那么就算后續再次來新的連接,epoll也不會通知。
使用ET模式也有需要處理的問題:
無用的喚醒
雖然ET模式基本上可以避免驚群問題了,但是還是會有這種情況:
-
線程A處理監聽套接字
-
處理完最后一個連接,監聽套接字的事件列表為空
-
因為A不知道已經是最后一個連接了,所以需要再次調用accept
-
這時來到一個新連接,因為原來的監聽套接字的事件列表是空的,所以epoll通知事件
-
因為線程A的epoll_wait返回了,並且沒有再次調用,所以epoll喚起線程B處理
-
這就出現了線程A和線程B同時accept同一個連接的情況,其中一個會返回EAGAIN
這種情形實際上比較少見,並且影響不大,只是多喚起了一個線程。
飢餓
-
有事件到達,喚起線程A
-
線程A的處理速度比后續事件到達的速度慢
-
線程A不斷的處理事件
-
其他線程沒有事情做
這個也可以叫做“撐死”,就是一種事件觸發,比如監聽套接字有新的連接到達,那么喚起線程A處理。但是由於並發太大,監聽套接字的事件列表一直有內容,線程A需要不斷的處理事件,epoll也不會再次通知其他線程。
這個問題處理起來就比較麻煩了,因為epoll不會再次通知其他線程處理,只能這個線程接收,然后分發。也不能說,這個線程設定一個處理上限,到達了就不處理了,因為其他epoll_wait不會返回,就導致有事件,但是沒有線程知道的問題。
還有一種方案就是記錄一下當前事件的狀態,比如正在接收監聽套接字的事件,那么喚起一個線程一起接收。
不管如何操作,都不是我們的初衷,需要我們花費大量心思在各種細節上來實現負載均衡的處理網絡消息。
總結
根據上面的內容,建議還是使用方案三:多個線程,多個epoll實例,多個監聽套接字。這種實現即簡單,也能負載均衡。
接收數據亂序
問題描述
-
套接字A有數據到達
-
喚起線程T1處理
-
在T1接收數據過程中,套接字A有新數據到達或者T1第一次接收沒有把數據讀完
-
喚起線程T2處理
這個問題非常嚴重,並且難以解決。如果是accept,那么就算多個線程調用,頂多是返回EAGAIN的錯誤,或者各自接收到一個連接請求,因為accept系統內部處理了,不會出現同時返回成功同一個連接請求。但是多個線程調用recv本來就是合理的,系統也不會做額外的限制。如果我們在這個地方沒有做處理(比如通過鎖把數據按照接收的順序放入同一個緩沖區,或者強制限制只能一個線程接收數據等),那么就會出現亂序。對方發送的是ABCDE,我們接收的是ACDBE,這肯定是不行的。
問題重現
#include "epoll2.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAX_EVENTS 100
#define MAX_BUF 1
using namespace std;
epoll2::epoll2()
{
m_epollfd = -1;
m_listenfd = -1;
for(auto& iter : m_threaddata)
{
iter.pepoll = this;
}
}
epoll2::~epoll2()
{
close(m_listenfd);
close(m_epollfd);
}
void epoll2::start()
{
struct sockaddr_in servaddr;
struct epoll_event ev;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9000);
if((m_listenfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) == -1)
{
cout << "create listen socket err" << endl;
exit(EXIT_FAILURE);
}
int val = 1;
if(setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof (val)) == -1)
{
cout << "set listen socket reuseport err" << endl;
}
if(bind(m_listenfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)) == -1)
{
cout << "bind listen socket err" << endl;
cout << strerror(errno) << endl;
exit(EXIT_FAILURE);
}
if(listen(m_listenfd, 5) == -1)
{
cout << "listen listen socket err" << endl;
exit(EXIT_FAILURE);
}
m_epollfd = epoll_create1(0);
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = m_listenfd;
if(epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
exit(EXIT_FAILURE);
}
int i = 0;
for(auto& iter : m_threaddata)
{
iter.num = i;
pthread_create(&(iter.threadid), NULL, start_routine, (void*)(&iter));
i++;
}
for(auto& iter : m_threaddata)
{
pthread_join(iter.threadid, NULL);
}
}
void epoll2::setnonblocking(int fd)
{
fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK));
}
void* epoll2::start_routine(void *parg)
{
epoll2 * pepoll = ((THREADDATA2*)parg)->pepoll;
struct epoll_event events[MAX_EVENTS];
struct sockaddr_in cliaddr;
char buf[MAX_BUF] = {0};
struct epoll_event ev;
socklen_t len = sizeof(struct sockaddr_in);
while(true)
{
int nfds = epoll_wait(pepoll->m_epollfd, events, MAX_EVENTS, -1);
if(nfds == -1)
{
break;
}
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd == pepoll->m_listenfd)
{
cout << "enter thread " << ((THREADDATA2*)parg)->num << " listen" << endl;
int connfd = accept(pepoll->m_listenfd, (struct sockaddr *)&cliaddr, &len);
if(connfd == -1)
{
if(errno == EAGAIN)
{
cout << "thread id: " << ((THREADDATA2*)parg)->num << " err eagain" << endl;
continue;
}
else
{
cout << "thread id: " << ((THREADDATA2*)parg)->num << " err return" << endl;
break;
}
}
pepoll->setnonblocking(connfd);
cout << "thread id: " << ((THREADDATA2*)parg)->num << " accept" << endl;
ev.events = EPOLLIN;
ev.data.fd = connfd;
if(epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
break;
}
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = pepoll->m_listenfd;
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_MOD, pepoll->m_listenfd, &ev);
}
else
{
cout << "thread id: " << ((THREADDATA2*)parg)->num << " wake up " << nfds << endl;
int npro = read(events[i].data.fd, buf, sizeof(buf));
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
else
{
cout << buf << endl;
npro = write(events[i].data.fd, buf, npro);
if(npro == -1)
{
close(events[i].data.fd );
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
}
}
}
}
cout << "quit thread " << ((THREADDATA2*)parg)->num << endl;
return NULL;
}
我們接收到一個連接,然后設置緩沖區為1,發送一個“hello world”。輸出如下
enter thread 49 listen
thread id: 49 accept
thread id: 49 wake up 1
h
thread id: 47 wake up 1
thread id: thread id: 49 wake up 1
e
48 wake up 1
thread id: l
thread id: 46 wake up 149 wake up 1
thread id: 44 wake up 1
thread id: o
thread id: 48 wake up 1
thread id: lthread id: 43 wake up 1
thread id: 37 wake up 1
thread id: 35 wake up 1
45s
wake up thread id: 41 wake up 1
thread id: 48 wake up 1
thread id: 1
rthread id: 39 wake up 1
vthread id:
e
thread id: 31 wake up 1
thread id: 35 wake up 1
thread id: 24 wake up 1
thread id:
thread id:
thread id: 31 wake up 1
thread id: 23thread id: 21 wake up 1
thread id: 1922 wake up wake up 1
wake up 1
thread id: 36 wake up 1
126
thread id: 32 wake up 1
wake up 1
thread id: 34 wake up 1
thread id: 40 wake up 1
3833 wake up 1
thread id: r42
wake up �1
e
wake up 1
44 wake up 1
我們看到多個線程被喚起,分別接收了部分數據。這種情況下我們處理起來很麻煩。
理論上接收數據與接收監聽是同樣的事件,對於epoll都是EPOLLIN。所以解決亂序的問題與解決監聽套接字驚群的問題思路基本一樣。
解決方案一 EPOLLEXCLUSIVE
我們對於新的連接設置EPOLLEXCLUSIVE標識,修改代碼如下
ev.events = EPOLLIN|EPOLLEXCLUSIVE;
ev.data.fd = connfd;
if(epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
break;
}
不出我們所料,這個標識並不能解決我們的問題,輸出結果如下
he
el
r
�
ls
r
v
e
o
解決方案二 EPOLLONESHOT
//接收完連接,增加到epoll中的時候設置EPOLLONESHOT
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = connfd;
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_ADD, connfd, &ev);
//接收完數據,把事件重新激活
ev.events = EPOLLIN|EPOLLONESHOT;
ev.data.fd = events[i].data.fd;
epoll_ctl(pepoll->m_epollfd, EPOLL_CTL_MOD, events[i].data.fd, &ev);
發送兩條消息,看一下輸出日志
thread id: 49 wake up 1
hello server
�
thread id: 49 wake up 1
hello server
�
可以解決這個問題。收發數據本來也不需要多線程同時操作。如果多線程接收數據需要額外的鎖,效率可能並不會太高。所以這種方案是可行的。
解決方案三
多個線程,多個epoll實例,多個監聽套接字。當一個連接進來,會被一個監聽套接字捕獲,然后就會加入到當前線程的epoll實例中監聽,不會有額外的線程監聽當前的epoll實例,相當於單線程,所以是可以解決這個問題的。
代碼與上面的基本一致,只不過改了接收的部內容。
#include "epoll1.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <arpa/inet.h>
#define MAX_EVENTS 100
#define MAX_BUF 1024
using namespace std;
epoll1::epoll1()
{
for(auto& iter : m_threaddata)
{
iter.pepoll = this;
}
}
epoll1::~epoll1()
{
}
void epoll1::start()
{
int i = 0;
for(auto& iter : m_threaddata)
{
iter.num = i;
pthread_create(&(iter.threadid), NULL, start_routine, (void*)(&iter));
i++;
}
for(auto& iter : m_threaddata)
{
pthread_join(iter.threadid, NULL);
}
}
void epoll1::setnonblocking(int fd)
{
fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK));
}
void* epoll1::start_routine(void *parg)
{
int epollfd = epoll_create1(0);
bool bcontinue = true;
//cout << "enter thread " << ((THREADDATA1*)parg)->num << endl;
epoll1 * pepoll = ((THREADDATA1*)parg)->pepoll;
struct epoll_event events[MAX_EVENTS];
struct sockaddr_in cliaddr;
char buf[MAX_BUF] = {0};
struct epoll_event ev;
socklen_t len = sizeof(struct sockaddr_in);
int listenfd = 0;
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9000);
if((listenfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) == -1)
{
cout << "create listen socket err" << endl;
bcontinue = false;
}
int val = 1;
if(bcontinue && setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (val)) == -1)
{
cout << "set listen socket reuseport err" << endl;
bcontinue = false;
}
if(bcontinue && setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof (val)) == -1)
{
cout << "set listen socket reuseport err" << endl;
bcontinue = false;
}
if(bcontinue && bind(listenfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)) == -1)
{
cout << "bind listen socket err" << endl;
cout << strerror(errno) << endl;
bcontinue = false;
}
if(bcontinue && listen(listenfd, 5) == -1)
{
cout << "listen listen socket err" << endl;
bcontinue = false;
}
ev.events = EPOLLIN;
ev.data.fd = listenfd;
if(bcontinue && epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
cout << "add listen socket to epoll err" << endl;
bcontinue = false;
}
while(bcontinue)
{
int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if(nfds == -1)
{
break;
}
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd == listenfd)
{
int connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &len);
if(connfd == -1)
{
if(errno == EAGAIN)
{
cout << "thread id: " << ((THREADDATA1*)parg)->num << " err eagain" << endl;
continue;
}
else
{
cout << "thread id: " << ((THREADDATA1*)parg)->num << " err return" << endl;
break;
}
}
pepoll->setnonblocking(connfd);
cout << "thread id: " << ((THREADDATA1*)parg)->num << " accept" << endl;
ev.events = EPOLLIN;
ev.data.fd = connfd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev);
}
else
{
cout << "thread id: " << ((THREADDATA1*)parg)->num << " wake up " << nfds << endl;
while (read(events[i].data.fd, buf, sizeof(buf)) != -1)
{
cout << buf;
}
cout << endl;
if(errno != EAGAIN)
{
close(events[i].data.fd );
epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
}
}
}
}
cout << "quit thread " << ((THREADDATA1*)parg)->num << endl;
close(listenfd);
close(epollfd);
return NULL;
}
輸出日志如下
thread id: 34 accept
thread id: 34 wake up 1
hello server
thread id: 34 wake up 1
hello server
同樣很好的解決了我們的問題
其他解決方案
與上面監聽套接字的四和五一樣,也有着同樣的影響,我們不使用這種模式,沒做驗證。
結論
-
EPOLLEXCLUSIVE在LT模式下並不能解決我們的問題,放棄
-
EPOLLONESHOT可以解決問題,但是在並發比較大的時候,不能很好的利用系統資源
-
多線程,多epoll實例。目前來看是最適合我們的方案
-
多進程,多epoll實例。更適合網站類型的,比如nginx,每個連接關聯比較弱。
-
ET模式,有着無法避免的缺陷,放棄
所以我們最終選擇多線程,多epoll實例的方案。
關於EPOLLEXCLUSIVE的用法總結
根據 @笨拙的菜鳥 提供的內容,我又做了測試,並且重新看了EPOLLEXCLUSIVE的man文檔和patch說明。從文檔和測試結果來看,EPOLLEXCLUSIVE是用於多個epoll實例監聽同一個socket對象的情況。
Introduce a new 'EPOLLEXCLUSIVE' flag that can be passed as part of the
'event' argument during an epoll_ctl() EPOLL_CTL_ADD operation. This new
flag allows for exclusive wakeups when there are multiple epfds attached
to a shared fd event source.
可以看到最后一句,這個新的flag,允許唯一的喚醒,當有多個epfds(epoll實例)被一個共享的fd(socket對象)的事件源附加的時候。
The implementation walks the list of exclusive waiters, and queues an
event to each epfd, until it finds the first waiter that has threads
blocked on it via epoll_wait(). The idea is to search for threads which
are idle and ready to process the wakeup events. Thus, we queue an event
to at least 1 epfd, but may still potentially queue an event to all epfds
that are attached to the shared fd source.
這一段內容也說明了實現方法,就是遍歷事件相關的epfd(epoll實例),直到找到第一個等待者,有一個線程阻塞在epoll_wait()。根據我的理解,系統遍歷與這個事件相關的所有epoll實例,在每個epoll實例中再查找是否有調用了epoll_wait()並且空閑的線程,如果有,就喚起這個。這樣的話,就能理解EPOLLEXCLUSIVE的作用場景了,系統是按照每一個epoll實例遍歷,我們原來一個epoll實例,多個線程調用epoll_wait監聽,這種情況是正好把系統屏蔽的功能給抵消掉了。
從測試結果和適用方案來看,還是建議每個線程一套單獨的epoll實例和監聽socket,因為多創建幾個監聽套接字也沒太大的影響,並且EPOLLEXCLUSIVE並沒有完全避免一個連接到達,多個實例喚醒,還有就是在並發沒有達到比較高的負荷的時候,也就是並發比創建的工作線程少,或者說少於一半的時候,負載並不是很好。