(原創)如何使用boost.asio寫一個簡單的通信程序(一)


  boost.asio相信很多人聽說過,作為一個跨平台的通信庫,它的性能是很出色的,然而它卻談不上好用,里面有很多地方稍不注意就會出錯,要正確的用好asio還是需要花一番精力去學習和實踐的,本文將通過介紹如何寫一個簡單的通信程序來告訴讀者如何使用asio,希望對asio的初學者有所幫助。由於只是介紹其基本用法,作為例子的簡單示例並不考慮很多的業務邏輯和異常處理,只是介紹基本用法,讓初學者入門。

  使用asio容易出錯的一個主要原因是因為它是基於proactor模式實現的,asio有很多異步操作接口,這些異步接口稍不注意就會出現莫名奇妙的錯誤,所以要用好asio的第一步是理解其異步操思想。

異步操作思想

  用戶發起異步事件,asio將這些異步事件投遞到一個隊列中,用戶發起的操作就返回了,io_service::run會處理異步事件隊列中的所有的異步事件,它會將這些事件交給操作系統處理,操作系統處理完成之后會丟到asio的事件完成的隊列中,io_service發現有完成隊列中有完成事件了,就會通知用戶處理完成事件。 所以用戶要發起一個異步操作需要做三件事:

  1. 調用asio異步操作接口,發起異步操作;如:async_connect、async_read、async_write,這些異步接口需要一個回調函數入參,這個回調函數在事件完成時,由io_service觸發。
  2. 調用io_service::run處理異步事件;發起一個異步操作,必須要保證io_service::run,因為io_service通過一個循環去處理這些異步操作事件的,如果沒有事件就會退出,所以要保證異步事件發起之后,io_service::run還在運行。要保證一直run的一個簡單辦法就是使用io_service::work,它可以保證io_service一直run。
  3. 處理異步操作完成事件;在調用異步接口時會傳入一個回調函數,這個回調函數就是處理操作完成事件的,比如讀完成了,用戶需要對這些數據進行業務邏輯的處理。

  下圖描述了一個異步操作的過程:

  asio的的核心是io_service, 理解了asio異步接口的機制就容易找出使用asio過程中出現的問題了,在這里把一些常見的問題列出來,並分析原因和提出解決方法。

  • 問題1:為什么我發起了異步操作,如連接或者寫,對方都沒有反應,好像沒有收到連接請求或者沒有收到數據? 答案:一個很可能的原因是io_service在異步操作發起之后沒有run,解決辦法是保持io_service run。
  • 問題2:為什么發送數據會報錯? 答案:一個可能的原因是發送的數據失效了,異步發送要求發送的數據在回調完成之前都有效,異步操作只是將異步事件句柄投遞到io_service隊列中就返回了,並不是阻塞的,不注意這一點,如果是臨時變量的數據,出了作用域就失效了,導致異步事件還沒完成時數據就失效了。解決辦法,保證發送數據在事件完成之前一直有效。
  • 問題3:為什么監聽socket時,會報“函數不正確”的異常? 答案:因為監聽時,也要保證這個socket一直有效,如果是一個臨時變量socket,在調用異步監聽后超出作用域就失效了,解決辦法,將監聽的socket保存起來,使它的生命周期和acceptor一樣長。
  • 問題4:為什么連續調用異步操作時會報錯? 答案:因為異步操作必須保證當前異步操作完成之后再發起下一次異步操作。解決辦法:在異步完成事件處理完成之后再發起新的異步操作即可。
  • 問題5:為什么對方半天收不到數據,過了半天才一下子收到之前發送的數據? 答案:因為socket是流數據,一次發送多少數據不是外界能控制的,這也是所謂的粘包問題。解決辦法,可以在接收時指定至少收多少的條件,或者做tcp分包處理。

  說了這么多,還是來看看例子吧,一個簡單的通信程序:服務端監聽某個端口,允許多個客戶端連接上來,服務器將客戶端發來的數據打印出來。 先看看服務端的需求,需求很簡單,第一,要求能接收多個客戶端;第二,要求把收到的數據打印出來。

  要求能接收多個客戶端是第一個要解決的問題,異步接收需要用到acceptor::async_accept,它接收一個socket和一個完成事件的回調函數。前面的問題3中提到監聽的這個socket不能是臨時變量,我們要把它保存起來,最好是統一管理起來。可以考慮用一個map去管理它們,每次一個新連接過來時,服務器自動分配一個連接號給這個連接,以方便管理。然而,socket是不允許拷貝的,所以不能直接將socket放入容器中,還需要外面包裝一層才可以。

  第二個問題是打印來自客戶端的數據,既然要打印就需要異步讀數據了。異步讀是由socket完成,這個socket還要完成讀寫功能,為了簡化用戶操作,我將socket封裝到一個讀寫事件處理器中,這個事件處理器只具備具備讀和寫的功能。服務器每次監聽的時候我都會創建一個新的事件處理器並放到一個map中,客戶端成功連接后就由這個事件處理器去處理各種讀寫事件了。 根據問題1,異步讀寫時要保證數據的有效性,這里我將一個固定大小的緩沖區作為讀緩沖區。為了簡單起見我使用同步發送,異步接收。

 

具體看看這個讀寫事件處理器是怎么寫的:

const int MAX_IP_PACK_SIZE = 65536;
const int HEAD_LEN = 4;
class RWHandler
{
public:

    RWHandler(io_service& ios) : m_sock(ios)
    {
    }

    ~RWHandler()
    {
    }

    void HandleRead()
    {
        //三種情況下會返回:1.緩沖區滿;2.transfer_at_least為真(收到特定數量字節即返回);3.有錯誤發生
        async_read(m_sock, buffer(m_buff), transfer_at_least(HEAD_LEN), [this](const boost::system::error_code& ec, size_t size)
        {
            if (ec != nullptr)
            {
                HandleError(ec);
                return;
            }

            cout << m_buff.data() + HEAD_LEN << endl;

            HandleRead();
        });
    }

    void HandleWrite(char* data, int len)
    {
        boost::system::error_code ec;
        write(m_sock, buffer(data, len), ec);
        if (ec != nullptr)
            HandleError(ec);
    }

    tcp::socket& GetSocket()
    {
        return m_sock;
    }

    void CloseSocket()
    {
        boost::system::error_code ec;
        m_sock.shutdown(tcp::socket::shutdown_send, ec);
        m_sock.close(ec);
    }

    void SetConnId(int connId)
    {
        m_connId = connId;
    }

    int GetConnId() const
    {
        return m_connId;
    }

    template<typename F>
    void SetCallBackError(F f)
    {
        m_callbackError = f;
    }

private:
    void HandleError(const boost::system::error_code& ec)
    {
        CloseSocket(); 
        cout << ec.message() << endl;
        if (m_callbackError)
            m_callbackError(m_connId);
    }

private:
    tcp::socket m_sock;
    std::array<char, MAX_IP_PACK_SIZE> m_buff; 
    int m_connId;
    std::function<void(int)> m_callbackError;
};
View Code

  這個讀寫事件處理器有四個成員變量,第一個是socket它是具體的讀寫執行者;第二個是固定長度的讀緩沖區,用來讀數據;第三個是連接id,由連接管理層分配;第四個是回調函數,讀寫發生錯誤時回調到上層。當然還可以加一個tcp分包之后的回調函數,將應用層數據回調到應用層,這里簡單起見,只是將其打印出來。

 

再來看看Server是如何寫的:

#include <boost/asio/buffer.hpp>
#include <boost/unordered_map.hpp>
#include "Message.hpp"
#include "RWHandler.hpp"

const int MaxConnectionNum = 65536;
const int MaxRecvSize = 65536;
class Server
{
public:

    Server(io_service& ios, short port) : m_ios(ios), m_acceptor(ios, tcp::endpoint(tcp::v4(), port)), m_cnnIdPool(MaxConnectionNum)
    {
        int current = 0;
        std::generate_n(m_cnnIdPool.begin(), MaxConnectionNum, [&current]{return ++current; });
    }

    ~Server()
    {
    }

    void Accept()
    {
        cout << "Start Listening " << endl;
        std::shared_ptr<RWHandler> handler = CreateHandler();

        m_acceptor.async_accept(handler->GetSocket(), [this, handler](const boost::system::error_code& error)
        {
            if (error)
            {
                cout << error.value() << " " << error.message() << endl;
                HandleAcpError(handler, error);
            }
            
            m_handlers.insert(std::make_pair(handler->GetConnId(), handler));
            cout << "current connect count: " << m_handlers.size() << endl;

            handler->HandleRead();
            Accept();
        });
    }

private:
    void HandleAcpError(std::shared_ptr <RWHandler> eventHanlder, const boost::system::error_code& error)
    {
        cout << "Error,error reason:" << error.value() << error.message() << endl;
        //關閉socket,移除讀事件處理器
        eventHanlder->CloseSocket();
        StopAccept();
    }

    void StopAccept()
    {
        boost::system::error_code ec;
        m_acceptor.cancel(ec);
        m_acceptor.close(ec);
        m_ios.stop();
    }

    std::shared_ptr<RWHandler> CreateHandler()
    {
        int connId = m_cnnIdPool.front();
        m_cnnIdPool.pop_front();
        std::shared_ptr<RWHandler> handler = std::make_shared<RWHandler>(m_ios);

        handler->SetConnId(connId);

        handler->SetCallBackError([this](int connId)
        {        
            RecyclConnid(connId);
        });

        return handler;
    }

    void RecyclConnid(int connId)
    {
        auto it = m_handlers.find(connId);
        if (it != m_handlers.end())
            m_handlers.erase(it);
        cout << "current connect count: " << m_handlers.size() << endl; 
        m_cnnIdPool.push_back(connId);
    }

private:
    io_service& m_ios;
    tcp::acceptor m_acceptor;    boost::unordered_map<int, std::shared_ptr<RWHandler>> m_handlers;

    list<int> m_cnnIdPool;
};
View Code

這個Server具備連接管理功能,會統一管理所有連上來的客戶端。 其中的Message類是boost官網中的那個char_message

class Message
{
public:
    enum { header_length = 4 };
    enum { max_body_length = 512 };

    Message()
        : body_length_(0)
    {
    }

    const char* data() const
    {
        return data_;
    }

    char* data()
    {
        return data_;
    }

    size_t length() const
    {
        return header_length + body_length_;
    }

    const char* body() const
    {
        return data_ + header_length;
    }

    char* body()
    {
        return data_ + header_length;
    }

    size_t body_length() const
    {
        return body_length_;
    }

    void body_length(size_t new_length)
    {
        body_length_ = new_length;
        if (body_length_ > max_body_length)
            body_length_ = max_body_length;
    }

    bool decode_header()
    {
        char header[header_length + 1] = "";
        std::strncat(header, data_, header_length);
        body_length_ = std::atoi(header);
        if (body_length_ > max_body_length)
        {
            body_length_ = 0;
            return false;
        }
        return true;
    }

    void encode_header()
    {
        char header[header_length + 1] = "";
        std::sprintf(header, "%4d", body_length_);
        std::memcpy(data_, header, header_length);
    }

private:
    char data_[header_length + max_body_length];
    std::size_t body_length_;
};
View Code

  

  至此一個簡單的服務端程序寫完了,還要把這個Server運行起來。

void TestServer()
{
    io_service ios;
    //boost::asio::io_service::work work(ios);
    //std::thread thd([&ios]{ios.run(); }); 

    Server server(ios, 9900);
    server.Accept();
    ios.run();

    //thd.join();
}

注意看這個TestServer函數,看我是如何保證io_service::run一直運行的, 我這里沒有使用io_service::work來保證,用了一種更簡單的方法,具體方法讀者看代碼便知。

現在可以寫一個簡單的客戶端來測試一下,看看服務器能否正常工作,下一篇再繼續寫如何寫一個簡單的客戶端程序。

如果你覺得這篇文章對你有用,可以點一下推薦,謝謝。

c++11 boost技術交流群:296561497,歡迎大家來交流技術。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM