top - 03:17:46 up 13 days, 10 min, 4 users, load average: 1.09, 0.83, 0.37
Tasks: 164 total, 2 running, 162 sleeping, 0 stopped, 0 zombie
Cpu0 : 31.2%us, 68.8%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 0.0%us, 0.3%sy, 0.0%ni, 99.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 1091948k used, 1865588k free, 233880k buffers
Swap: 3096568k total, 24080k used, 3072488k free, 266228k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6709 root 20 0 150m 113m 784 R 100.0 3.9 15:16.63 mosquitto
no route to host;
telnet: Unable to connect to remote host: No route to host
This seemed strange. I suspected something was wrong with the VM setup, so I flushed the firewall rules inside the VM, and the connection then worked:
[zhoulei@localhost ~]$ sudo iptables -F
mosquitto failed to start
netstat -an|grep 1883 showed no listening record. The mosquitto.db file had grown very large (445 MB); after deleting that file,
mosquitto started normally.
Results, 2014/05/07
------------------------------------------------------------------------------------------------
[subscriber]
Open 6000 active subscriber connections with the following command:
java -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883
The kswapd0 process kept bouncing to the top of the process list (the system was swapping heavily):
top - 13:16:13 up 31 min, 4 users, load average: 617.83, 187.04, 65.29
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.9%us, 58.7%sy, 0.0%ni, 0.0%id, 34.5%wa, 0.0%hi, 3.9%si, 0.0%st
Mem: 2957536k total, 2899268k used, 58268k free, 984k buffers
Swap: 3096568k total, 805564k used, 2291004k free, 14656k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2538 root 20 0 25.2g 1.7g 4420 S 122.1 60.3 5:27.12 java
[root@wwwu ~]# free -m
total used free shared buffers cached
Mem: 2888 2810 77 0 0 15
id=1,rate=996.20,threads=6000
id=1,rate=1007.30,threads=6000
After that, the machine locked up.
[publisher]
Only 700 MB of memory was available on the publish side.
[root@oc8050533176 Local]# top
top - 17:13:51 up 3:44, 2 users, load average: 0.00, 0.00, 0.05
Tasks: 124 total, 1 running, 123 sleeping, 0 stopped, 0 zombie
Cpu(s): 14.7%us, 15.2%sy, 0.0%ni, 64.6%id, 0.0%wa, 0.0%hi, 5.4%si, 0.0%st
Mem: 2011348k total, 1736976k used, 274372k free, 23976k buffers
Swap: 8388600k total, 0k used, 8388600k free, 803732k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1946 root 20 0 3362m 591m 7292 S 66.7 30.1 3:22.69 java
At most 2500 active publish connections; throughput 3242 msgs/second.
id=5,rate=3084.80,threads=2000
id=5,rate=2324.60,threads=2000
id=5,rate=3176.30,threads=2000
id=5,rate=3091.40,threads=2000
[mosquitto]
top output after the connections were established:
[root@wwwu mosquitto]# top -b
top - 05:50:55 up 2:00, 4 users, load average: 1.00, 0.81, 0.74
Tasks: 161 total, 2 running, 159 sleeping, 0 stopped, 0 zombie
Cpu(s): 31.9%us, 7.4%sy, 0.0%ni, 54.8%id, 3.2%wa, 0.0%hi, 2.6%si, 0.0%st
Mem: 2957536k total, 2518020k used, 439516k free, 31976k buffers
Swap: 3096568k total, 0k used, 3096568k free, 209064k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 R 77.9 42.5 93:35.86 mosquitto
[root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l
8000
Problems encountered
[root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
FIN_WAIT1 298
ESTABLISHED 2372
LISTEN 12
tcp 0 21848 9.119.154.107:1883 9.119.154.160:55851(sub) FIN_WAIT1
tcp 0 37393 9.119.154.107:1883 9.119.154.160:57275 FIN_WAIT1
tcp 0 0 9.119.154.107:1883 9.119.154.160:56913 FIN_WAIT2
tcp 0 40545 9.119.154.107:1883 9.119.154.160:55864 FIN_WAIT1
netstat shows several WAIT connection states: FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT and TIME_WAIT. Their meaning comes from the TCP connection-teardown process:
Server Client
-------- FIN -------->
<------- ACK ---------
<------- FIN ---------
-------- ACK -------->
Assume the server closes the connection actively (active close).
The server first sends a FIN to the client and enters the FIN_WAIT_1 state.
The client acknowledges the FIN by sending an ACK back to the server and enters the CLOSE_WAIT state.
After the server receives that ACK from the client, it enters the FIN_WAIT_2 state.
The client is now in the passive-close phase: its operating system waits for the application on top of the connection to close it. Once the application does, the client sends its own FIN to the server.
When the server receives this FIN, it replies to the client with an ACK and enters the well-known TIME_WAIT state.
TIME_WAIT exists because, even after the close, the server cannot be certain that every packet sent before the close actually arrived (packets are not guaranteed to arrive in order), so it keeps waiting for packets from the client that may still be in flight. The state lasts 2*MSL, where MSL is the maximum time a TCP segment can live on the network; typically 2*MSL = 240 seconds.
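One practical consequence of TIME_WAIT for a broker like mosquitto is that the listening port can appear busy for up to 2*MSL after a restart. Setting SO_REUSEADDR on the listening socket before bind() avoids that. A minimal sketch follows; the make_listener helper is my own illustration, not part of the test setup above:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Create a listening socket on `port` that can be rebound immediately after a
 * restart, even while connections from the previous run sit in TIME_WAIT. */
int make_listener(int port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) { perror("socket"); exit(EXIT_FAILURE); }

    int on = 1;
    /* allow bind() to succeed while stale TIME_WAIT sockets still reference the port */
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port        = htons(port);

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(EXIT_FAILURE);
    }
    if (listen(fd, 128) < 0) { perror("listen"); exit(EXIT_FAILURE); }
    return fd;
}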
-------------------------------------------------------------------------------------
10000 sub connections
mosquitto: 2 GB of memory, 32% CPU
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu0 : 3.7%us, 11.6%sy, 0.0%ni, 84.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 4.3%us, 13.6%sy, 0.0%ni, 82.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 2202836k used, 754700k free, 34916k buffers
Swap: 3096568k total, 0k used, 3096568k free, 48164k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 S 32.6 42.5 148:35.98 mosquitto
-----------------boss worker multithread model---------------------------
This is the main (boss) thread. It uses a queue (defined elsewhere) with enqueue, dequeue, empty, etc. operations. When the server accepts a connection, the boss enqueues the socket the incoming connection arrived on. The worker threads, which were dispatched at the beginning, keep checking this queue to see whether any jobs have been added; when there are jobs, they dequeue a socket and read/parse/handle the incoming HTTP request on it.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define SOCKET_ERROR -1
#define QUEUE_SIZE   64
#define BUFFER_SIZE  4096
#define NUM_THREADS  8

/* job queue of accepted sockets; init/empty/enqueue/dequeue are defined elsewhere */
int queue[QUEUE_SIZE];
int head, tail;

pthread_mutex_t mtx   = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;   /* signalled when a job is enqueued */
pthread_cond_t  cond2 = PTHREAD_COND_INITIALIZER;   /* signalled when the queue drains  */

void init(int *head, int *tail);
int  empty(int head, int tail);
void enqueue(int *queue, int *tail, int value);
int  dequeue(int *queue, int *head);
void *worker(void *threadarg);

int main(int argc, char* argv[])
{
    int hSocket, hServerSocket;
    struct sockaddr_in Address;
    int nAddressSize = sizeof(struct sockaddr_in);
    int nHostPort;
    int numThreads = NUM_THREADS;
    int i;
    pthread_t tid[NUM_THREADS];

    if(argc < 2) {
        printf("Usage: %s <port>\n", argv[0]);
        return 0;
    }
    nHostPort = atoi(argv[1]);

    init(&head, &tail);

    /* dispatch the worker threads up front */
    for(i = 0; i < numThreads; i++)
        pthread_create(&tid[i], NULL, worker, NULL);

    hServerSocket = socket(AF_INET, SOCK_STREAM, 0);
    if(hServerSocket == SOCKET_ERROR)
    {
        printf("\nCould not make a socket\n");
        return 0;
    }

    Address.sin_addr.s_addr = INADDR_ANY;
    Address.sin_port = htons(nHostPort);
    Address.sin_family = AF_INET;

    printf("\nBinding to port %d\n", nHostPort);
    if(bind(hServerSocket, (struct sockaddr*)&Address, sizeof(Address)) == SOCKET_ERROR) {
        printf("\nCould not bind to port\n");
        return 0;
    }

    getsockname(hServerSocket, (struct sockaddr *)&Address, (socklen_t *)&nAddressSize);
    printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",
           hServerSocket, ntohs(Address.sin_port));
    printf("Server\n\
        sin_family      = %d\n\
        sin_addr.s_addr = %d\n\
        sin_port        = %d\n",
        Address.sin_family,
        Address.sin_addr.s_addr,
        ntohs(Address.sin_port));

    // Up to this point is ordinary server setup; the boss/worker logic starts here.
    if(listen(hServerSocket, QUEUE_SIZE) == SOCKET_ERROR) {
        printf("\nCould not listen\n");
        return 0;
    }

    /* boss loop: accept connections and hand each socket to a worker */
    while(1) {
        printf("\nWaiting for a connection");
        hSocket = accept(hServerSocket, (struct sockaddr*)&Address, (socklen_t *)&nAddressSize);
        printf("\nGot a connection");

        pthread_mutex_lock(&mtx);
        while(!empty(head, tail))           /* wait until a worker has drained the queue */
            pthread_cond_wait(&cond2, &mtx);
        enqueue(queue, &tail, hSocket);
        pthread_mutex_unlock(&mtx);
        pthread_cond_signal(&cond);         /* wake a worker thread */
    }
}
void *worker(void *threadarg) {
    while(1)
    {
        pthread_mutex_lock(&mtx);
        while(empty(head, tail))            /* sleep until the boss enqueues a socket */
            pthread_cond_wait(&cond, &mtx);
        int hSocket = dequeue(queue, &head);
        pthread_cond_signal(&cond2);        /* tell the boss the queue has room again */
        pthread_mutex_unlock(&mtx);         /* do the socket I/O outside the critical section */

        char line[BUFFER_SIZE];
        int nRecvAmount = read(hSocket, line, sizeof line - 1);
        if(nRecvAmount > 0) {
            line[nRecvAmount] = '\0';
            printf("\nReceived %s from client\n", line);
        }

        if(close(hSocket) == SOCKET_ERROR)
            printf("\nCould not close socket\n");
    }
    return NULL;
}
1. Compared with select and poll, epoll's biggest advantage is that its efficiency does not drop as the number of monitored fds grows. The kernel implementations of select and poll scan the whole fd set on every call, so the more fds there are, the longer each call takes.
2. epoll is callback-based: when an fd has an event of interest, a callback adds it to epoll's ready queue. epoll therefore only cares about "active" fds, and its cost is independent of the total number of fds.
3. Kernel/user-space copying: select and poll copy the entire fd set between user space and the kernel on every call, whereas epoll registers each fd once with epoll_ctl and only copies back the fds that are actually ready.
4. epoll not only tells the application that an I/O event has arrived, it also returns the data the application attached when registering the fd (the epoll_data field), so the application can locate the event directly instead of scanning the whole fd set.
The difference between epoll's EPOLLLT (level-triggered, the default) and EPOLLET (edge-triggered) modes:
1. EPOLLLT: driven entirely by the kernel's epoll machinery; the application only has to handle the fds returned by epoll_wait, all of which are considered ready. In this mode epoll can be seen as a faster poll.
2. EPOLLET: the kernel only notifies the application when an fd becomes ready. Once reported, epoll stops tracking that fd (it is removed from the ready queue) until the application drives it to EAGAIN with non-blocking reads/writes; at that point epoll treats the fd as idle again and resumes watching it. Because the set of fds returned by epoll_wait keeps shrinking as they are consumed, EPOLLET has an advantage in highly concurrent systems, but it demands more of the programmer, since data can be left unread. For example:
Suppose the peer sends 2 KB of data and we read only 1 KB before calling epoll_wait again. With edge triggering the fd was already removed from the ready queue when it became ready, so epoll_wait may block indefinitely while the remaining 1 KB sits unread and the peer keeps waiting for our reply acknowledging the data. With level triggering epoll_wait still reports the fd as readable, so we can go back and read the remaining 1 KB.
#define MAX_EVENTS 10

struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd, n;
struct sockaddr_in local;
socklen_t addrlen = sizeof(local);

/* listen_sock is assumed to already be bound and listening;
 * setnonblocking() and do_use_fd() are defined elsewhere. */
epollfd = epoll_create(10);
if (epollfd == -1) {
    perror("epoll_create");
    exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;                       /* level-triggered for the listening socket */
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            /* new connection: accept it and register it edge-triggered */
            conn_sock = accept(listen_sock,
                               (struct sockaddr *) &local, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);     /* ET requires non-blocking I/O */
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            /* existing connection has data: hand it to the handler */
            do_use_fd(events[n].data.fd);
        }
    }
}
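do_use_fd() is left undefined in the snippet above. Because conn_sock is registered with EPOLLIN | EPOLLET, the handler has to keep reading until read() returns EAGAIN, otherwise the unread tail of the data can be missed as described earlier. A rough sketch of such a handler (my own, assuming the handler only needs to consume the incoming bytes):

#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* Edge-triggered handler: keep reading until the kernel reports EAGAIN,
 * so no data is left behind once epoll_wait stops reporting this fd. */
static void do_use_fd(int fd)
{
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            /* process the n bytes in buf here */
            continue;
        }
        if (n == 0) {                 /* peer closed the connection */
            close(fd);                /* closing also removes fd from the epoll set */
            return;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return;                   /* fully drained: wait for the next edge */
        if (errno == EINTR)
            continue;                 /* interrupted by a signal, retry the read */
        perror("read");
        close(fd);
        return;
    }
}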