Boost - 從Coroutine2 到Fiber
協程引子
我開始一直搞不懂協程是什么,網上搜一搜,(尤其是Golang的goroutine)感覺從概念上聽起來有點像線程池,尤其是類似Java的ExcutorService類似的東西
package helloworld;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallMe {
static class Call implements Callable<String>{
@Override
public String call() throws Exception {
Date d = Calendar.getInstance().getTime();
return d.toString();
}
}
public static void main(String[] args) throws Exception{
ExecutorService pool = Executors.newSingleThreadExecutor();
Future<String> str = pool.submit(new Call());
pool.shutdown();
String ret = str.get();
System.out.println(ret);
}
}
package main
import (
"fmt"
"time"
)
func CallMe(pipe chan string) {
t := time.Now()
pipe <- t.String()
}
func main() {
pipe := make(chan string, 1)
defer close(pipe)
go CallMe(pipe)
select {
case v, ok := <-pipe:
if !ok {
fmt.Println("Read Error")
}
fmt.Println(v)
}
}
是的,協程除了它要解決的問題上,其他可以說就是線程。
那么協程要解決什么問題呢?
這要從協程為什么火起來說起。線程池很好,但線程是由操作系統調度的,並且線程切換代價太大,往往需要耗費數千個CPU周期。
在同步阻塞的編程模式下,
當並發量很大、IO密集時,往往一個任務剛進入線程池就阻塞在IO,就可能(因為線程切換是不可控的)需要切換線程,這時線程切換的代價就不可以忽視了。
后來人們發現異步非阻塞的模型能解決這個問題,當被IO阻塞時,直接調用非阻塞接口,注冊一個回調函數,當前線程繼續進行,也就不用切換線程了。但理想很豐滿,現實很骨感,異步的回調各種嵌套讓程序員的人生更加悲慘。
於是協程應運重生。
協程就是由程序員控制跑在線程里的“微線程”。它可以由程序員調度,切換協程時代價小(切換根據實現不同,消耗的CPU周期從幾十到幾百不等),創建時耗費資源小。十分適用IO密集的場景。
Boost::Coroutine2
boost的Coroutine2不同於Goroutine,golang的協程調度是由Go語言完成,而boost::coroutine2的協程需要自己去調度。
#include <boost\coroutine2\all.hpp>
#include <cstdlib>
#include <iostream>
using namespace boost;
using namespace std;
class X {
public:
X() {
cout << "X()\n";
}
~X() {
cout << "~X()\n";
system("pause");
}
};
void foo(boost::coroutines2::coroutine<void>::pull_type& pull) {
X x;
cout << "a\n";
pull();
cout << "b\n";
pull();
cout << "c\n";
}
int main() {
coroutines2::coroutine<void>::push_type push(foo);
cout << "1\n";
push();
cout << "2\n";
push();
cout << "3\n";
push();
return 0;
}
調用push_type和pull_type的operator()就會讓出當前執行流程給對應的協程。push_type可以給pull_type傳遞參數,而pull_type通過調用get來獲取。
你也可以寫成這樣
boost::coroutines2::coroutine<void>::pull_type pull([](coroutine<void>::push_type &push){...})
它和上面的區別是,新建的pull_type會立即進入綁定的函數中(哪里可以調用push(),哪個協程先執行)
那如果在main結束之前,foo里沒有執行完,那foo里的X會析構嗎?
Boost文檔里說會的,這個叫做Stack unwinding
。
我們不妨把main函數里最后一個push();
去掉,這樣后面就不會切換到foo的context了。會發現雖然foo中的"c"沒有輸出,但X還是析構了的。
Fiber
在實際生產中,我們更適合用fiber來解決問題。fiber有調度器,使用簡單,不需要手動控制執行流程。
#include <boost\fiber\all.hpp>
#include <chrono>
#include <string>
#include <ctime>
#include <iostream>
#include <cstdlib>
using namespace std;
using namespace boost;
void callMe(fibers::buffered_channel<string>& pipe) {
std::time_t result = std::time(nullptr);
string timestr = std::asctime(std::localtime(&result));
pipe.push(timestr);
}
int main() {
fibers::buffered_channel<string> pipe(2);
fibers::fiber f([&]() {callMe(pipe); });
f.detach();
string str;
pipe.pop(str);
cout << str << "\n";
system("pause");
return 0;
}
boost::fibers是一個擁有調度器的協程。看上去fiber已經和goroutine完全一樣了。在fiber里不能調用任何阻塞線程的接口,因為一旦當前fiber被阻塞,那意味着當前線程的所有fiber都被阻塞了。因此所有跟協程相關的阻塞接口都需要自己實現一套協程的包裝,比如this_fiber::sleep_for()
。這也意味着數據庫之類的操作沒辦法被fiber中直接使用。但好在fiber提供了一系列方法去解決這個問題。
使用非阻塞IO
int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) {
int error;
while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) {
boost::this_fiber::yield();
}
return error;
}
主要思想就是,當前fiber調用非阻塞api輪詢,一旦發現該接口會阻塞,就調用boost::this_fiber::yield()
讓出執行權限給其他協程,知道下次獲得執行權限,再次查看是否阻塞。
異步IO
std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) {
typedef std::pair< AsyncAPI::errorcode, std::string > result_pair;
boost::fibers::promise< result_pair > promise;
boost::fibers::future< result_pair > future( promise.get_future() );
// We promise that both 'promise' and 'future' will survive until our lambda has been called.
// Need C++14
api.init_read([promise=std::move( promise)]( AsyncAPI::errorcode ec, std::string const& data) mutable {
promise.set_value( result_pair( ec, data) );
});
return future.get();
}
這種實現方法主要是利用了異步IO不會阻塞當前fiber,在異步的回調中給fibers::promise
設值。當異步操作未返回時,如果依賴到異步的結果,在調用future.get()
時就會讓出執行權限給其他協程。
同步IO
同步IO不可以直接應用到fiber中,因為會阻塞當前線程而導致線程所有的fiber都阻塞。
如果一個接口只有同步模式,比如官方的Mysql Connector,那我們只能先利用多線程模擬一個異步接口,然后再把它當做異步IO去處理。
如何用多線程把同步接口包裝成異步接口呢?
如下,包裝好后就可以利用上面異步IO的方法再包裝一個fiber可以使用的IO接口。
#include <boost/asio.hpp>
#include <boost/fiber/all.hpp>
#include <string>
#include <thread>
#include <cstdlib>
#include <stdio.h>
using namespace std;
using namespace boost;
class AsyncWrapper
{
public:
void async_read(const string fileName, function<void (const string &)> callback) {
auto fun = [=]() {
FILE* fp = fopen(fileName.c_str(), "r");
char buff[1024];
string tmp;
while (nullptr != fgets(buff, 1024, fp)) {
tmp += buff;
}
fclose(fp);
callback(tmp);
};
asio::post(pool, fun);
}
static void wait() {
pool.join();
}
protected:
static asio::thread_pool pool;
};
asio::thread_pool AsyncWrapper::pool(2);
int main()
{
AsyncWrapper wrap;
string file = "./temp.txt";
wrap.async_read(file, [](const string& result) {printf("%s\n", result.c_str()); });
AsyncWrapper::wait();
std::system("pause");
return 0;
}
golang方便在哪
golang的好處就在於它已經幫你完成了上述的封裝過程,它把所有的IO操作都封裝成了阻塞同步調用模式,無非也是通過上面兩種方法。這樣程序員調用的時候 感覺自己在寫同步的代碼,但卻能享受異步/非阻塞帶來的好處。
例如
package main
import (
"log"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func blockSleep() {
log.Printf("blockSleep Before bock----------")
time.Sleep(1 * time.Second)
log.Printf("blockSleep After bock----------")
wg.Done()
}
func writeFile() {
log.Printf("writeFile Before bock+++++++++++")
f, _ := os.Create("./temp.txt")
defer f.Close()
log.Printf("writeFile Before Write+++++++++++")
f.WriteString("Hello World")
log.Printf("writeFile After bock+++++++++++")
wg.Done()
}
func main() {
go writeFile()
wg.Add(1)
for i := 1; i < 5; i++ {
go blockSleep()
wg.Add(1)
}
wg.Wait()
}
這段代碼time.Sleep
和os.Create
都會造成當前協程讓出CPU。其輸出如下
2018/05/31 13:53:19 blockSleep Before bock----------
2018/05/31 13:53:19 blockSleep Before bock----------
2018/05/31 13:53:19 writeFile Before bock+++++++++++
2018/05/31 13:53:19 blockSleep Before bock----------
2018/05/31 13:53:19 writeFile Before Write+++++++++++
2018/05/31 13:53:19 writeFile After bock+++++++++++
2018/05/31 13:53:19 blockSleep Before bock----------
2018/05/31 13:53:20 blockSleep After bock----------
2018/05/31 13:53:20 blockSleep After bock----------
2018/05/31 13:53:20 blockSleep After bock----------
2018/05/31 13:53:20 blockSleep After bock----------