Rabbitmq解決連接時阻塞的問題(amqp_open_socket)


在使用接口Channel::Create()連接到rabbitmq時,如果網絡中斷或者ip端口地址不對的時候,程序就會一直阻塞在這個調用上,沒有 返回值沒有異常提示,這種情況如果你想提示個錯誤什么的就無能為力了,Panda工作中也遇到這個問題,我想:如果他能提供一個連接超時異常就好了,畢竟 SimpleAmqpClient只是對另外一個c語言開源項目rabbitmq-c的封裝,而且我記得rabbitmq-c是支持我所說的功能的。下面 請跟隨我一起一步一步完成這個事情吧。

 

 1
1 int m_nSockfd;
2 int m_nChannelIdSend;
3 int m_nChannelIdReve;
4 int m_nChannelIdResult;
5 amqp_connection_state_t m_Connection;
6 amqp_bytes_t m_stReply_to_queue;
View Code
       m_Connection = amqp_new_connection();
 2     m_nSockfd = amqp_open_socket(m_strIp.toLocal8Bit().data(), m_nPort);
 3     amqp_set_sockfd(m_Connection, m_nSockfd);
 4     amqp_login(m_Connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,m_strRabbitUser.toLocal8Bit().data(), m_strRabbitPwd.toLocal8Bit().data());
 5 
 6     //生產者
 7     amqp_channel_open(m_Connection, m_nChannelIdSend);
 8     amqp_get_rpc_reply(m_Connection);
 9     amqp_exchange_declare(m_Connection, m_nChannelIdSend, amqp_cstring_bytes("ping") , Type,
10                           0,1,0,0, amqp_empty_table);//綁定交換器 amqp_cstring_bytes("ping")
11 
12     m_strExchange = "ping";
13     m_strRoutingkey = "rpc";
14     m_pProducer1 = new CMqProducerThread(m_Connection, m_nChannelIdSend, m_strExchange, m_strRoutingkey, this);
15     connect(m_pProducer1, SIGNAL(SendProcess(int, QString)), this, SLOT(SetProcess(int, QString)));
16     m_pProducer1->start();
17 
18     //測試結果上傳
19     amqp_channel_open(m_Connection, m_nChannelIdResult);
20     amqp_get_rpc_reply(m_Connection);
21     amqp_exchange_declare(m_Connection, m_nChannelIdResult, amqp_cstring_bytes("testResult") , Type,
22                           0,1,0,0, amqp_empty_table);
23     m_strExchange = "testResult";
24     m_strRoutingkey = "result";
25     m_pResoultThread = new MQResultThread(m_Connection, m_nChannelIdResult, m_strExchange, m_strRoutingkey, this);

先來看一下Channel::Channel(…) 

然后在rabbitmq-c項目頭文件amqp.h中找到創建非阻塞socket的函數

代碼實現
有方向了,終於可以快樂的寫代碼o(∩_∩)o 。根據設計模式的開閉原則:我們做的事情更好的是擴展而不是修改現有的功能,所以比較優雅的方案應該是增加一個工廠函數生成創建一個channel,做法如下: 
在Channel.h增加兩個函數

    /**
     * 以非阻塞的方法創建Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,為NULL時采用阻塞方式打開
     */
    explicit Channel(const std::string &host,
        int port,
        const std::string &username,
        const std::string &password,
        const std::string &vhost,
        int frame_max,
        timeval*
        );

    /**
     * 工廠方法
     * 以非阻塞的方法創建Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,為NULL時采用阻塞方式打開
     */
    static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1",
        int port = 5672,
        const std::string &username = "guest",
        const std::string &password = "guest",
        const std::string &vhost = "/",
        int frame_max = 131072,
        timeval* timeout = NULL)
    {
        return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout);
    }

然后在Channel.cpp實現

Channel::Channel(const std::string &host,
                 int port,
                 const std::string &username,
                 const std::string &password,
                 const std::string &vhost,
                 int frame_max,
                 timeval* timeout) :
    m_impl(new Detail::ChannelImpl)
{
    m_impl->m_connection = amqp_new_connection();

    if (NULL == m_impl->m_connection)
    {
        throw std::bad_alloc();
    }

    try
    {
        amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
        int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
        }

        //如果連接超時,下面這一行就會拋出異常
        m_impl->CheckForError(sock);

        m_impl->DoLogin(username, password, vhost, frame_max);
    }
    catch (...)
    {
        amqp_destroy_connection(m_impl->m_connection);
        throw;
    }

    m_impl->SetIsConnected(true);
}

使用例子如下:

int main()
{
    timeval tv = {0};
    tv.tv_usec = 200 * 1000; //等待200毫秒
    try 
    {
        Channel::ptr_t channel = Channel::CreateNoBlock(
        "127.0.0.1", 5567"guest""guest""/", 131072, &tv);
        ...
        ...
    } catch (AmqpLibraryException& ex)
    {
        //提示連接失敗;
    }
    return 0;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM