boost asio的協程


http://purecpp.org/?p=362

http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/coroutine.html

 

The coroutine class may be used to implement stackless coroutines. The class itself is used to store the current state of the coroutine.

 

1.asio的coroutine會被網絡IO阻塞,雖然使用者可以使用異步網絡io,但是第三方庫並不會,因此現有的使用同步網絡IO的第三方庫統統不能用
2.asio的coroutine只是簡單的網絡層面的協程,缺失很多feature,比如channel mutex等,僅僅是個協程模型的網絡庫. 這個corotuine卻是一個完整的協程模型的框架,特性完整.

 

https://taozj.org/2016/09/Boost-Asio%E4%B8%ADCoroutine%E5%8D%8F%E7%A8%8B%E4%B9%8B%E4%BD%BF%E7%94%A8/

由於C++原生支持多進程多線程,可以被操作系統直接調度,所以感覺對協程的支持沒有那么的急迫,不過現在網上搜到很多資料,說是建議要把協程推到標准庫中,可見協程還是蠻有用的。

從原理上看,協程保存了執行當前位置,后續可以切換回來,像是一個用戶態的線程,但是和一般的線程不同的是不是搶占式的(pre-emptive)切換,而是一種協作式的(cooperative)推拉;

對於用戶來說,可以類似用符合思維習慣的同步手法,寫出具有異步功能的高效代碼,而不用像傳統異步開發設置各種回調函數把代碼割離弄的支離破碎的;最后還是得意於協程比線程更加的輕量級,切換過程也不會陷入內核態,增加系統的運行效率。

 

同時最近發現了Tecent Phxteam開源出來的phxsql項目,里面就有協程相關的使用,可見協程是可以用在高性能需求的生產環境上的。

Boost庫中的協程支持兩種方式:一種是封裝了Boost.Coroutine的spawn,是一個stackful類型的協程;一種是asio作者寫出的stackless協程。下面就兩類分別羅列出相關特性。

 

一、stackless協程

  在C++中有函數對象的概念后,只要類提供operator()的接口,那么類對象就可以當作函數調用,同時類的其他成員可以保存相關的狀態信息。

其實stackless就是通過class coroutine這個類本身來實現當前協程的狀態保存的,其實其內部就是用的一個int來保留下次resume的行號的,同時提供is_child()、is_parent()、is_complete()三個函數來輔助控制協程的行為。
  要支持協程的函數類必須是可拷貝構造和賦值構造的,其既可以作為實現類的基類派生,也可以作為實現類的一個成員變量,甚至是lambda、bind的參數。其定義了幾個C++標准之外的偽關鍵字方便使用,通過包含就可以使用。
(1) reenter

1
2
reenter( this) {} //繼承形式
reenter(coro_) {} //成員變量形式

 

  當reenter被執行的時候,控制流會跳轉到最后yield或者fork的位置
  需要注意的是reenter宏是通過switch實現的,意味着當在協程體中使用局部變量的時候,當重入協程體時候不能忽略局部變量的定義。如果當前需要局部變量,那么用下面的方式使用符合的語句塊。
(2) yield statement
  常常用在異步操作的時候,比如

     yield socket_->async_read_some(buffer(*buffer_),  * this);   //異步handler是*this, *this是定義的operator()的類,而operator()函數開頭的reenter(this*)又保存了上次的執行行數,這就實現了重入

 

  其執行的邏輯為:

  1. yield保存了當前協程的狀態;
  2. 其表達式初始化了異步操作;
  3. 定義恢復點為statement后的一條語句;
  4. 控制流被轉移到了協程體的結尾

  當異步操作結束的時候,函數對象重新被喚醒,然后reenter使得執行流轉移到了恢復點。當然statement表達式也可以是復合表達式,比如:

yield{
  mutable_buffers_1 b = buffer(*buffer_);
  socket_->async_read_some(b, * this);
}

 

(3) yield return expression ;
  通常用於生成器的環境下使用,其return后面的值作為函數的返回值傳遞出來,比如

 

struct interleave : coroutine
{
  istream& is1;
  istream& is2;
  char operator()(char c)
  {
    reenter ( this) for (;;) {
      yield  return is1.get();
      yield  return is2.get();
    }
  }
};

 

  上面的例子會交替的從is1和is2中產生字符,其會使得return后面表達式的值被返回。
(4) yield ;
  用於顯式的控制執行的流程,通常在多個協程交替的運行完成協作工作。
(5) yield break ;
  主要用來終止協程的,yield首先設置協程的終止狀體,然后流程被轉移到了協程體的結尾。
  一旦終止,使用is_complete()就會返回true,同時協程不能夠被再次reenter了。當然不一定要yield break,當流程執行到了協程體結尾,這些協程也會自動terminate了。
  突然意識到為啥要break了,因為reenter本來就是用switch實現的嘛。
(6) fork statement
  可以創建多個協程的拷貝,常用的情況是在服務端,協程被fork出來用於處理客戶端的請求。父協程和子協程通過is_parent()、is_child()進行界定。


reenter ( this) {
  do{
    socket_.reset( new tcp::socket(io_service_));
    yield acceptor->async_accept(*socket_, * this);
    fork server(*this)();
  }  while (is_parent());
... client-specific handling follows ...
}

 

  其fork語句會創建當前協程的一個拷貝,然后可能會立即執行或者被后面再調度執行,或者使用io_service::post()調度執行。

二、stackful協程【Boost::】

  其實現使用的Boost.Context來進行上下文的切換。使用需要包含頭文件,位於名字空間boost::coroutines。
  其實現原理是每個協程都有自己的stack和control-block(boost::contexts::fcontext_t),在協程需要暫停的時候,當前協程的所有非易失的寄存器(包括ESP、EIP)都會被保存在control-block當中,而新激活的協程會從其相關的control-block中加載回復相關的寄存器信息,稱之為上下文切換,相關的上下文切換不需要系統特權。
  Boost.Context提供的協程包括兩類:非對稱型協程asymmetric_coroutine的和對稱型協程symmetric_coroutine,

前者協程知道喚醒自己的協程,當需要暫停的時候控制流轉換給那個特定的協程;

對稱協程中所有的協程都是相等的,協程可以把控制流給任何一個其它的協程。

所以對稱協程主要是表達並發編程中的多個獨立的執行單元,而非對稱協程常常用於對數據進行順序處理的過程。


  stackful協程可以從嵌套的stackframe中暫停執行,在恢復的時候可以在其暫停的地方繼續執行,而stackless協程只有頂層的函數(top-level routine)可以被暫停,所有頂層函數調用的函數都不允許被暫停,也就是不允許嵌套使用攜程
  stackful的協程可以被嵌套使用,但是要求協程是可移動(move)但不可拷貝(copy)的,因為其為RAII實現的,結束的時候資源會被自動清理釋放,但是拷貝會導致內部的狀態不可控。同時使用時候在context_switch切換到正在執行的相同協程的行為是未定義的。

2.1 Asymmetric coroutine

1
2
asymmetric_coroutine<>::pull_type
asymmetric_coroutine<>::push_type

  其提供了單向的數據傳輸操作,在數據傳輸的過程中伴隨着Context的切換。模板參數的類型決定了數據傳輸的類型,如果不需要傳遞數據只進行Context切換,可以使用void。
(1) pull_type
  從另外的一個context獲取數據,其構造函數的參數是一個cor-function函數對象,cor-function的參數是一個push_type的引用。初始化pull_type的時候,執行流被切換到了cor-function,並且synthesize一個push_type並將引用傳遞給協程函數。其同時還提供operator(),只進行context切換,不傳輸數據(即構造函數參數為空而不是模板類型指定的實參)。
  pull_type提供迭代器和std::begin()、std::end()重載,從而可以增量的切換context並進行數據的傳輸。pull_type的提供的成員函數get()可以從別的context中拉取數據,但是不會造成context切換,如需切換需要手動調用operator()。

1
2
3
4
5
6
7
8
9
10
11
12
13
boost::coroutines::asymmetric_coroutine< int>::pull_type source(
[&](boost::coroutines::asymmetric_coroutine< int>::push_type& sink){
int first=1, second=1;
sink(first); sink(second);
for(int i=0;i<8;++i){
int third=first+second;
first=second; second=third;
sink(third);
}
});
 
for(auto i:source)
std::cout << i << " ";

 

(2) push_type
  用於將數據傳輸到別的執行context,其構造函數接收的cor-function參數類型是pull_type類型的引用。在初始化push_type的時候,不同的是執行流沒有轉移到cor-function,而是先執行push_type::operator()去synthesize一個pull_type並將其引用傳遞給協程函數。其push_type::operator(T)成員函數用於推送數據給對應的context。
(3) coroutine-function
  通過pull_type::operator bool可以判斷協程是否還有效(即協程函數是否已經terminated),除非第一個模板參數是void,否則返回true的同時也意味着其還可以提供數據的。
  從pull-coroutine向main-context傳遞數據的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
boost::coroutines::asymmetric_coroutine< int>::pull_type source(
[&](boost::coroutines::asymmetric_coroutine< int>::push_type& sink){
sink( 1); // push {1} back to main-context
sink( 2); // push {2} back to main-context
});
 
while(source){ // test if pull-coroutine is valid
int ret=source.get(); // access data value
source(); // context-switch to coroutine-function
}
'''
 
  從main-context向push-coroutine傳遞數據的例子:
'''cpp
// constructor does NOT enter cor-function
boost::coroutines::asymmetric_coroutine< int>::push_type sink(
[&](boost::coroutines::asymmetric_coroutine< int>::pull_type& source){
for (int i:source) {
std::cout << i << " ";
}
});
 
std::vector<int> v{1,1,2,3,5,8,13,21,34,55};
for( int i:v) {
sink(i); // push {i} to coroutine-function
}

 

2.2 Symmetric coroutine

  其caller和callee的關系是不固定的,symmetric的協程可以把執行控制轉移給任意的symmetric協程,而不一定是自己的caller。

1
2
symmetric_coroutine<>::call_type
symmetric_coroutine<>::yield_type

 

(1) call_type
  call_type其構造函數是一個coroutine-function函數對象,協程函數接受一個yield_type的引用作為參數。實例化call_type不會將執行流傳遞到協程函數,其會先調用operator()去強制合成一個yield_type並將其引用傳遞給協程函數。
  call_type不提供get()成員函數,即不可以從其他的執行context中獲取數據。
(2) yield_type
  通過調用yield_type::operator()並使用其它call_type對象作為參數,可以把數據和執行流傳遞給其他的context。
  其模板參數規定了傳輸的數據類型,通過yield_type::get()可以訪問該數據。如果實例化模板使用void類型,那么可以只用作控制流傳遞,而不進行數據的傳遞。

1
2
3
4
5
6
7
8
9
10
boost::coroutines::symmetric_coroutine< int>::call_type coro(
[&](boost::coroutines::symmetric_coroutine< int>::yield_type& yield){
for (;;) {
std::cout << yield.get() << " ";
yield(); // jump back to starting context
}
});
 
coro( 1); // transfer {1} to coroutine-function
coro( 2); // transfer {2} to coroutine-function

 

2.3 spawn

  如果是這么寫程序,不是蛋疼,而是要蛋碎了。幸好Boost.Coroutine庫給了一個高度的封裝,其使用yield_context來保存協程的運行環境,然后允許程序以同步的方式執行各種異步函數操作,而這個yield_context對象是spawn函數自動產生的。當然要想知道其內部封裝的實現,還是需要另外花一份功夫的。

1
2
3
4
5
6
7
template<typename Function>
void spawn(boost::asio::io_service::strand strand, Function function);
template<typename Function>
void spawn(boost::asio::io_service & io_service, Function function);
 
