使用RabbitMQ時,連接rabbit-server一直連接失敗,代碼沒有任何錯誤提示。但是通過rabbitmqctl始終查詢不到連接以及創建的queue等信息。
官方的文件demo里面也沒有TcpConnection相關例子,只在github上有些簡單說明。
然而網上幾乎所有人都依然還是在使用Connection,幾乎沒有使用TcpConnection的例子。最后還是放棄了網絡求助,老老實實看源碼定位了。
使用tcpdump確認,代碼這邊的TcpConnection確實是已經向rabbit-server發出了連接請求。
開始觀察發現三次握手是已經建立了連接的,但是幾秒后,rabbit-server主動發送返回了一個RST包。這非常詫異,查看rabbit-server日志看到,產生了一次handshake_timeout錯誤。
現在可以確認,不是鑒權產生的問題,而是在連接時就已經失敗了,在完成連接到RST包收到剛好過了10s時間。在官方文檔查閱到,rabbit-server的心跳也剛好是10s。
后來還是確定問題點是在代碼上,但是代碼只有短短幾行從github上copy下來的,怎么會出錯呢。
最后在日志打印上發現monitor函數執行了兩次,這個小小的信息感覺看到了問題的原因,查看TcpConnection源碼monitor被調用的地方。
1 public: 2 /** 3 * Constructor 4 * @param connection Parent TCP connection object 5 * @param socket The socket filedescriptor 6 * @param buffer The buffer that was already built 7 * @param handler User-supplied handler object 8 */ 9 TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) : 10 TcpState(connection, handler), 11 _socket(socket), 12 _out(std::move(buffer)), 13 _in(4096) 14 { 15 // if there is already an output buffer, we have to send out that first 16 if (_out) _out.sendto(_socket); 17 18 // tell the handler to monitor the socket, if there is an out 19 _handler->monitor(_connection, _socket, _out ? readable | writable : readable); 20 } 21 22 /** 23 * Destructor 24 */ 25 virtual ~TcpConnected() noexcept 26 { 27 // we no longer have to monitor the socket 28 _handler->monitor(_connection, _socket, 0); 29 30 // close the socket 31 close(_socket); 32 }在構造和析構中各調用了一次,而且內部使用connection可能是為了提高效率進行了線程操作,也就是說實際的connection是在多線程中完成的。
最后嘗試修改代碼,使用指針進行操作,因為代碼並不是github上的單個函數文件,而是多處引用,最后問題解決。成功使用TcpConnection連接上了rabbit-server。
附上簡單代碼:
1 int Broker::init(std::string host,int port, std::string username, std::string userpasswd, int svrid) 2 { 3 // create an instance of your own tcp handler 4 _handle = new DSBrokerMessageHandle(); 5 6 // address of the server 7 AMQP::Address address(host, port,AMQP::Login(username,userpasswd),"/"); 8 9 // create a AMQP connection object 10 _connection = new AMQP::TcpConnection(_handle, address); 11
12 // and create a channel 13 _channel = new AMQP::TcpChannel(&connection); 14 15 auto receiveMessageCallback = [=](const AMQP::Message &message, 16 uint64_t deliveryTag, 17 bool redelivered) 18 { 19 //_channel->ack(deliveryTag); 20 }; 21 22 AMQP::QueueCallback callback = 23 [=](const std::string &name, int msgcount, int consumercount) 24 { 25 _channel->bindQueue("service", name, name); 26 _channel->bindQueue("service", name, "monitor"); 27 _channel->bindQueue("service", name, "heartbeat"); 28 29 _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback); 30 }; 31 32 AMQP::SuccessCallback success = [svrid, this, callback]() 33 { 34 char que[4] = { '\0' }; 35 ACE_OS::itoa(svrid, que, 10); 36 std::string quename(que); 37 _channel->declareQueue(quename, AMQP::durable).onSuccess(callback); 38 }; 39 40 // use the channel object to call the AMQP method you like 41 _channel->declareExchange("service", AMQP::fanout).onSuccess(success); 42 43 return 0; 44 }