在使用接口Channel::Create()連接到rabbitmq時,如果網絡中斷或者ip端口地址不對的時候,程序就會一直阻塞在這個調用上,沒有 返回值沒有異常提示,這種情況如果你想提示個錯誤什么的就無能為力了,Panda工作中也遇到這個問題,我想:如果他能提供一個連接超時異常就好了,畢竟 SimpleAmqpClient只是對另外一個c語言開源項目rabbitmq-c的封裝,而且我記得rabbitmq-c是支持我所說的功能的。下面 請跟隨我一起一步一步完成這個事情吧。
1

1 int m_nSockfd; 2 int m_nChannelIdSend; 3 int m_nChannelIdReve; 4 int m_nChannelIdResult; 5 amqp_connection_state_t m_Connection; 6 amqp_bytes_t m_stReply_to_queue;
m_Connection = amqp_new_connection(); 2 m_nSockfd = amqp_open_socket(m_strIp.toLocal8Bit().data(), m_nPort); 3 amqp_set_sockfd(m_Connection, m_nSockfd); 4 amqp_login(m_Connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,m_strRabbitUser.toLocal8Bit().data(), m_strRabbitPwd.toLocal8Bit().data()); 5 6 //生產者 7 amqp_channel_open(m_Connection, m_nChannelIdSend); 8 amqp_get_rpc_reply(m_Connection); 9 amqp_exchange_declare(m_Connection, m_nChannelIdSend, amqp_cstring_bytes("ping") , Type, 10 0,1,0,0, amqp_empty_table);//綁定交換器 amqp_cstring_bytes("ping") 11 12 m_strExchange = "ping"; 13 m_strRoutingkey = "rpc"; 14 m_pProducer1 = new CMqProducerThread(m_Connection, m_nChannelIdSend, m_strExchange, m_strRoutingkey, this); 15 connect(m_pProducer1, SIGNAL(SendProcess(int, QString)), this, SLOT(SetProcess(int, QString))); 16 m_pProducer1->start(); 17 18 //測試結果上傳 19 amqp_channel_open(m_Connection, m_nChannelIdResult); 20 amqp_get_rpc_reply(m_Connection); 21 amqp_exchange_declare(m_Connection, m_nChannelIdResult, amqp_cstring_bytes("testResult") , Type, 22 0,1,0,0, amqp_empty_table); 23 m_strExchange = "testResult"; 24 m_strRoutingkey = "result"; 25 m_pResoultThread = new MQResultThread(m_Connection, m_nChannelIdResult, m_strExchange, m_strRoutingkey, this);
先來看一下Channel::Channel(…)
然后在rabbitmq-c項目頭文件amqp.h中找到創建非阻塞socket的函數
代碼實現
有方向了,終於可以快樂的寫代碼o(∩_∩)o 。根據設計模式的開閉原則:我們做的事情更好的是擴展而不是修改現有的功能,所以比較優雅的方案應該是增加一個工廠函數生成創建一個channel,做法如下:
在Channel.h增加兩個函數
/** * 以非阻塞的方法創建Channel * author: panxianzhan * @param timeout 最大等待事件,為NULL時采用阻塞方式打開 */ explicit Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* ); /** * 工廠方法 * 以非阻塞的方法創建Channel * author: panxianzhan * @param timeout 最大等待事件,為NULL時采用阻塞方式打開 */ static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, timeval* timeout = NULL) { return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout); }
然后在Channel.cpp實現
Channel::Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* timeout) : m_impl(new Detail::ChannelImpl) { m_impl->m_connection = amqp_new_connection(); if (NULL == m_impl->m_connection) { throw std::bad_alloc(); } try { amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection); int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout); } //如果連接超時,下面這一行就會拋出異常 m_impl->CheckForError(sock); m_impl->DoLogin(username, password, vhost, frame_max); } catch (...) { amqp_destroy_connection(m_impl->m_connection); throw; } m_impl->SetIsConnected(true); }
使用例子如下:
int main() { timeval tv = {0}; tv.tv_usec = 200 * 1000; //等待200毫秒 try { Channel::ptr_t channel = Channel::CreateNoBlock( "127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv); ... ... } catch (AmqpLibraryException& ex) { //提示連接失敗; } return 0; }