RabbitMQ - TcpConnection析構引發的一次handshake_timeout


使用RabbitMQ時,連接rabbit-server一直連接失敗,代碼沒有任何錯誤提示。但是通過rabbitmqctl始終查詢不到連接以及創建的queue等信息。

官方的文件demo里面也沒有TcpConnection相關例子,只在github上有些簡單說明。

然而網上幾乎所有人都依然還是在使用Connection,幾乎沒有使用TcpConnection的例子。最后還是放棄了網絡求助,老老實實看源碼定位了。

使用tcpdump確認,代碼這邊的TcpConnection確實是已經向rabbit-server發出了連接請求。

 

開始觀察發現三次握手是已經建立了連接的,但是幾秒后,rabbit-server主動發送返回了一個RST包。這非常詫異,查看rabbit-server日志看到,產生了一次handshake_timeout錯誤。

現在可以確認,不是鑒權產生的問題,而是在連接時就已經失敗了,在完成連接到RST包收到剛好過了10s時間。在官方文檔查閱到,rabbit-server的心跳也剛好是10s。

后來還是確定問題點是在代碼上,但是代碼只有短短幾行從github上copy下來的,怎么會出錯呢。

最后在日志打印上發現monitor函數執行了兩次,這個小小的信息感覺看到了問題的原因,查看TcpConnection源碼monitor被調用的地方。

 1 public:
 2     /**
 3      *  Constructor
 4      *  @param  connection  Parent TCP connection object
 5      *  @param  socket      The socket filedescriptor
 6      *  @param  buffer      The buffer that was already built
 7      *  @param  handler     User-supplied handler object
 8      */
 9     TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) : 
10         TcpState(connection, handler),
11         _socket(socket),
12         _out(std::move(buffer)),
13         _in(4096)
14     {
15         // if there is already an output buffer, we have to send out that first
16         if (_out) _out.sendto(_socket);
17         
18         // tell the handler to monitor the socket, if there is an out
19         _handler->monitor(_connection, _socket, _out ? readable | writable : readable);
20     }
21     
22     /**
23      *  Destructor
24      */
25     virtual ~TcpConnected() noexcept
26     {
27         // we no longer have to monitor the socket
28         _handler->monitor(_connection, _socket, 0);
29         
30         // close the socket
31         close(_socket);
32     }

在構造和析構中各調用了一次,而且內部使用connection可能是為了提高效率進行了線程操作,也就是說實際的connection是在多線程中完成的。

最后嘗試修改代碼,使用指針進行操作,因為代碼並不是github上的單個函數文件,而是多處引用,最后問題解決。成功使用TcpConnection連接上了rabbit-server。

 

附上簡單代碼:

 1 int Broker::init(std::string host,int port, std::string username, std::string userpasswd, int svrid)
 2 {
 3     // create an instance of your own tcp handler
 4     _handle = new DSBrokerMessageHandle();
 5 
 6     // address of the server
 7     AMQP::Address address(host, port,AMQP::Login(username,userpasswd),"/");
 8 
 9     // create a AMQP connection object
10     _connection = new AMQP::TcpConnection(_handle, address);
11 
12 // and create a channel 13 _channel = new AMQP::TcpChannel(&connection); 14 15 auto receiveMessageCallback = [=](const AMQP::Message &message, 16 uint64_t deliveryTag, 17 bool redelivered) 18 { 19 //_channel->ack(deliveryTag); 20 }; 21 22 AMQP::QueueCallback callback = 23 [=](const std::string &name, int msgcount, int consumercount) 24 { 25 _channel->bindQueue("service", name, name); 26 _channel->bindQueue("service", name, "monitor"); 27 _channel->bindQueue("service", name, "heartbeat"); 28 29 _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback); 30 }; 31 32 AMQP::SuccessCallback success = [svrid, this, callback]() 33 { 34 char que[4] = { '\0' }; 35 ACE_OS::itoa(svrid, que, 10); 36 std::string quename(que); 37 _channel->declareQueue(quename, AMQP::durable).onSuccess(callback); 38 }; 39 40 // use the channel object to call the AMQP method you like 41 _channel->declareExchange("service", AMQP::fanout).onSuccess(success); 42 43 return 0; 44 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM