ActiveMQ之Mqtt的TCP丟包


現象

Mqtt Consumer應該收到的消息少於預期,登錄ActiveMQ的管理頁面里的Topics,查看Messages Enqueued發現同樣少於理應接收的數量。

定位問題

  1. 懷疑是TCP丟包,通過netstat -s命令觀察發送消息前后Tcp信息的輸出
  2. 對比兩次Tcp信息的輸出,發現packets pruned from receive queue because of socket buffer overrunpackets collapsed in receive queue due to low socket buffer等含有prunedcollapsed字樣的數值在增多。

解決方案

  • 首先調整系統級tcp的緩沖區,修改/etc/sysctl.conf如下
net.core.rmem_max = 8388608
net.core.wmem_max = 8388608
net.core.rmem_default = 655360
net.core.wmem_default = 655360
net.ipv4.tcp_rmem = 4096 655360 8388608 # Tcp接收緩沖區,分別是最小、默認、最大
net.ipv4.tcp_wmem = 4096 655360 8388608 # Tcp發送緩沖區,分別是最小、默認、最大
net.ipv4.tcp_mem = 8388608 8388608 8388608
<transportConnector name="mqtt"
uri="mqtt+nio://0.0.0.0:1883?maximumConnections=1000&amp;
wireFormat.maxFrameSize=104857600&amp;transport.ioBufferSize=1048576&amp;
transport.socketBufferSize=4194304"/>
- 其中**+nio**表示啟用**nio**方式的socket通信。Java里**nio**方式的socket比**bio**方式的更高效。mqtt默認采用**bio**。
- **socketBufferSize**調整緩沖區大小為4m,默認為64k,防止socket接收緩沖過小引發系統扔包
- **ioBufferSize**調整程序內部使用的緩沖區大小為1m,默認為8k,提高緩沖可以增加處理性能

代碼分析

  • MQTTTransportFactory繼承自TcpTransportFactory
    • org.apache.activemq.transport.tcp.TcpTransportFactory#doBind時解析URI帶入的參數
  • org.apache.activemq.transport.mqtt.MQTTNIOTransportFactory#createTcpTransportServer創建TcpTransportServer
  • org.apache.activemq.transport.tcp.TcpTransportServer#doRunWithServerSocketChannel創建與客戶端通信的Transport
    • 默認的socketBufferSize = 65536
    • 默認的ioBufferSize = 8192
  • Transportorg.apache.activemq.transport.TransportAcceptListener#onAccept處理
    • Transport被扔給org.apache.activemq.thread.TaskRunnerFactory線程池
    • 在線程池中創建org.apache.activemq.broker.Connection
  • Connection中的org.apache.activemq.broker.TransportConnection#start啟動整個TCP的鏈路

Windows下定位問題的要點


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM