MQTT study notes


Mosquitto monitoring idea (a monit-style process check; see the sketch below):
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"
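Fleshed out, assuming the monit daemon is what runs this check (the port test line is my addition), the config might look like:

check process mosquitto with pidfile /var/run/mosquitto.pid
    start program = "/etc/init.d/mosquitto start"
    stop program  = "/etc/init.d/mosquitto stop"
    if failed port 1883 type tcp then restart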
 

Mosquitto is an MQTT broker. So far it has only been deployed in the company's mobile-internet server environment and sees light use. Here is the installation process:


wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-1.0.3.tar.gz
cd mosquitto-1.0.3
make WITH_TLS_PSK=no  (if the build fails with "undefined reference to `SSL_CTX_set_psk_server_callback'", add WITH_TLS_PSK=no to disable TLS-PSK support)
make install prefix=/home/mosquitto
mkdir /home/mosquitto/etc
mv /etc/mosquitto/*  /home/mosquitto/etc/
strip /home/mosquitto/bin/*
strip /home/mosquitto/sbin/*
strip /home/mosquitto/lib/*


echo "/home/mosquitto/lib/" >> /etc/ld.so.conf
ldconfig -f /etc/ld.so.conf

Edit the user in /home/mosquitto/etc/mosquitto.conf:
user nobody
(the other parameters are left at their defaults for now)

Start the service (on errors, check /home/mosquitto/mosquitto.log; the default port is 1883):

/home/mosquitto/sbin/mosquitto  -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1



Terminal test:
On the client:
mosquitto_sub -h SERVERIP -t test
Then on the server:
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
The client should receive "123456".
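The same subscribe test can be driven from C. Below is a minimal sketch against libmosquitto's C API (the callback name and error handling are mine; check the headers of your installed version, since the calls shown are the 1.x-era interface):

#include <stdio.h>
#include <stdbool.h>
#include <mosquitto.h>

/* Print every message arriving on the subscribed topic. */
static void on_message(struct mosquitto *mosq, void *userdata,
                       const struct mosquitto_message *msg)
{
    printf("%s %.*s\n", msg->topic, msg->payloadlen, (char *)msg->payload);
}

int main(void)
{
    mosquitto_lib_init();

    /* NULL id lets the library generate one; true = clean session */
    struct mosquitto *mosq = mosquitto_new(NULL, true, NULL);
    mosquitto_message_callback_set(mosq, on_message);

    if (mosquitto_connect(mosq, "SERVERIP", 1883, 60) != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "connect failed\n");
        return 1;
    }
    mosquitto_subscribe(mosq, NULL, "test", 0);   /* QoS 0, like the terminal test */

    mosquitto_loop_forever(mosq, -1, 1);          /* network loop; blocks until disconnect */

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

Build with something like: gcc sub.c -lmosquitto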

Check the broker's runtime status by subscribing to the $SYS topics:
mosquitto_sub -v -t \$SYS/#
or narrow it to a single topic:
mosquitto_sub -v -t '$SYS/broker/clients/active'
Mosquitto pub/sub code analysis
Optimizations:
A few quick notes on where the broker could be improved later (see the writev sketch after this list):

Send data with writev
poll -> epoll, to support higher concurrency
Switch to a single-threaded version to cut lock overhead, which is currently very high; it could probably become a single-process design like redis, and carefully maintained that should perform well
Read as much as possible per network read, to avoid entering the system call repeatedly
Memory optimization: don't free buffers, keep them for the next use
Consider a spawn-fcgi style launcher, or starting several instances at once listening on the same port, to better exploit the machine and reach higher throughput
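On the first point, a minimal sketch of what sending with writev buys (my own illustration, not mosquitto code): the packet header and payload go out in one scatter-gather system call instead of two write() calls or an extra copy into a staging buffer.

#include <sys/uio.h>
#include <unistd.h>

/* Send a fixed header and a payload with a single syscall. */
ssize_t send_packet(int fd, const void *hdr, size_t hdrlen,
                    const void *payload, size_t paylen)
{
    struct iovec iov[2];
    iov[0].iov_base = (void *)hdr;      /* packet fixed header */
    iov[0].iov_len  = hdrlen;
    iov[1].iov_base = (void *)payload;  /* message body */
    iov[1].iov_len  = paylen;
    return writev(fd, iov, 2);          /* kernel gathers both buffers */
}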
 
 
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / ThreadStackSize = number of threads
MaxProcessMemory: the maximum memory of one process
JVMMemory: the JVM memory
ReservedOsMemory: memory reserved for the operating system
ThreadStackSize: the stack size of one thread

In Java, when you create a thread, the VM creates a Thread object inside JVM memory and, at the same time, an operating-system thread. The OS thread's stack does not come out of JVMMemory but out of what is left to the process: MaxProcessMemory - JVMMemory - ReservedOsMemory.
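A quick worked example with hypothetical numbers: on a 32-bit JVM limited to a 2GB process, with -Xmx1024m and roughly 256MB reserved for the OS and the JVM itself, a -Xss512k stack size allows (2048MB - 1024MB - 256MB) / 0.5MB ≈ 1536 threads before native memory runs out. Shrinking -Xss (or the heap) is the usual way to fit more threads.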
 
 
Making Java NIO use epoll
The NIO in JDK 6.0, and in JDK 5.0 update 9, supports epoll (Linux only). This gives a large performance boost with many concurrent idle connections, which is exactly what many network server applications need.
 
Enable it as follows:
 
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
 
  • The per-process FD limit is the maximum number of files that can be opened.
This number is generally far larger than 2048; on a machine with 1GB of RAM it is around 100,000. Check the exact value with cat /proc/sys/fs/file-max

 

Run "mosquitto -c /etc/mosquitto/mosquitto.conf -d" to start the service
 
Stress test: system parameter changes
ulimit -u 12000 (max processes)
ulimit -n 12000 (max files)
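The same limits can also be checked from inside a test program; a small sketch using getrlimit (standard POSIX, nothing mosquitto-specific):

#include <stdio.h>
#include <sys/resource.h>

int main(void)
{
    struct rlimit rl;
    getrlimit(RLIMIT_NOFILE, &rl);   /* max open files, i.e. ulimit -n */
    printf("nofile: soft=%llu hard=%llu\n",
           (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);
    getrlimit(RLIMIT_NPROC, &rl);    /* max processes, i.e. ulimit -u */
    printf("nproc:  soft=%llu hard=%llu\n",
           (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);
    return 0;
}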
 
Test the maximum number of connections
 
#!/bin/bash
# Spawn up to 18000 background subscribers to probe the broker's connection limit
c=1
while [ $c -le 18000 ]
do
    mosquitto_sub -d -t hello/world -k 900 &
    (( c++ ))
done
 
# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 
 
Ways to kill the test processes and inspect threads:
kill -9 `ps -ef|grep mosquitto|awk '{print $2}'`    # crude: the pid list also matches the grep itself
ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9
pkill -9 mosquitto
killall -9 mosquitto
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu    # per-thread CPU usage

Inspect a thread's I/O information:
/proc/15938/task/15942/fd
lsof -c mosquitto

[root@wwwu fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2  0x000000000040414b in main ()
  • Actual tests
Test case 1
 
10934 max connections with QoS 0
[root@ ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12
 
Script output:
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
 
 
Test Case 2:
This test only verifies the maximum number of connections that can be opened, so it is of limited value.
max files:
 ulimit -n 15000
 
[root@wwwu test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088
 
With 2 CPU cores, only 100% (of 200% total) is used. Nothing is being published yet, so memory usage is tiny (about 3%); there is room to push further.
vmstat 
 
[root@wwwu test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0  24080 1865596 233880 266228    1    3     2     4    2   11  0  0 99  0  0
 1  0  24080 1865588 233880 266228    0    0     0     0 1030   30 17 33 50  0  0
 
[root@wwwu test]# top
 
top - 03:17:46 up 13 days, 10 min,  4 users,  load average: 1.09, 0.83, 0.37
Tasks: 164 total,   2 running, 162 sleeping,   0 stopped,   0 zombie
Cpu0  : 31.2%us, 68.8%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  0.0%us,  0.3%sy,  0.0%ni, 99.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   2957536k total,  1091948k used,  1865588k free,   233880k buffers
Swap:  3096568k total,    24080k used,  3072488k free,   266228k cached
 
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                
 6709 root      20   0  150m 113m  784 R 100.0  3.9  15:16.63 mosquitto   
 
Problems found
 

no route to host:

telnet: Unable to connect to remote host: No route to host

This was quite surprising. Suspecting a problem with the VM installation, I flushed the firewall rules inside the VM, and that fixed it.

[zhoulei@localhost ~]$ sudo iptables -F

 

mosquitto fails to start

netstat -an|grep 1883 showed no listener. The mosquitto.db persistence file had grown to 445M; after deleting it,

the broker started normally.

 

Results from 2014/05/07

------------------------------------------------------------------------------------------------

[subscriber]

 

Open 6000 active subscriber connections with the following command:

java  -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul  MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883


The kswapd0 process kept jumping around (the box was swapping hard):


top - 13:16:13 up 31 min,  4 users,  load average: 617.83, 187.04, 65.29

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu(s):  2.9%us, 58.7%sy,  0.0%ni,  0.0%id, 34.5%wa,  0.0%hi,  3.9%si,  0.0%st

Mem:   2957536k total,  2899268k used,    58268k free,      984k buffers

Swap:  3096568k total,   805564k used,  2291004k free,    14656k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 2538 root      20   0 25.2g 1.7g 4420 S 122.1 60.3   5:27.12 java


[root@wwwu ~]# free -m

             total       used       free     shared    buffers     cached

 

Mem:          2888       2810         77          0          0         15

 

id=1,rate=996.20,threads=6000

id=1,rate=1007.30,threads=6000

After that, the machine locked up.

On the publish side there is only about 700M of usable memory:

[root@oc8050533176 Local]# top


top - 17:13:51 up  3:44,  2 users,  load average: 0.00, 0.00, 0.05

Tasks: 124 total,   1 running, 123 sleeping,   0 stopped,   0 zombie

Cpu(s): 14.7%us, 15.2%sy,  0.0%ni, 64.6%id,  0.0%wa,  0.0%hi,  5.4%si,  0.0%st

Mem:   2011348k total,  1736976k used,   274372k free,    23976k buffers

Swap:  8388600k total,        0k used,  8388600k free,   803732k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 1946 root      20   0 3362m 591m 7292 S 66.7 30.1   3:22.69 java    

At most 2500 active publish connections; throughput 3242 msgs/second:

id=5,rate=3084.80,threads=2000

id=5,rate=2324.60,threads=2000

id=5,rate=3176.30,threads=2000

id=5,rate=3091.40,threads=2000


[mosquitto]

top after the connections were established:

[root@wwwu mosquitto]# top -b

top - 05:50:55 up  2:00,  4 users,  load average: 1.00, 0.81, 0.74

Tasks: 161 total,   2 running, 159 sleeping,   0 stopped,   0 zombie

Cpu(s): 31.9%us,  7.4%sy,  0.0%ni, 54.8%id,  3.2%wa,  0.0%hi,  2.6%si,  0.0%st

Mem:   2957536k total,  2518020k used,   439516k free,    31976k buffers

Swap:  3096568k total,        0k used,  3096568k free,   209064k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 2509 root      20   0 1264m 1.2g  740 R 77.9 42.5  93:35.86 mosquitto

 

 [root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l

8000


Problems encountered

[root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 

CLOSE_WAIT 1

FIN_WAIT1 298

ESTABLISHED 2372

 

LISTEN 12


tcp        0  21848 9.119.154.107:1883          9.119.154.160:55851(sub)         FIN_WAIT1   

tcp        0  37393 9.119.154.107:1883          9.119.154.160:57275         FIN_WAIT1   

tcp        0      0 9.119.154.107:1883          9.119.154.160:56913         FIN_WAIT2   

 

tcp        0  40545 9.119.154.107:1883          9.119.154.160:55864         FIN_WAIT1   


netstat shows several WAIT states: FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT and TIME_WAIT. Their meaning comes from the TCP connection teardown sequence:


Server              Client

  -------- FIN -------->

  <------- ACK ---------

  <------- FIN ---------

  -------- ACK -------->

Assume the server closes the connection first (active close).


The server first sends a FIN to the client, and the server enters the FIN_WAIT_1 state.

The client acknowledges the FIN by sending an ACK back, and the client enters the CLOSE_WAIT state.

When the server receives that ACK from the client, it enters the FIN_WAIT_2 state.

The client is now in the passive-close phase: its operating system waits for the application above it to close the connection. Once the application closes it, the client sends its own FIN to the server.

When the server receives this FIN, it replies with an ACK and enters the famous TIME_WAIT state.

 

TIME_WAIT exists because, even after the close handshake, it is not certain that every packet sent before the close has reached the server (packets do not necessarily arrive in order). In this state the server keeps waiting for packets the client sent that have not yet arrived. The state lasts 2*MSL, where MSL (maximum segment lifetime) is the longest time a TCP segment can survive in the network; typically 2*MSL = 240 seconds.

-------------------------------------------------------------------------------------


10000 sub connections 

mosquitto: 2G of memory, 32% CPU

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu0  :  3.7%us, 11.6%sy,  0.0%ni, 84.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Cpu1  :  4.3%us, 13.6%sy,  0.0%ni, 82.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Mem:   2957536k total,  2202836k used,   754700k free,    34916k buffers

Swap:  3096568k total,        0k used,  3096568k free,    48164k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                

 

 2509 root      20   0 1264m 1.2g  740 S 32.6 42.5 148:35.98 mosquitto   



-----------------boss worker multithread model---------------------------

/* Globals and prototypes the listing relies on; the queue helpers
   (init/enqueue/dequeue/empty) are defined elsewhere, per the description. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define SOCKET_ERROR -1
#define QUEUE_SIZE   100
#define BUFFER_SIZE  1024
#define NUM_WORKERS  4            /* size of the worker pool (assumed) */

int queue[QUEUE_SIZE];
int head, tail;                   /* ring-buffer indices for the job queue */

pthread_mutex_t mtx   = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;   /* boss -> worker: job added */
pthread_cond_t  cond2 = PTHREAD_COND_INITIALIZER;   /* worker -> boss: queue drained */

/* Queue helpers, defined elsewhere. */
void init(int *head, int *tail);
int  empty(int head, int tail);
void enqueue(int *queue, int *tail, int socket);
int  dequeue(int *queue, int *head);

void *worker(void *threadarg);

This is the main (boss) thread. It uses a queue (defined elsewhere) with methods enqueue, dequeue, empty, etc. When the server accepts a connection, it enqueues the socket the incoming connection arrived on. The worker threads dispatched at the beginning constantly check this queue for new jobs; when there are jobs, a worker dequeues the socket and reads/parses/writes the incoming HTTP request on it.


int main(int argc, char* argv[])
{
    int hSocket, hServerSocket;
    struct sockaddr_in Address;
    int nAddressSize = sizeof(struct sockaddr_in);
    int nHostPort;
    int i;
    pthread_t workers[NUM_WORKERS];

    if (argc < 2) {
        printf("usage: %s <port>\n", argv[0]);
        return 0;
    }
    nHostPort = atoi(argv[1]);    /* assumed: port taken from the command line */

    init(&head, &tail);           /* initialise the job queue */

    /* Dispatch the worker pool up front. */
    for (i = 0; i < NUM_WORKERS; i++)
        pthread_create(&workers[i], NULL, worker, NULL);

    /* Create the listening socket. */
    hServerSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (hServerSocket == SOCKET_ERROR) {
        printf("\nCould not make a socket\n");
        return 0;
    }

    Address.sin_addr.s_addr = INADDR_ANY;
    Address.sin_port = htons(nHostPort);
    Address.sin_family = AF_INET;

    printf("\nBinding to port %d\n", nHostPort);

    if (bind(hServerSocket, (struct sockaddr*)&Address, sizeof(Address)) == SOCKET_ERROR) {
        printf("\nCould not bind to host\n");
        return 0;
    }

    getsockname(hServerSocket, (struct sockaddr *)&Address, (socklen_t *)&nAddressSize);

    printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",
           hServerSocket, ntohs(Address.sin_port));
    printf("Server\n"
           "      sin_family        = %d\n"
           "      sin_addr.s_addr   = %d\n"
           "      sin_port          = %d\n",
           Address.sin_family,
           Address.sin_addr.s_addr,
           ntohs(Address.sin_port));

    if (listen(hServerSocket, QUEUE_SIZE) == SOCKET_ERROR) {
        printf("\nCould not listen\n");
        return 0;
    }

    /* Boss loop: wait for the queue to drain, accept one connection, enqueue it.
       Note accept() runs while holding the lock, so at most one pending job
       exists at a time in this simple design. */
    while (1) {
        pthread_mutex_lock(&mtx);
        printf("\nWaiting for a connection");

        /* Block until the workers have emptied the queue. */
        while (!empty(head, tail)) {
            pthread_cond_wait(&cond2, &mtx);
        }

        hSocket = accept(hServerSocket, (struct sockaddr*)&Address, (socklen_t *)&nAddressSize);
        printf("\nGot a connection");

        enqueue(queue, &tail, hSocket);

        pthread_mutex_unlock(&mtx);
        pthread_cond_signal(&cond);   /* wake a worker thread */
    }
}



/* Worker thread: take one socket from the queue and service it. */
void *worker(void *threadarg)
{
    while (1) {
        pthread_mutex_lock(&mtx);

        /* Sleep until the boss enqueues a job. */
        while (empty(head, tail)) {
            pthread_cond_wait(&cond, &mtx);
        }

        int hSocket = dequeue(queue, &head);

        pthread_mutex_unlock(&mtx);
        pthread_cond_signal(&cond2);   /* tell the boss the queue has drained */

        /* Handle the request outside the critical section. */
        char line[BUFFER_SIZE];
        int nRecvAmount = read(hSocket, line, sizeof line);
        printf("\nReceived %d bytes from client\n", nRecvAmount);

        if (close(hSocket) == SOCKET_ERROR) {
            printf("\nCould not close socket\n");
            return NULL;
        }
    }
}


  • Differences between epoll and select/poll

1. Compared with select and poll, epoll's biggest advantage is that its efficiency does not drop as the number of monitored fds grows. The kernel implementations of select and poll scan all fds on every call, so the more fds you poll, the more time it takes.
2. epoll is callback-based: when an fd has an expected event, a callback puts it on epoll's ready list. epoll therefore only cares about "active" fds, and its cost is independent of the total fd count.
3. Kernel/user-space copying: select and poll copy the whole fd set between user and kernel space on every call, whereas epoll registers each fd once (via epoll_ctl) and does not re-copy the set on every wait.
4. epoll not only tells the application that I/O events arrived, it also hands back the per-fd data the application registered, so the application can locate the event directly without scanning the whole fd set.

Differences between epoll's EPOLLLT (level-triggered, the default) and EPOLLET (edge-triggered) modes

1. EPOLLLT: driven entirely by the kernel's epoll machinery; the application only has to handle the fds returned by epoll_wait, which are known to be ready. In this mode epoll can be thought of as a faster poll.
2. EPOLLET: the system only notifies the application when an fd becomes ready. Once it does, epoll stops tracking that fd (it leaves the ready queue) until the application drives it back to EAGAIN with non-blocking reads/writes, at which point epoll watches it again (it rejoins the queue). Since the fds returned by epoll_wait shrink over time, EPOLLET has the advantage in highly concurrent systems, but it demands more of the programmer, because reads can be left incomplete. For example:
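Suppose the peer sends 2k of data and we read only 1k before calling epoll_wait again. With edge triggering, the fd left the ready queue when it first became readable, so epoll_wait may block forever while the remaining 1k sits unread and the peer waits for our reply ack. With level triggering, epoll_wait still reports the readable event and we can read the remaining 1k.

A minimal sketch of the read-until-EAGAIN rule that EPOLLET imposes (the handle() callback is a hypothetical stand-in for real parsing):

#include <errno.h>
#include <unistd.h>

/* Drain a non-blocking, edge-triggered fd completely before the
   next epoll_wait; otherwise leftover bytes are never reported again. */
static void drain_fd(int fd, void (*handle)(const char *buf, ssize_t n))
{
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            handle(buf, n);            /* consume this chunk */
        } else if (n < 0 && errno == EAGAIN) {
            break;                     /* kernel buffer empty: safe to wait again */
        } else {
            close(fd);                 /* peer closed (n == 0) or a hard error */
            break;
        }
    }
}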

 
  • man epoll example
 
           #define MAX_EVENTS 10
           struct epoll_event ev, events[MAX_EVENTS];
           int listen_sock, conn_sock, nfds, epollfd;
           int n;
           struct sockaddr_in local;              /* peer address for accept() */
           socklen_t addrlen = sizeof(local);

           /* Setting up the listening socket, listen_sock,
              (socket(), bind(), listen()) is omitted here. */

           epollfd = epoll_create(10);
           if (epollfd == -1) {
               perror("epoll_create");
               exit(EXIT_FAILURE);
           }
 
           ev.events = EPOLLIN;
           ev.data.fd = listen_sock;
           if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
               perror("epoll_ctl: listen_sock");
               exit(EXIT_FAILURE);
           }
 
           for (;;) {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               if (nfds == -1) {
                   perror("epoll_pwait");
                   exit(EXIT_FAILURE);
               }
 
               for (n = 0; n < nfds; ++n) {
                   if (events[n].data.fd == listen_sock) {
                       conn_sock = accept(listen_sock,
                                       (struct sockaddr *) &local, &addrlen);
                       if (conn_sock == -1) {
                           perror("accept");
                           exit(EXIT_FAILURE);
                       }
                       setnonblocking(conn_sock);
                       ev.events = EPOLLIN | EPOLLET;
                       ev.data.fd = conn_sock;
                       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                                   &ev) == -1) {
                           perror("epoll_ctl: conn_sock");
                           exit(EXIT_FAILURE);
                       }
                   } else {
                       do_use_fd(events[n].data.fd);
                   }
               }
           }
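Note the pattern in this man page sketch: the listening socket stays in the default level-triggered mode, while each accepted conn_sock is made non-blocking and registered with EPOLLIN | EPOLLET, so do_use_fd() (a stub in the man page, like setnonblocking()) is expected to drain the fd to EAGAIN as described above.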