// Function需要的簽名
void coroutine(boost::asio::yield_context yield);

 

  要求spawn的第二個參數Function的簽名必須是上面的類型,如果不符合,需要使用bind/lambda方式來包裝。
  此后,對於所有的異步操作,只需要把yield作為參數傳遞在原來需要callback的位置就可以了,當異步操作完成的時候,此處的程序會resume繼續執行。
對於異步操作的函數,其前面可能是

1
2
3
4
5
void handler(boost::system::error_code ec);
void handler(boost::system::error_code ec, result_type result);
// std::size_t length = my_socket.async_read_some( boost::asio::buffer(data), yield);
// boost::system::error_code ec;
// std::size_t length = my_socket.async_read_some( boost::asio::buffer(data), yield[ec]);

 

  對於有result_type的類型,其result的值已經作為參數返回了(比如上面的std::size_t),而如果出錯,下面的調用方法會直接拋出異常,如果想使用原先返回錯誤的方式而不是拋出system_error異常的方式,可以使用yield[ec]的方式調用,operator[]用於外部獲取發生的錯誤碼。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void do_echo(boost::asio::yield_context yield)
{
char data[128];
for (;;)
{
std::size_t length = my_socket.async_read_some(
boost::asio::buffer(data), yield);
 
boost::asio::async_write(my_socket,
boost::asio::buffer(data, length), yield);
}
}
 
boost::asio::spawn(my_strand, do_echo);

2.4 舉個栗子

  下面寫一個小例子,看看封裝后的協程寫異步程序是多么爽的一件事,至於為什么爽是因為同步編程才是符合人類的思維習慣的。以前設置異步讀取操作后,數據的處理都必須在回調函數中處理,現在可以直接在異步操作后接着處理啦!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <string>
#include <ctime>
#include <iostream>
#include <boost/enable_shared_from_this.hpp>
 
using namespace boost::asio;
using std::cerr; using std::endl;
 
io_service io_service_;
 
class session: public boost::enable_shared_from_this<session>
{
public:
explicit session(ip::tcp::socket socket):
sock_(std::move(socket)),
strand_(io_service_),
uuid_(std::rand())
{}
 
~session() { cerr << "~sessoin ->" << uuid_ << endl; }
 
void go()
{
auto self(shared_from_this());
boost::asio::spawn(strand_,
boost::bind(&session::do_echo, self, _1));
}
 
void do_echo(yield_context yield)
{
char data[128];
std::size_t n = sock_.async_read_some(boost::asio::buffer(data), yield);
cerr << "RECVED:【" << data << "】->" << uuid_ <<endl;
std::time_t now = std::time(nullptr);
std::string time_str = std::ctime(&now);
async_write(sock_, buffer(time_str), yield);
sock_.shutdown(ip::tcp::socket::shutdown_send);
}
 
private:
ip::tcp::socket sock_;
io_service::strand strand_;
std::size_t uuid_;
};
 
 
void start_accept(yield_context yield)
{
ip::tcp::acceptor acceptor(io_service_, ip::tcp::endpoint(ip::tcp::v4(), 2016));
 
for (;;) {
boost::system::error_code ec;
ip::tcp::socket socket(io_service_);
 
acceptor.async_accept(socket, yield[ec]);
if(!ec)
boost::make_shared<session>(std::move(socket))->go();
}
}
 
int main(int argc, char* argv[])
{
boost::asio::spawn(io_service_, start_accept);
io_service_.run();
}

 

編譯后就可以看出運行效果了:

1
➜ ~ g++ -std=c++11 test.cpp -lboost_system -lboost_coroutine -lboost_context -o test

 

  其實,感覺現實中協程更多的是對編程方式的改變,對控制流的操控可以用同步的結構寫出異步的效果,但是協程是用戶態的而不是原生的多線程,所以並不能並行執行提高並發率。但是協程能夠在各個協程間進行高效的切換,這一點可以做到比傳統依賴於異步調度的效率更高,這才體現出協作的本質吧!

參考文獻

boost::asio::coroutine 文檔翻譯 + 源碼解析
Coroutine
coroutine
Stackful Coroutines - spawn
Coroutines
協程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM