https://segmentfault.com/a/1190000007225464
環境: Boost v1.66, VS 2013 & 2015
說明:
這篇教程形成於 Boost v1.62 時代,最近(2018/01)針對 v1.66 做了一次大的更新。
此外,在代碼風格上,C++11 用得更多了。
概述
近期學習 Boost Asio,依葫蘆畫瓢,寫了不少例子,對這個「輕量級」的網絡庫算是有了一定理解。但是秉着理論與實踐結合的態度,決定寫一篇教程,把腦子里一知半解的東西,試圖說清楚。
Asio,即「異步 IO」(Asynchronous Input/Output),本是一個 獨立的 C++ 網絡程序庫,似乎並不為人所知,后來因為被 Boost 相中,才聲名鵲起。
從設計上來看,Asio 相似且重度依賴於 Boost,與 thread、bind、smart pointers 等結合時,體驗順滑。從使用上來看,依然是重組合而輕繼承,一貫的 C++ 標准庫風格。
什么是「異步 IO」?
簡單來說,就是你發起一個 IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。
當然這種表述是不精確的,操作系統並沒有直接提供這樣的機制。以 Unix 為例,有五種 IO 模型可用:
- 阻塞 I/O
- 非阻塞 I/O
- I/O 多路復用(multiplexing)(
select
和poll
) - 信號驅動 I/O(
SIGIO
) - 異步 I/O(POSIX
aio_
系列函數)
這五種模型的定義和比較,詳見「Unix Network Programming, Volume 1: The Sockets Networking API」一書 6.2 節,或者可參考 這篇筆記。
Asio 封裝的正是「I/O 多路復用」。具體一點,epoll
之於 Linux,kqueue
之於 Mac 和 BSD。epoll
和 kqueue
比 select
和 poll
更高效。當然在 Windows 上封裝的則是 IOCP(完成端口)。
Asio 的「I/O 操作」,主要還是指「網絡 IO」,比如 socket 讀寫。由於網絡傳輸的特性,「網絡 IO」相對比較費時,設計良好的服務器,不可能同步等待一個 IO 操作的結束,這太浪費 CPU 了。
對於普通的「文件 IO」,操作系統並沒有提供“異步”讀寫機制,libuv 的做法是用線程模擬異步,為網絡和文件提供了一致的接口。Asio 並沒有這樣做,它專注於網絡。提供機制而不是策略,這很符合 C++ 哲學。
下面以示例,由淺到深,由簡單到復雜,逐一介紹 Asio 的用法。
簡單起見,頭文件一律省略。
I/O Context
每個 Asio 程序都至少有一個 io_context
對象,它代表了操作系統的 I/O 服務(io_context
在 Boost 1.66 之前一直叫 io_service
),把你的程序和這些服務鏈接起來。
下面這個程序空有 io_context
對象,卻沒有任何異步操作,所以它其實什么也沒做,也沒有任何輸出。
int main() { boost::asio::io_context ioc; ioc.run(); return 0; }
io_context.run
是一個阻塞(blocking)調用,姑且把它想象成一個 loop(事件循環),直到所有異步操作完成后,loop 才結束,run
才返回。但是這個程序沒有任何異步操作,所以 loop 直接就結束了。
Timer
有了 io_context
還不足以完成 I/O 操作,用戶一般也不跟 io_context
直接交互。
根據 I/O 操作的不同,Asio 提供了不同的 I/O 對象,比如 timer(定時器),socket,等等。
Timer 是最簡單的一種 I/O 對象,可以用來實現異步調用的超時機制,下面是最簡單的用法:
void Print(boost::system::error_code ec) { std::cout << "Hello, world!" << std::endl; } int main() { boost::asio::io_context ioc; boost::asio::deadline_timer timer(ioc, boost::posix_time::seconds(3)); timer.async_wait(&Print); ioc.run(); return 0; }
先創建一個 deadline_timer
,指定時間 3 秒,然后異步等待這個 timer,3 秒后,timer 超時結束,Print
被調用。
以下幾點需要注意:
- 所有 I/O 對象都依賴
io_context
,一般在構造時指定。 async_wait
初始化了一個異步操作,但是這個異步操作的執行,要等到io_context.run
時才開始。- Timer 除了異步等待(
async_wait
),還可以同步等待(wait
)。同步等待是阻塞的,直到 timer 超時結束。基本上所有 I/O 對象的操作都有同步和異步兩個版本,也許是出於設計上的完整性。 async_wait
的參數是一個函數對象,異步操作完成時它會被調用,所以也叫 completion handler,簡稱 handler,可以理解成回調函數。- 所有 I/O 對象的
async_xyz
函數都有 handler 參數,對於 handler 的簽名,不同的異步操作有不同的要求,除了官方文檔里的說明,也可以直接查看 Boost 源碼。
async_wait
的 handler 簽名為 void (boost::system::error_code)
,如果要傳遞額外的參數,就得用 bind
。不妨修改一下 Print
,讓它每隔一秒打印一次計數,從 0
遞增到 3
。
void Print(boost::system::error_code ec, boost::asio::deadline_timer* timer, int* count) { if (*count < 3) { std::cout << *count << std::endl; ++(*count); timer->expires_at(timer->expires_at() + boost::posix_time::seconds(1)); timer->async_wait(std::bind(&Print, std::placeholders::_1, timer, count)); } }
與前版相比,Print
多了兩個參數,以便訪問當前計數及重啟 timer。
int main() { boost::asio::io_context ioc; boost::asio::deadline_timer timer(ioc, boost::posix_time::seconds(1)); int count = 0; timer.async_wait(std::bind(&Print, std::placeholders::_1, &timer, &count)); ioc.run(); return 0; }
調用 bind
時,使用了占位符(placeholder)std::placeholders::_1
。數字占位符共有 9 個,_1
- _9
。占位符也有很多種寫法,這里就不詳述了。
Echo Server
Socket 也是一種 I/O 對象,這一點前面已經提及。相比於 timer,socket 更為常用,畢竟 Asio 是一個網絡程序庫。
下面以經典的 Echo 程序為例,實現一個 TCP Server。所謂 Echo,就是 Server 把 Client 發來的內容原封不動發回給 Client。
先從同步方式開始,異步太復雜,慢慢來。
同步方式
Session
代表會話,負責管理一個 client 的連接。參數 socket
傳的是值,但是會用到 move 語義來避免拷貝。
void Session(tcp::socket socket) { try { while (true) { boost::array<char, BUF_SIZE> data; boost::system::error_code ec; std::size_t length = socket.read_some(boost::asio::buffer(data), ec); //成員函數 if (ec == boost::asio::error::eof) { std::cout << "連接被 client 妥善的關閉了" << std::endl; break; } else if (ec) { // 其他錯誤 throw boost::system::system_error(ec); } boost::asio::write(socket, boost::asio::buffer(data, length)); //自由函數 } } catch (const std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; } }
其中,tcp
即 boost::asio::ip::tcp
;BUF_SIZE
定義為 enum { BUF_SIZE = 1024 };
。這些都是細節,后面的例子不再贅述。
int main(int argc, char* argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " <port>" << std::endl; return 1; } unsigned short port = std::atoi(argv[1]); boost::asio::io_context ioc; // 創建 Acceptor 偵聽新的連接 tcp::acceptor acceptor(ioc, tcp::endpoint(tcp::v4(), port)); try { // 一次處理一個連接 while (true) { Session(acceptor.accept()); } } catch (const std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; } return 0; }
啟動時,通過命令行參數指定端口號,比如:
$ echo_server_sync 8080
因為 Client 部分還未實現,先用 netcat
測試一下:
$ nc localhost 8080 hello hello
以下幾點需要注意:
tcp::acceptor
也是一種 I/O 對象,用來接收 TCP 連接,連接端口由tcp::endpoint
指定。- 數據 buffer 以
boost::array<char, BUF_SIZE>
表示,也可以用char data[BUF_SIZE]
,或std::vector<char> data(BUF_SIZE)
。事實上,用std::vector
是最推薦的,因為它不但可以動態調整大小,還支持 Buffer Debugging。 - 同步方式下,沒有調用
io_context.run
,因為accept
、read_some
和write
都是阻塞的。這也意味着一次只能處理一個 Client 連接,但是可以連續 echo,除非 Client 斷開連接。 - 寫回數據時,沒有直接調用
socket.write_some
,因為它不能保證一次寫完所有數據,但是boost::asio::write
可以。我覺得這是 Asio 接口設計不周,應該提供socket.write
。 acceptor.accept
返回一個新的 socket 對象,利用 move 語義,直接就轉移給了Session
的參數,期間並沒有拷貝開銷。
異步方式
異步方式下,困難在於對象的生命周期,可以用 shared_ptr
解決。
為了同時處理多個 Client 連接,需要保留每個連接的 socket 對象,於是抽象出一個表示連接會話的類,叫 Session
:
class Session : public std::enable_shared_from_this<Session> { public: Session(tcp::socket socket) : socket_(std::move(socket)) { } void Start() { DoRead(); } void DoRead() { auto self(shared_from_this()); socket_.async_read_some( boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t length) { if (!ec) { DoWrite(length); } }); } void DoWrite(std::size_t length) { auto self(shared_from_this()); boost::asio::async_write( socket_, boost::asio::buffer(buffer_, length), [this, self](boost::system::error_code ec, std::size_t length) { //捕獲列表:this表示函數體內可以使用Lambda所在類中的成員變量; self: 匿名函數內; 參數列表必須符合async_write的handler簽名 if (!ec) { DoRead(); } }); } private: tcp::socket socket_; std::array<char, BUF_SIZE> buffer_; };
就代碼風格來說,有以下幾點需要注意:
- 優先使用 STL,比如
std::enable_shared_from_this
,std::bind
,std::array
,等等。 - 定義 handler 時,盡量使用匿名函數(lambda 表達式)。
- 以 C++
std::size_t
替 Csize_t
。
剛開始,你可能會不習慣,我也是這樣,過了好久才慢慢擁抱 C++11 乃至 C++14。
Session
有兩個成員變量,socket_
與 Client 通信,buffer_
是接收 Client 數據的緩存。只要 Session
對象在,socket 就在,連接就不斷。Socket 對象是構造時傳進來的,而且是通過 move 語義轉移進來的。
雖然還沒看到 Session
對象是如何創建的,但可以肯定的是,它必須用 std::shared_ptr
進行封裝,這樣才能保證異步模式下對象的生命周期。
此外,在 Session::DoRead
和 Session::DoWrite
中,因為讀寫都是異步的,同樣為了防止當前 Session
不被銷毀(因為超出作用域),所以要增加它的引用計數,即 auto self(shared_from_this());
這一句的作用。
至於讀寫的邏輯,基本上就是把 read_some
換成 async_read_some
,把 write
換成 async_write
,然后以匿名函數作為 completion handler。
接收 Client 連接的代碼,提取出來,抽象成一個類 Server
:
class Server { public: Server(boost::asio::io_context& ioc, std::uint16_t port) : acceptor_(ioc, tcp::endpoint(tcp::v4(), port)) { DoAccept(); } private: void DoAccept() { acceptor_.async_accept( [this](boost::system::error_code ec, tcp::socket socket) { if (!ec) { std::make_shared<Session>(std::move(socket))->Start(); } DoAccept(); }); } private: tcp::acceptor acceptor_; };
同樣,async_accept
替換了 accept
。async_accept
不再阻塞,DoAccept
即刻就會返回。
為了保證 Session
對象繼續存在,使用 std::shared_ptr
代替普通的棧對象,同時把新接收的 socket 對象轉移過去。
最后是 main()
:
int main(int argc, char* argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " <port>" << std::endl; return 1; } std::uint16_t port = std::atoi(argv[1]); boost::asio::io_context ioc; Server server(ioc, port); ioc.run(); return 0; }
Echo Client
雖然用 netcat
測試 Echo Server 非常方便,但是自己動手寫一個 Echo Client 仍然十分必要。
還是先考慮同步方式。
同步方式
首先通過 host
和 port
解析出 endpoints(對,是復數!):
tcp::resolver resolver(ioc); auto endpoints = resolver.resolve(tcp::v4(), host, port);
resolve
返回的 endpoints 類型為 tcp::resolver::results_type
,代之以 auto
可以簡化代碼。類型推導應適當使用,至於連 int
都用 auto
就沒有必要了。host
和 port
通過命令行參數指定,比如 localhost
和 8080
。
接着創建 socket,建立連接:
tcp::socket socket(ioc); boost::asio::connect(socket, endpoints);
這里沒有直接調用 socket.connect
,因為 endpoints
可能會有多個,boost::asio::connect
會挨個嘗試,逐一調用 socket.connect
直到連接成功。
其實這樣說不太嚴謹,根據我的測試,resolve
在沒有指定 protocol 時,確實會返回多個 endpoints,一個是 IPv6,一個是 IPv4。但是我們已經指定了 protocol 為 tcp::v4()
:
resolver.resolve(tcp::v4(), host, port)
所以,應該只有一個 endpoint。
接下來,從標准輸入(std::cin
)讀一行數據,然后通過 boost::asio::write
發送給 Server:
char request[BUF_SIZE]; std::size_t request_length = 0; do { std::cout << "Enter message: "; std::cin.getline(request, BUF_SIZE); request_length = std::strlen(request); } while (request_length == 0); boost::asio::write(socket, boost::asio::buffer(request, request_length));
do...while
是為了防止用戶直接 Enter 導致輸入為空。boost::asio::write
是阻塞調用,發送完才返回。
從 Server 同步接收數據有兩種方式:
- 使用
boost::asio::read
(對應於boost::asio::write
); - 使用
socket.read_some
。
兩者的差別是,boost::asio::read
讀到指定長度時,就會返回,你需要知道你想讀多少;而 socket.read_some
一旦讀到一些數據就會返回,所以必須放在循環里,然后手動判斷是否已經讀到想要的長度,否則無法退出循環。
下面分別是兩種實現的代碼。
使用 boost::asio::read
:
char reply[BUF_SIZE]; std::size_t reply_length = boost::asio::read( socket, boost::asio::buffer(reply, request_length)); std::cout.write(reply, reply_length);
使用 socket.read_some
:
std::size_t total_reply_length = 0; while (true) { std::array<char, BUF_SIZE> reply; std::size_t reply_length = socket.read_some(boost::asio::buffer(reply)); std::cout.write(reply.data(), reply_length); total_reply_length += reply_length; if (total_reply_length >= request_length) { break; } }
不難看出,socket.read_some
用起來更為復雜。
Echo 程序的特殊之處就是,你可以假定 Server 會原封不動的把請求發回來,所以你知道 Client 要讀多少。
但是很多時候,我們不知道要讀多少數據。
所以,socket.read_some
反倒更為實用。
此外,在這個例子中,我們沒有為各函數指定輸出參數 boost::system::error_code
,而是使用了異常,把整個代碼塊放在 try...catch
中。
try { // ... } catch (const std::exception& e) { std::cerr << e.what() << std::endl; }
Asio 的 API 基本都通過重載(overload),提供了 error_code
和 exception
兩種錯誤處理方式。使用異常更易於錯誤處理,也可以簡化代碼,但是 try...catch
該包含多少代碼,並不是那么明顯,新手很容易誤用,什么都往 try...catch
里放。
一般來說,異步方式下,使用 error_code
更方便一些。所以 complete handler 的參數都有 error_code
。
異步方式
就 Client 來說,異步也許並非必要,除非想同時連接多個 Server。
異步讀寫前面已經涉及,我們就先看 async_resolve
和 async_connect
。
首先,抽取出一個類 Client
:
class Client { public: Client(boost::asio::io_context& ioc, const std::string& host, const std::string& port) : socket_(ioc), resolver_(ioc) { } private: tcp::socket socket_; tcp::resolver resolver_; char cin_buf_[BUF_SIZE]; std::array<char, BUF_SIZE> buf_; };
resolver_
是為了 async_resolve
,作為成員變量,生命周期便得到了保證,不會因為函數結束而失效。
下面來看 async_resolve
實現(代碼在構造函數中):
Client(...) {
resolver_.async_resolve(tcp::v4(), host, port,
std::bind(&Client::OnResolve, this, std::placeholders::_1, std::placeholders::_2)); }
async_resolve
的 handler:
void OnResolve(boost::system::error_code ec, tcp::resolver::results_type endpoints) { if (ec) { std::cerr << "Resolve: " << ec.message() << std::endl; } else { boost::asio::async_connect(socket_, endpoints, std::bind(&Client::OnConnect, this, std::placeholders::_1, std::placeholders::_2)); } }
async_connect
的 handler:
void OnConnect(boost::system::error_code ec, tcp::endpoint endpoint) { if (ec) { std::cout << "Connect failed: " << ec.message() << std::endl; socket_.close(); } else { DoWrite(); } }
連接成功后,調用 DoWrite
,從標准輸入讀取一行數據,然后異步發送給 Server。
下面是異步讀寫相關的函數,一並給出:
void DoWrite() { std::size_t len = 0; do { std::cout << "Enter message: "; std::cin.getline(cin_buf_, BUF_SIZE); len = strlen(cin_buf_); } while (len == 0); boost::asio::async_write(socket_, boost::asio::buffer(cin_buf_, len), std::bind(&Client::OnWrite, this, std::placeholders::_1)); } void OnWrite(boost::system::error_code ec) { if (!ec) { std::cout << "Reply is: "; socket_.async_read_some(boost::asio::buffer(buf_), std::bind(&Client::OnRead, this, std::placeholders::_1, std::placeholders::_2)); } } void OnRead(boost::system::error_code ec, std::size_t length) { if (!ec) { std::cout.write(buf_.data(), length); std::cout << std::endl; // 如果想繼續下一輪,可以在這里調用 DoWrite()。 } }
異步讀寫在異步 Server 那一節已經介紹過,這里就不再贅述了。
最后是 main()
:
int main(int argc, char* argv[]) { if (argc != 3) { std::cerr << "Usage: " << argv[0] << " <host> <port>" << std::endl; return 1; } const char* host = argv[1]; const char* port = argv[2]; boost::asio::io_context ioc; Client client(ioc, host, port); ioc.run(); return 0; }
至此,異步方式的 Echo Client 就算實現了。
為了避免文章太長,Asio 的介紹暫時先告一段落。若有補遺,會另行記錄。
完整及更加豐富的示例代碼,請移步 GitHub。