本文已於20170903更新完畢,所有boost asio 代碼均為本人手抄。編譯器為vs2013,並且所有代碼已經上傳,本文下方可下載源碼
為了學習boost asio庫,我是從boost的官方boost asio的教程學起的。
每一個示例我都抄寫了一遍以加深記憶,每一個例子我都用自己的話概括一遍,雖然概括的不是很好,代碼覺得難懂的地方我都加注釋。
1.同步使用Timer
本便使用了boost::asio::deadline_timer,這個timer有兩種狀態:過期和不過期。wait函數調用一個過期的timer直接返回。
int _tmain(int argc, _TCHAR* argv[]) { boost::asio::io_service io; boost::asio::deadline_timer t(io,boost::posix_time::seconds(5)); t.wait(); std::cout<<"wait finished!"<<std::endl; return 0; }
2.異步使用Timer
下在演示了使用deadline_timer的asyn_wati函數實現異步等待。但要注意的一點是異步等待必須要調用io.run才可以。而且必須在io.run函數執行之前調用asyn_wait,否則io.run會立即返回,因為他沒有可以做的事。這說明io.run必須至少有一個等待的,否則它會直接返回。asio函數保證回調函數執行和io.run所在的線程一樣!
//異步Timer void print(const boost::system::error_code & ) { std::cout<<"Wait Finished"<<std::endl; } int _tmain(int argc, _TCHAR* argv[]) { boost::asio::io_service io; boost::asio::deadline_timer t(io,boost::posix_time::seconds(5)); t.async_wait(&print); io.run(); return 0; }
3.為回調函數綁定參數
這個例子一個是說明異步Timer的可持續性問題,也就是在回調中設置Time的超時時間。另一個說明回調函數參數的綁定 。但是實際發現我官的代碼沒有發生那個重復回調的效果。原因是我只是調用了expire_at而沒有調用再次等待函數async_wait。這讓我更加明白expires_at這個函數相當於下次觸發的時間。而async_wait提交一個等待申請。
async_wait提交一次,回調函數執行一次,而expire_at設定下次回調函數調用的時間。
#include <boost/bind.hpp> void Print(const boost::system::error_code & , boost::asio::deadline_timer * t,int * count) { if(*count < 5) { std::cout<<*count<<std::endl; ++(*count); t->expires_at(t->expires_at() + boost::posix_time::seconds(1)); t->async_wait(boost::bind(Print,boost::asio::placeholders::error,t,count)); } } int _tmain(int argc, _TCHAR* argv[]) { boost::asio::io_service io; int count = 0; boost::asio::deadline_timer t(io,boost::posix_time::seconds(1)); t.async_wait(boost::bind(Print,boost::asio::placeholders::error,&t,&count)); io.run(); return 0; }
4.類成員做為timer的回調函數
這個例子主要演示了,如何綁定一個類成員函數作為一個回調
class Print { public: Print(boost::asio::io_service & io) :timer_(io,boost::posix_time::seconds(1)),count_(0) { timer_.async_wait(boost::bind(&Print::print,this)); } ~Print() { std::cout<<"finnal count is "<<count_<<std::endl; } void print() { if(count_ < 5) { std::cout<<count_<<std::endl; ++count_; timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1)); timer_.async_wait(boost::bind(&Print::print,this)); } } protected: boost::asio::deadline_timer timer_; int count_; }; int _tmain(int argc, _TCHAR* argv[]) { boost::asio::io_service io; Print p(io); io.run(); return 0; }
4.在多線程程序中的同步回調
先前的例子通過io_service.run和同步回調在同一個線程內,正如你所知的那樣,asio保證回調函數只能被在io_service.run()所在的線程調用 。因此,只有在一個線程內調用io_service::run保證回調函數不會並發執行。這樣在服務器程序中有兩個局限性:
1.當回調函數執行時間比較長時響應太慢
2.沒有起到多處理器的優勢
如果你注意到這個局限性,一個可供選擇的方案是創建一個線程池去調用io_service.run()函數,這樣實現的回調的並發,我們需要去同步一個共享變量。
下面的例子使用到了An boost::asio::strand ,他保證這些回調函數通過strans派遣,它可以允許一個回調函數在另一個回調函數執行之前完成。簡單點說,這里的strand就是讓回調函數不會並發的執行。但是這里的strand到底的意圖在哪里?不是要演示多線程執行回調嗎?這里又做了strand使回調又依次執行好想沒有達到多線程效果
#include <boost/thread/thread.hpp> class printer { public: printer(boost::asio::io_service & io) :strand_(io), timer1_(io,boost::posix_time::seconds(1)), timer2_(io,boost::posix_time::seconds(1)), count_(0) { timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this))); timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this))); } void print1() { if(count_ < 10) { std::cout<<"Timer 1:"<<count_<<std::endl; ++count_; timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1)); timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this))); } } void print2() { if(count_ < 10) { std::cout<<"Timer 2:"<<count_<<std::endl; ++count_; timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1)); timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this))); } } private: boost::asio::io_service::strand strand_; boost::asio::deadline_timer timer1_; boost::asio::deadline_timer timer2_; int count_; }; int _tmain(int argc, _TCHAR* argv[]) { boost::asio::io_service io; printer p(io); boost::thread t(boost::bind(&boost::asio::io_service::run,&io)); io.run(); t.join(); return 0; }
下面有時間研究一下 boost::asio::strand的用法
5.簡單的一個TCP服務端
下面程序演示一個boost做的最簡單的一個服務端程序,客戶端連接之后服務器給客戶端發送一個當前時間的字符串
下面值得一提的是tcp::acceptor,他被封裝為socket的服務端接收器,構造他時需要一個io_service和一個tcp::endpoint。
std::string make_daytime_string() { using namespace std; time_t now = time(0); return ctime(&now); } int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io_service; tcp::acceptor acceptor(io_service,tcp::endpoint(tcp::v4(),13)); for(;;) { tcp::socket socket(io_service); acceptor.accept(socket);//接受一個客戶端socket std::string message = make_daytime_string(); boost::system::error_code ignored_error; boost::asio::write(socket,boost::asio::buffer(message),ignored_error); } } catch(std::exception & e) { std::cerr<<e.what()<<std::endl; } return 0; }
6.簡單的一個TCP客戶端
int _tmain(int argc, _TCHAR* argv[]) { try { if(argc != 2) { std::cerr<<"Usage:client <host>"<<std::endl; return 1; } boost::asio::io_service io_service; tcp::resolver resolver(io_service); tcp::resolver::query query(argv[1],"daytime"); tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); tcp::socket socket(io_service); boost::asio::connect(socket,endpoint_iterator); for(;;) { boost::array<char,128> buf; boost::system::error_code error; size_t len = socket.read_some(boost::asio::buffer((buf)),error); if(error == boost::asio::error::eof) break; else if(error) throw boost::system::system_error(error); std::cout.write(buf.data(),len); } } catch(std::exception & e) { std::cerr<<e.what()<<std::endl; } return 0; }
7.TCP異步服務端
std::string make_daytime_string() { using namespace std; time_t now = time(0); return ctime(&now); } class tcp_connection : public boost::enable_shared_from_this<tcp_connection> { public: typedef boost::shared_ptr<tcp_connection> tcp_connection_ptr; static tcp_connection_ptr Create(boost::asio::io_service & io) { return tcp_connection_ptr(new tcp_connection(io)); } tcp::socket & Socket() { return socket_; } void Start() { message_ = make_daytime_string(); boost::asio::async_write(socket_,boost::asio::buffer(message_), boost::bind(&tcp_connection::handle_write,shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } private: tcp_connection(boost::asio::io_service & io) : socket_(io) {} void handle_write(const boost::system::error_code & ,size_t) { } tcp::socket socket_; std::string message_; }; class TcpServer { public: TcpServer(boost::asio::io_service & io) : acceptor_(io,tcp::endpoint(tcp::v4(),13)) { start_accept(); } protected: void start_accept() { tcp_connection::tcp_connection_ptr new_connection = tcp_connection::Create(acceptor_.get_io_service()); acceptor_.async_accept(new_connection->Socket(), boost::bind(&TcpServer::handle_accept,this,new_connection,boost::asio::placeholders::error)); } tcp::acceptor acceptor_; void handle_accept(tcp_connection::tcp_connection_ptr new_connection, const boost::system::error_code & error) { if(!error) new_connection->Start(); start_accept(); } }; int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io_service; TcpServer server(io_service); io_service.run(); } catch (std::exception & e) { std::cerr << e.what() << std::endl; } return 0; }
以上代碼調用異步函數asyn_accept和asyn_write分別進行異步接受socket和異步socket發送。
以上代碼是官方tutorial的代碼,有幾點特別的地方值得學習:
- 構造函數私有化
一般自己寫代碼構造函數不可能給私有化,而類tcp_connection使用一個靜態類成員函數Create生產一個對象,而使得類的構造函數可以私有。 - 使用enable_shared_from_this
boost類enable_shared_from_this的好處是避免在類成員函數中傳遞this而傳遞一個shared_ptr智能指針,這樣不用擔心釋放的問題。而在這里,如果傳指針則有可能所持有的指針指向的對象已經被釋放,如果用shared_ptr則可以保證不被釋放,引用官方的一句話:We will useshared_ptr
andenable_shared_from_this
because we want to keep thetcp_connection
object alive as long as there is an operation that refers to it. - 不指定沒有用的參數,有可能注意到handle_write()沒有error和byte_transfered參數,因為body中沒有用到這兩個參數,如果參數不使用可能以移除參數
8.Custom Allocation
// Async_Allocation.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include <array> #include <cstdlib> #include <iostream> #include <memory> #include <type_traits> #include <utility> #include <boost/asio.hpp> #include <boost/bind.hpp> using boost::asio::ip::tcp; //這個類實現了一個分配函數,他的功能是不讓頻繁的分配和釋放。 class handler_allocator { public: handler_allocator() : in_use_(false){} handler_allocator(const handler_allocator &) = delete; handler_allocator& operator=(const handler_allocator &) = delete; //分配函數 //如果當前沒有被使用,那標記為已使用 並返回指針 //如果當前正在使用,則分配新的內存 new void * allocate(std::size_t size) { if (!in_use_ && size < sizeof(storage_)) { in_use_ = true; return &storage_; } else return ::operator new(size); } //釋放函數 //如果當前要釋放的指針就是本身,則標記為未使用 //如果當前要釋放的指針不是本身,那進行默認釋放 delete void dealocate(void * pointer) { if (pointer == &storage_) in_use_ = false; else operator delete(pointer); } private: std::aligned_storage<1024> storage_; bool in_use_; }; template<typename Handler> class custom_alloc_handler { public: custom_alloc_handler(handler_allocator & a, Handler h) : allocator_(a), handler_(h){} //這個函數重置()運算符,使用可變參模板,調用handler_() template<typename ...Args> void operator()(Args&&... args) { handler_(std::forward<Args>(args)...); } //?? friend void * asio_handler_allocate(std::size_t size, custom_alloc_handler<Handler>*this_handler) { return this_handler->allocator_.allocate(size); } //?? friend void asio_handler_deallocate(void * pointer, std::size_t, custom_alloc_handler<Handler> * this_handler) { this_handler->allocator_.dealocate(pointer); } private: handler_allocator & allocator_; Handler handler_; }; //他返回一個Handle template<typename Handler> inline custom_alloc_handler<Handler> make_custom_alloc_handler(handler_allocator & a, Handler h) { return custom_alloc_handler<Handler>(a, h); } class session : public std::enable_shared_from_this<session> { public: session(tcp::socket socket) : socket_(std::move(socket)){} void start() { do_read(); } private: void do_read() { auto self(shared_from_this()); socket_.async_read_some(boost::asio::buffer(data_), make_custom_alloc_handler(allocator_, [this, self](boost::system::error_code ec, std::size_t length) { if (!ec) do_write(length); } )); } void do_write(std::size_t length) { auto self(shared_from_this()); boost::asio::async_write(socket_,boost::asio::buffer(data_,length),make_custom_alloc_handler(allocator_, [this, self](boost::system::error_code ec, std::size_t) { if (!ec) do_read(); })); } tcp::socket socket_; std::array<char, 1024> data_;//存儲從客戶端接受來的數據 handler_allocator allocator_;//自定義內存分配 }; class server { public: server(boost::asio::io_service & io,short port) : acceptor_(io, tcp::endpoint(tcp::v4(), port)), socket_(io) { do_accept(); } private: void do_accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) std::make_shared<session>(std::move(socket_))->start(); else std::cerr << ec.value() << ec.message() << std::endl; do_accept(); }); } tcp::acceptor acceptor_; tcp::socket socket_; }; int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io; server s(io, 13); io.run(); } catch (std::exception & e) { std::cerr << "Exception " << e.what() << std::endl; } return 0; }
以上代碼在調用socket.async_read_some的時候,第二個參數原來是一個Handler,原型如下:
void handler( const boost::system::error_code& error, // Result of operation. std::size_t bytes_transferred // Number of bytes read. );
函數make_custom_alloc_handler產生一個custom_alloc_handler<Handler>對象,custom_alloc_handler<Handler>重載括號運算符實現對回調的調用,這種方法對於我來說感覺很厲害。總之這片代碼我看得不是很懂。
首先:回調用函數應該是一個執行體,也就是std::function,而這里來一個custom_alloc_handler<Handler>對象,對象也可以當作執行體?
其次:這個函數沒有用到asio_handler_allocate和asio_handler_deallocate,我也不知道如何使用。這個放到以后再研究
經過學習和查詢信息得出的結果:
- 異步操作可以增加一個臨時的分配對象asio_handler_allocate。因為異步操作有一個handler函數對象,這個臨時對象可以堪稱是與handler函數對象相關聯的。本例中asio_handler_allocate為handler類對象的一個友元成員函數。這樣在分配內存時,默認就調用此函數進行分配內存。任何與handler相關聯的臨時對象會在handler執行完之后被析構,而asio_handler_allocate這里除了size參數可以額外增加參數,例如本例中的this_handler參數一樣,所以這里允許同一塊內存可以被后來的異步操作重復利用,asio_handler_allocate原型如下:
void * asio_handler_allocate( std::size_t size, ... );
- Handler允許有多種形式存在
- 函數形式
void read_handler( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... }
這種形式最為普通,就是一個回調用函數而已 - 類對象(重載括號運算符)
struct read_handler { ... void operator()( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... } ... };
本例中應該就是這種 - 類成員函數
void my_class::read_handler( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... } ... socket.async_read(..., boost::bind(&my_class::read_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
- 函數形式
- 通過以上知識點,可以清楚知道本例代碼是如何執行的了。
9.Buffers
// BB_CountedReferenceBuffer.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include <string> #include <boost/asio.hpp> #include <memory> #include <utility> #include <iostream> using boost::asio::ip::tcp; class shared_const_buffer { public: //構造函數用std::string 分別初始化buffer_放data_對象 shared_const_buffer(const std::string & data) : data_(new std::vector<char>(data.begin(), data.end())), buffer_(boost::asio::buffer(*data_)) {} typedef boost::asio::const_buffer value_type; typedef const boost::asio::const_buffer* const_iterator; const boost::asio::const_buffer* begin() const { return &buffer_; } const boost::asio::const_buffer* end() const { return &buffer_ + 1; } private: std::shared_ptr<std::vector<char>> data_;//vector char boost::asio::const_buffer buffer_; }; class session : public std::enable_shared_from_this<session> { public: session(tcp::socket socket) :socket_(std::move(socket)) {} void start() { do_write(); } private: void do_write() { std::time_t now = std::time(0); shared_const_buffer buffer(std::ctime(&now)); auto self(shared_from_this()); boost::asio::async_write(socket_, buffer, [this, self](boost::system::error_code,std::size_t) { }); } tcp::socket socket_; }; class server { public: server(boost::asio::io_service & io,short port) : acceptor_(io, tcp::endpoint(tcp::v4(),port)), socket_(io) { do_accept(); } private: void do_accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) std::make_shared<session>(std::move(socket_))->start(); else std::cerr << ec.message() << std::endl; do_accept(); }); } tcp::acceptor acceptor_; tcp::socket socket_; }; int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io_service; server s(io_service, 13); io_service.run(); } catch (std::exception & e) { std::cerr << "Exception: " << e.what() << std::endl; } return 0; }
代碼解析:
本例主要演示了,異步操作中可以自定義的buffer。
以上代碼自定義一個類shared_const_buffer,在調用async_write用這個類對象。async_write有多個重載,這里主要說示例中用到的重載形式,即:
template< typename AsyncWriteStream, typename ConstBufferSequence, typename WriteHandler> void-or-deduced async_write( AsyncWriteStream & s, const ConstBufferSequence & buffers, WriteHandler handler);
第二具模板參數ConstBufferSequence為一個模板參數,自定義ConstBufferSequence模板類有一些要求如下 :
在下面要求列表中,X表示為一個包含類型T對象的類。a表示表示一個類型為X的值,u表示一個標識符
本例中T為boost::asio::const_buffer buffer_,X為本例中的shared_const_buffer。
- X::value_type 返回類型為T,用於表示X實際表示的value_type為T,本例中為boost::asio::const_buffer
- X::const_iterator 指向T的迭代器類型,表示iterator類型實際為哪種類型,本例中為 const boost::asio::const_buffer *
- X(a) 構造函數
- X u(a) 暫時不知如何解釋
- (&a)->~X() 暫時不知如何解釋
- a.begin() 返回起始迭代器
- a.end() 返回終止迭代器
10.Chat_message數據包類
class chat_message { public: enum { header_length = 4 }; enum { max_body_length = 512 }; chat_message() : body_length_(0) {} const char * data() const { return data_; } char * data() { return data_; } std::size_t length() const { return header_length + body_length_; } const char * body() const { return data_ + header_length; } char * body() { return data_ + header_length; } std::size_t body_length() const { return body_length_; } void body_length(std::size_t new_length) { body_length_ = new_length; if (body_length_ > max_body_length) body_length_ = max_body_length; } bool decode_header() { char header[header_length + 1] = ""; strncat_s(header, data_, header_length); body_length_ = std::atoi(header); if (body_length_ > max_body_length) { body_length_ = 0; return false; } return true; } void encode_header() { char header[header_length + 1] = ""; sprintf_s(header, "%4d", static_cast<int>(body_length_)); std::memcpy(data_, header, header_length); } private: char data_[header_length + max_body_length]; std::size_t body_length_; };
這個類比較簡單,他把一個數據包定義為頭和體。頭部是一個整形,代表body的大小。
11.Chat_Server詳解
先上代碼
// CB_ChatServer.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include "..\CA_ChatMessage\chat_message.h" #include <deque> #include <set> #include <memory> #include <boost/asio.hpp> #include <iostream> using boost::asio::ip::tcp; //deque類似於vector,但他可以快速的在頭部和尾部插入元素 typedef std::deque<chat_message> chat_message_queue; //聊天參與者 class chat_participant { public: //析構函數 virtual ~chat_participant() {} //交付 virtual void deliver(const chat_message & msg) = 0; }; typedef std::shared_ptr<chat_participant> chat_participant_ptr; /* chat_room 對所有client進行管理 */ class chat_room { public: //有客戶端加入 void join(chat_participant_ptr participant) { participants_.insert(participant); //當客戶端加入之后,先把最近消息給廣播一下 for (auto msg : recent_msg_) participant->deliver(msg); } //有客戶端離開 void leave(chat_participant_ptr participant) { participants_.erase(participant); } //廣播一條消息 void deliver(const chat_message & msg) { recent_msg_.push_back(msg);//增加到最近消息列表 while (recent_msg_.size() > max_recent_msgs) recent_msg_.pop_front(); //給所有客戶端廣播這條消息 for (auto participant : participants_) participant->deliver(msg); } private: std::set<chat_participant_ptr> participants_; enum{max_recent_msgs = 100}; chat_message_queue recent_msg_; }; class chat_session : public chat_participant, public std::enable_shared_from_this<chat_session> { public: chat_session(tcp::socket socket,chat_room & room) :socket_(std::move(socket)), room_(room){} void start() { room_.join(shared_from_this()); do_read_header(); } //廣播一個消息,這里最主要做的其實是 write_msgs_.push_back(msg); //而do_write,只顯為了驅動,大多數的write_msgs是在驅動后的on write里面執行的。 virtual void deliver(const chat_message & msg) { //為什么要這樣寫,因為到后面room 的recent_msg有可能有幾十個,例如是50個。則 //Post write會 Post這么多次,而這里直接Post進一個對隊列write_msgs,然后post一次 //而其它的post只在OnPost里面再次去調用post write bool write_in_process = !write_msgs_.empty();//先前write_msgs是否不為空 write_msgs_.push_back(msg);//添加要廣播的消息到write_msgs對列里面 if (!write_in_process)//如果先前write_msgs為空的話,說明寫的消息正在投遞。 do_write(); } private: //讀消息 void do_read_header() { auto self(shared_from_this()); boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.data(),chat_message::header_length), [this, self](boost::system::error_code ec, std::size_t) { if (!ec && read_msg_.decode_header()) do_read_body(); else room_.leave(shared_from_this()); }); } void do_read_body() { auto self(shared_from_this()); boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.body(),read_msg_.body_length()), [this, self](boost::system::error_code ec, std::size_t) { if (!ec) { room_.deliver(read_msg_);//聊天室將消息保存到recent消息之后再將此消息廣播出去 do_read_header(); } else room_.leave(shared_from_this()); }); } //這里是給客戶端發次所有的write_msgs_。直到write_msgs_為空停止post write void do_write() { auto self(shared_from_this()); boost::asio::async_write(socket_, boost::asio::buffer(write_msgs_.front().data(),write_msgs_.front().length()), [this, self](boost::system::error_code ec, std::size_t) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else room_.leave(shared_from_this()); }); } tcp::socket socket_;//通信socket chat_room & room_;//房間引用 chat_message read_msg_;//通信用到的read_msg chat_message_queue write_msgs_;//這個是最近消息隊列 }; class chat_server { public: chat_server(boost::asio::io_service & io, const tcp::endpoint & endpoint) : acceptor_(io, endpoint), socket_(io) { do_accept(); } private: void do_accept() { acceptor_.async_accept(socket_,[this](boost::system::error_code ec) { if (!ec) std::make_shared<chat_session>(std::move(socket_), room_)->start(); do_accept(); }); } tcp::acceptor acceptor_; tcp::socket socket_; chat_room room_; }; int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io; chat_server s(io, tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 13)); io.run(); } catch (std::exception & e) { std::cerr << "Exception: " << e.what() << std::endl; } return 0; }
服務端做異步監聽,當有客戶端到來,把這個客戶端(session) 放到聊天室對象里,當這個客戶端斷開時,從聊天室客戶端列表里刪除。
這個聊天室實現了一個廣播功能,當客戶端發送消息至服務器時,服務器給所有客戶端廣播這條消息,並且聊天室記錄最近客戶端發送到服務器的消息,當客戶端連接到服務器時,服務器主動把最近消息記錄發送給這個客戶端。
這里要注意到一點的是,他的發送是類似消息驅動的形式,就是用一個對象保存要發送的消息,當發送成功回調OnSend里發現有未發完的消息時,再駢PostSend。而不是主動發送。我暫時不知道這種做法的意圖。但是可以注意到的一點是這種發送是依次的,也就是PostSend順序是這樣的 PostSend OnSend PostSend OnSend,而我們經常的做法則是PostSend PostSend OnSend OnSend。這個好處不言而喻。提供了一種緩存機制。
12.Chat_Client詳解
先上代碼
// CC_ChatClient.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include <cstdlib> #include <deque> #include <iostream> #include <thread> #include <boost/asio.hpp> #include "..\CA_ChatMessage\chat_message.h" using boost::asio::ip::tcp; typedef std::deque<chat_message> chat_message_queue; class chat_client { public: chat_client(boost::asio::io_service & io, tcp::resolver::iterator endpoint_iterator) : io_service_(io), socket_(io) { do_connect(endpoint_iterator); } //這里的做法與平時做法不太一樣, //平時我們一般是write一條就去do_write一次, //而這里是write一次把內容加到write_msgs里面 //當目前沒有正在post正在執行時去do_write一下,否則把do_write 的操作放到on_write里面進行 void write(const chat_message& msg) { io_service_.post( [this, msg]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { do_write(); } }); } private: void do_connect(tcp::resolver::iterator endpoint_iterator) { boost::asio::async_connect(socket_, endpoint_iterator, [this](boost::system::error_code ec, tcp::resolver::iterator) { if (!ec) do_read_header(); else std::cerr << "connect failed:" << ec.message() << std::endl; }); } void do_read_header() { boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.data(),chat_message::header_length), [this](boost::system::error_code ec, std::size_t) { if (!ec && read_msg_.decode_header()) do_read_body(); else socket_.close(); }); } void do_read_body() { boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this](boost::system::error_code ec, std::size_t) { if (!ec) { std::cout.write(read_msg_.body(), read_msg_.body_length()); std::cout << "\n"; do_read_header(); } else socket_.close(); }); } void do_write() { boost::asio::async_write(socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this](boost::system::error_code ec, std::size_t) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) do_write(); } else socket_.close(); }); } boost::asio::io_service & io_service_; tcp::socket socket_; chat_message read_msg_; chat_message_queue write_msgs_; }; int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io; tcp::resolver resolver(io); auto end_point_iter = resolver.resolve({ "127.0.0.1", "13" }); chat_client c(io, end_point_iter); std::thread t([&io](){io.run(); }); char line[chat_message::max_body_length + 1] = { 0 }; while (std::cin.getline(line, chat_message::max_body_length + 1)) { chat_message msg; msg.body_length(std::strlen(line)); std::memcpy(msg.body(), line, msg.body_length()); msg.encode_header(); c.write(msg); } } catch (std::exception & e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }
這個客戶端沒有什么特點,最大的特別就是我上節在服務端說到的,消息回調Post機制。
13.echo
echo都是非常簡單的socket示例,暫時不做熬述
14.Futures
// EA_Futures.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include <array> #include <future> #include <iostream> #include <thread> #include <boost/asio/io_service.hpp> #include <boost/asio/ip/udp.hpp> #include <boost/asio/use_future.hpp> using boost::asio::ip::udp; void get_daytime(boost::asio::io_service & io, const char * host_name) { try { udp::resolver resv(io); std::future<udp::resolver::iterator> iter = resv.async_resolve({ udp::v4(), host_name, "daytime" }, boost::asio::use_future); udp::socket sock(io, udp::v4()); std::array<char, 1> send_buf = { { 0 } }; std::future < std::size_t> send_length = sock.async_send_to(boost::asio::buffer(send_buf), *iter.get(), boost::asio::use_future); send_length.get();//阻塞,直到發送完成 std::array<char, 128> recv_buf; udp::endpoint sender_endpoint; std::future<std::size_t> recv_length = sock.async_receive_from(boost::asio::buffer(recv_buf), sender_endpoint, boost::asio::use_future); //當接收完成去做其它事 std::cout.write(recv_buf.data(), recv_length.get()); } catch (std::exception &e) { std::cerr << e.what() << std::endl; } } int _tmain(int argc, _TCHAR* argv[]) { try { boost::asio::io_service io; boost::asio::io_service::work work(io); std::thread t([&io](){io.run(); }); get_daytime(io, "127.0.0.1"); io.stop(); t.join(); } catch (std::exception & e) { std::cerr << e.what() << std::endl; } return 0; }
知識點:
- io_service::work 這是一個很小的輔助類,只支持構造函數和析構函數。構造一個 work時,outstanding_work_+1,使得io.run在完成異步消息之后判斷outstanding_work_時不為0,因而會使io.run()不至於返回。通俗的講它就是讓io.run一直運行不退出,只到work析構。
- std::future 他是獲取異步執行函數的返回值的,相當於你創建了一個線程線程在計算某個結果,你要得到這個結果時,你得同步一下,還要看一下,結果算完了沒有。future就是做這件事的。關於這個std::future我會另外開一篇文章寫一下。這里有一篇文件詳細介紹一下這個std::future干了什么http://blog.csdn.net/wangshubo1989/article/details/49872199
- io.stop 這個函數是告訴io_service要停止 。
18.HttpServer
本例用boost asio 寫了一個簡易http服務器,與前面的相比新的知識點不多。
下面提供源碼下載: