folly教程系列之:future/promise


     attension:本文嚴禁轉載。

一、前言

      promise/future是一個非常重要的異步編程模型,它可以讓我們擺脫傳統的回調陷阱,從而使用更加優雅、清晰的方式進行異步編程。c++11中已經開始支持std::future/std::promise,那么為什么folly還要提供自己的一套實現呢?原因是c++標准提供的future過於簡單,而folly的實現中最大的改進就是可以為future添加回調函數(比如then),這樣可以方便的鏈式調用,從而寫出更加優雅、間接的代碼,然后,改進還不僅僅如此。

二、入門實例

     讓我們先來看一個入門實例,代碼如下所示:

 1 #include <folly/futures/Future.h>
 2 using namespace folly;
 3 using namespace std;
 4 
 5 void foo(int x) {
 6   // do something with x
 7   cout << "foo(" << x << ")" << endl;
 8 }
 9 
10 // ...
11 
12   cout << "making Promise" << endl;
13   Promise<int> p;
14   Future<int> f = p.getFuture();
15   f.then(foo);
16   cout << "Future chain made" << endl;
17 
18 // ... now perhaps in another event callback
19 
20   cout << "fulfilling Promise" << endl;
21   p.setValue(42);
22   cout << "Promise fulfilled" << endl;

      代碼非常簡潔,首先定義一個Promise,然后從這個Promise獲取它相關聯的Future(通過getFuture接口),之后通過then為這個Future設置了一個回調函數foo,最后當為Promise賦值填充時(setValue),相關的Future就會變為ready狀態(或者是completed狀態),那么它相關的回調(這里為foo)會被執行。這段代碼的打印結果如下:

making Promise
Future chain made
fulfilling Promise
foo(42)
Promise fulfilled

三、基本概念

1、Promise

     如果你需要包裝一個異步操作、或者向用戶提供一個異步編程接口,那么你就可能會用到promise。每一個Future都有一個與之相關的Promise(除了使用makeFuture()產生的處於completed狀態的Future),Promise的使用是很簡單的:首先是創建Promise,然后從它“提取”出一個Future,最后在適當的時候向Promise填充一個值或者是異常。

     例如使用setValue填充一個值:

1 Promise<int> p;
2 Future<int> f = p.getFuture();
3 
4 f.isReady() == false
5 
6 p.setValue(42);
7 
8 f.isReady() == true
9 f.value() == 42

     下面一個例子是使用setException填充一個異常:

1 Promise<int> p;
2 Future<int> f = p.getFuture();
3 
4 f.isReady() == false
5 
6 p.setException(std::runtime_error("Fail"));
7 
8 f.isReady() == true
9 f.value() // throws the exception

     但是其實更優雅的使用Promise的方式是使用setWith方法,它接收一個函數而且可以自動捕獲函數拋出的異常,示例如下:

 1 Promise<int> p;
 2 p.setWith([]{
 3   try {
 4     // do stuff that may throw
 5     return 42;
 6   } catch (MySpecialException const& e) {
 7     // handle it
 8     return 7;
 9   }
10   // Any exceptions that we didn't catch, will be caught for us
11 });

      注意:通常來說,在基於Future的編程模型中,多數情況下應該都是單獨使用Future而不是Promise(調用返回Future的接口、為Future添加回調函數最終返回另一個Future),Promise在編寫底層的異步操作接口時會變得非常有用,比如:

 1 void fooOldFashioned(int arg, std::function<int(int)> callback);
 2 
 3 Future<int> foo(int arg) {
 4   auto promise = std::make_shared<Promise<int>>();
 5 
 6   fooOldFashioned(arg, [promise](int result) {
 7     promise->setValue(result);
 8   });
 9 
10   return promise->getFuture();
11 }

2、使用then方法為Future設置回調函數

      前面的例子中,我們都是使用Future的value方法獲取值的,除此之外,我們還可以使用回調的方式獲取值或者異常,也就是當Promise被填充時,與之相關的Future的回調函數就會被觸發執行,例如:

1 Promise<int> p;
2 Future<int> f = p.getFuture();
3 
4 f.then([](int i){
5   cout << i;
6 });
7 
8 p.setValue(42);

      注意:上面的例子中,設置回調的動作和填充Promise的動作之前沒有順序要求,也就是可以先填充Promise再使用then設置回調函數,如果是這樣的話,那么回調函數會被立刻執行。

      那么如何獲取一個異常呢?上面的例子中,lambda表達式的參數類型為int,這個顯然不能傳遞一個異常,此時你可以把你的回調函數的參數類型設置為Try,該類型既可以捕獲正常值又可以捕獲一個異常。例如:

1 f.then([](Try<int> const& t){
2   cout << t.value();
3 });

      注意:不推薦在回調函數中使用Try,回調函數中應該只用來捕獲值,對於異常的處理和捕獲,后文還會講到更好的方式。同時,當通過then設置回調函數時,這個回調函數的一個副本會被存儲在Future中直到它被執行,比如你傳遞了一個lambda表示式到then中,這個lambda表達式的captures中捕獲了一個shared_ptr,那么Future將會一直持有這個引用直到回調函數被執行。

      then方法的真正威力在於,它會返回一個新的Future,因此可以進行鏈式嵌套調用,比如:

1 Future<string> f2 = f.then([](int i){
2   return folly::to<string>(i);
3 });
4 
5 f2.then([](string s){ / ... / });

       這里,我們在回調函數中改變了Future的值類型(int變為string),因此為f2設置回調函數的參數類型自然就為string,其實我們更推薦以下寫法:

1 auto finalFuture = getSomeFuture()
2   .then(...)
3   .then(...)
4   .then(...);

      需要注意的是,上面的代碼仍然是同步的,這在組織、編排異步操作的時候是非常有用的。現在假設有一個遠程服務(service)負責將int轉為string,而你擁有一個返回Future的客戶端接口,那么事實上回調函數允許你可以返回一個Future<T>而不僅僅是一個T,例如:

1 Future<string> f2 = f.then([](int i){
2   return getClient()->future_intToString(i); // returns Future<string>
3 });
4 
5 f2.then([](Try<string> const& s){ ... });

      注意:通常情況下,回調函數都是以返回T的形式,除非必須返回Future<T>,這樣會使代碼變得簡潔。

3、Promise/Future的move語義

     Promise/Future都支持move語義、但是禁止拷貝的,這可以保證Promise和Future之間的一對一的關系。

4、同步的創建處於completed狀態的Future

1、可以通過makeFuture<T>()函數創建一個處於completed狀態的Future,該函數接收一個T&&類型參數(或者是一個異常類型)。如果T類型是需要被自動類型推斷的,那么你可以不用指定它。

2、獲取Future的T類型的value值可以通過Future<T>::get()方法,該方法是阻塞的,所以一定要確保該Future已經處於completed狀態或者是其他線程將設置該Future的completed狀態。當然,get()方法可以接受一個超時時間。

3、可以使用Future<T>::wait()進行同步的阻塞等待,這點和get()很像,唯一不同的是wait()不會提取Future內的值或者異常,wait會返回一個新的Futute,該Future持有input Future的結果。同樣,wait也可以設置一個超時時間。

4、getVia()和waitVia()類似於get()和wait(),不同之處在於,它們會在Future處於completed之前一直驅動執行一個Executor。

5、then的其它重載版本

     上面關於then的演示中可以看到回調函數的特點:

  • 返回值類型:Future<T> 或 T
  • 參數類型:T const& 或 Try<T> const& (也可能是 TTry<T>T&&, 和 Try<T>&&)

    then的靈活性不止於此,then其它重載版本還允許你綁定全局函數、成員函數和靜態成員函數,例如:

 1 void globalFunction(Try<int> const& t);
 2 
 3 struct Foo {
 4   void memberMethod(Try<int> const& t);
 5   static void staticMemberMethod(Try<int> const& t);
 6 };
 7 Foo foo;
 8 
 9 // bind global function
10 makeFuture<int>(1).then(globalFunction);
11 // bind member method
12 makeFuture<int>(2).then(&Foo::memberMethod, &foo);
13 // bind static member method
14 makeFuture<int>(3).then(&Foo::staticMemberMethod);

6、SharedPromise

      SharedPromise提供了和Promise相同的接口,唯一的不同在於SharedPromise的getFuture()方法可以被多次調用。當SharedPromise被填充時,所有的與之相關的Future都會被回調。在一個已經被填充的SharedPromise上調用getFuture()將返回一個處於completed狀態的Future。如果你發現你需要構造一個Promise集合並同時為他們填充相同的值,那么可以考慮使用SharedPromise。

四、錯誤處理

     眾所周知,try/catch機制在異步代碼中不再是那么通用,因此Future必須提供了一種自然、簡潔的錯誤處理能力。

1、拋異常

     有很多種方式可以給Future設置一個異常,比如makeFuture<T>() 和 Promise<T>::setException()可以創建一個 failed  Future,這些異常類型可以是

std::exception、folly::exception_wrapper、std::exception_ptr 其中的任何一種。例如:

1 makeFuture<int>(std::runtime_error("oh no!"));
2 makeFuture<int>(folly::make_exception_wrapper<std::runtime_error>("oh no!"));
3 makeFuture<int>(std::current_exception());
4 
5 Promise<int> p1, p2, p3;
6 p1.setException(std::runtime_error("oh no!"));
7 p2.setException(folly::make_exception_wrapper<std::runtime_error>("oh no!"));
8 p3.setException(std::current_exception());

     通常情況下,任何時候當你向Future方法傳遞一個返回Future的函數或者填充一個Promise,你可以放心的是,函數中拋出的任何異常都會被捕獲和存儲,比如:

1 auto f = makeFuture().then([]{
2   throw std::runtime_error("ugh");
3 });

      上面的代碼是完全正確的,異常會被捕獲並被存放在返回的結果Future中,類似的方法還有以下幾種:

  • Future<T>::then() 和它雖有的變體
  • Future<T>::onError(): 后文會提到
  • makeFutureTry(): 拿到一個函數並執行它,然后用這個函數的執行結果(或者異常)創建一個Future
  • Promise<T>::setWith(): 拿到一個函數並執行它,並用執行結果(或異常)來填充這個Promise

2、捕獲異常

     同樣,在Future編程模型中有很多種方式可以捕獲異常。

1)使用Try

      Try是一個抽象概念,既可以代表一個值又可以代表一個異常,所以很適合用在then的回調函數中,例如:

 1 makeFuture<int>(std::runtime_error("ugh")).then([](Try<int> t){
 2   try {
 3     auto i = t.value(); // will rethrow
 4     // handle success
 5   } catch (const std::exception& e) {
 6     // handle failure
 7   }
 8 });
 9 
10 // Try is also integrated with exception_wrapper
11 makeFuture<int>(std::runtime_error("ugh")).then([](Try<int> t){
12   if (t.hasException<std::exception>()) {
13     // this is enough if we only care whether the given exception is present
14   }
15 });
16 
17 makeFuture<int>(std::runtime_error("ugh")).then([](Try<int> t){
18   // we can also extract and handle the exception object
19   // TODO(jsedgwick) infer exception type from the type of the function
20   bool caught = t.withException<std::exception>([](const std::exception& e){
21     // do something with e
22   });
23 });

     但是很不幸的是,上面的代碼邏輯導致成功的處理邏輯和錯誤的處理邏輯相互交織,導致代碼不夠簡潔,同時,上述代碼還存在異常過度rethrow的問題。

2)使用onError()

      Future<T>::onError() 允許你單獨設置一個異常處理器作為回調函數,回調函數的參數類型就是你要捕獲處理的異常類型,如果future沒有異常,那么這個異常處理回調函數會被直接跳過(忽略),否則,它將會被執行,同時它返回的T或者Future<T>將會變為新的結果Future。這里需要注意的是,多次調用onError和多次catch塊的效果是不一樣的,也就是說,如果你在一個onError拋出了一個異常,那么下一個onError將會捕獲它。

 1 intGenerator() // returns a Future<int>, which might contain an exception
 2   // This is a good opportunity to use the plain value (no Try)
 3   // variant of then()
 4   .then([](int i) { 
 5     return 10 * i; // maybe we throw here instead
 6   })
 7   .onError([](const std::runtime_error& e) {
 8     // ... runtime_error handling ...
 9     return -1;
10   })
11   .onError([](const std::exception& e) {
12     // ... all other exception handling ...
13     return -2;
14   });

     你也可以直接使用onError直接處理exception_wrapper,比如當你想處理一個非std::exception異常時,例如:

1 makeFuture().then([]{
2   throw 42;
3 })
4 .onError([](exception_wrapper ew){
5   // ...
6 });

3)ensure()

     Future<T>::ensure(F func)作用非常類型java語言中的finally塊,也就是說,它只有一個void類型的函數並最終執行它而不管Future是否包含異常。結果Future將包含前一個Future的值或異常,除非提供給ensure的函數拋出了新的異常,這種情況下該異常會被捕獲並傳播,例如:

 1 auto fd = open(...);
 2 auto f = makeFuture().then([fd]{
 3   // do some stuff with the file descriptor
 4   // maybe we throw, maybe we don't
 5 })
 6 .ensure([fd]{
 7   // either way, let's release that fd
 8   close(fd);
 9 });
10 
11 // f now contains the result of the then() callback, unless the ensure()
12 // callback threw, in which case f will contain that exception

3)異常處理的性能

     在內部實現中,Future使用folly::exception_wrapper存儲異常以求將rethrow最小化,然而這個機制的有效性取決於我們所使用的庫(和exception_wrapper)是否能夠維持異常的類型信息,實際上,這意味着直接構造異常Future而不是使用throw,比如:

 1 // This version will throw the exception twice
 2 makeFuture()
 3   .then([]{
 4     throw std::runtime_error("ugh");
 5   })
 6   .onError([](const std::runtime_error& e){
 7     // ...
 8   });
 9 // This version won't throw at all!
10 makeFuture()
11   .then([]{
12     // This will properly wrap the exception
13     return makeFuture<Unit>(std::runtime_error("ugh"));
14   })
15   .onError([](const std::runtime_error& e){
16     // ...
17   });

      也就是說,直接使用onError而不是通過Try的throwing可以減少rethrow的次數。如果真的想使用Try,那么可以考慮使用

Try<T>::hasException() 和 Try<T>::withException() 來檢查和處理異常而不用將他們rethrow。

 五、高階語義

      某些時候鏈式、嵌套使用then還不足夠解決所有問題,下面將介紹一些工具便於組裝、構建future。

 1、collectAll()

      collectAll持有一個元素類型為Future<T>的可迭代集合類型,返回一個Future<std::vector<Try<T>>> ,這個返回的Future將在所有的input futures都變為completed狀態時變為completed狀態。結果(resultant)Future中的vector將按照Future被添加的順序包含input futures的值(或者異常)。任何組件Future的錯誤都不會導致這個過程提前終止,input futures都是被move而變得無效,例如:

 1 Future<T> someRPC(int i);
 2 
 3 std::vector<Future<T>> fs;
 4 for (int i = 0; i < 10; i++) {
 5   fs.push_back(someRPC(i));
 6 }
 7 
 8 collectAll(fs).then([](const std::vector<Try<T>>& tries){
 9   for (const auto& t : tries) {
10     // handle each response
11   }
12 });

      注意:和任何then回調一樣,你也可以使用只帶一個Try參數的回調,這樣可以通過編譯,但是你最好不要這么做,因為外部future失敗的唯一原因可能是庫有一個錯誤,這個建議同樣使用下面的組合操作。

 2、collectAll() variadic

     這是collectAll的可變長模板版本,它允許你混合、匹配不同類型的Future,它返回Future<std::tuple<Try<T1>, Try<T2>, ...>>類型,例如:

1 Future<int> f1 = ...;
2 Future<string> f2 = ...;
3 collectAll(f1, f2).then([](const std::tuple<Try<int>, Try<string>>& tup) {
4   int i = std::get<0>(tup).value();
5   string s = std::get<1>(tup).value();
6   // ...
7 });

 3、collect() 

      collect()有點類似collectAll(),唯一不同就是,如果input Futures中任何一個拋出了異常,那么這個Future將會被提前終止,所以collect()的返回類型為

std::vector<T>。和collectAll()一樣,input futures都是被move而變得無效,並且結果(resultant)Future中的vector將按照Future被添加的順序包含input futures的值(如果全部成功)。例如:

 1 collect(fs).then([](const std::vector<T>& vals) {
 2   for (const auto& val : vals) {
 3     // handle each response
 4   }
 5 })
 6 .onError([](const std::exception& e) {
 7   // drat, one of them failed
 8 });
 9 
10 // Or using a Try:
11 collect(fs).then([](const Try<std::vector<T>>& t) {
12  // ...
13 });

 4、collect() variadic

      這是 collect()的變長模板參數版本,它允許你混合、匹配不同類型的Future,它的返回類型為Future<std::tuple<T1, T2, ...>>。

 5、collectN()   

      collectN()類似於collectAll(),都持有一個future集合,但是除此之外,它還持有一個size_t類的N,只要input futures中有N個處於completed狀態,那么這個Future就處於completed狀態。它的返回類型為Future<std::vector<std::pair<size_t, Try<T>>>>,每一個pair都持有相關的Future在原始集合中的索引和結果,但是這些pair本身是隨機順序的。同樣,input futures都是被move而變得無效。如果input futures中同時有多個Future處於completed狀態,獲勝者將被選中,但是選擇是未定義的。

 1 // Wait for 5 of the input futures to complete
 2 collectN(fs, 5,
 3   [](const std::vector<std::pair<size_t, Try<int>>>& tries){
 4     // there will be 5 pairs
 5     for (const auto& pair : tries) {
 6       size_t index = pair.first;
 7       int result = pair.second.value();
 8       // ...
 9     }
10   });

 6、collectAny()      

      collectAny()同樣持有一個Future的集合,但是它會在input Futures中的任何一個處於completed狀態時變為completed狀態,它的返回類型為

Future<std::pair<size_t, Try<T>>>,其中pair對中持有第一個變為completed狀態的Future在原始集合中的索引和結果,input futures都是被move而變得無效。input futures都是被move而變得無效。

1 collectAny(fs, [](const std::pair<size_t, Try<int>>& p){
2   size_t index = p.first;
3   int result = p.second.value();
4   // ...
5 });

 7、map()      

      map()屬於Future的高階函數應用,它持有一個元素類型為Future<A>的集合和一個可以被傳遞給Future<A>::then()的函數,然后用這些函數作為參數反過來調用集合中每一個Future的then,然后返回一個結果(resultant )future的vector集合(順序和原始集合一致)。這個過程好比以下代碼的語法糖:

1 std::vector<Future<A>> fs;
2 std::vector<Future<B>> fs2;
3 for (auto it = fs.begin(); it < fs.end(); it++) {
4   fs2.push_back(it->then(func));
5 }

 8、reduce

      reduce()是Future的另一個高階函數,它持有一個元素類型為Future<A>的集合,一個類型為B的初始值以及一個擁有兩個參數的函數(reducing function,參數類型分別為類型為B的reduced值,來自集合中Future<A>的下一個結果值),該函數的返回值只能為B或者Future<B>,reduce()函數本身返回Future<B>,開始時,初始值和第一個Future的結果值會被應用在該函數上,然后本次應用的結果和第二個Future的結果值會被繼續應用在該函數上,以此來推,直到集合中的所有Future都被reduced或者出現了一個未處理的異常。

     reducing function的第二個參數可以為A或者Try<A>,這依賴於你是否想處理input Futures中的異常。如果 input Future中有一個異常並且你沒有去Try,那么reduce操作將會被短路,同樣,reducing function中拋出的所有異常同樣會短路整個reduce操作。

     例如,有一個Future<int> 類型的集合,現在想得到一個Future<bool>用來標識是否集合中所有的Future的值為0,那么可以這樣寫:

1 reduce(fs, true, [](bool b, int i){
2   // You could also return a Future<bool> if you needed to
3   return b && (i == 0); 
4 })
5 .then([](bool result){
6   // result is true if all inputs were zero
7 });
8 // You could use onError or Try here in case one of your input Futures
9 // contained an exception or if your reducing function threw an exception 

     為了演示異常處理,假設有一個Future<T>類型的集合,現在想獲取一個Future<bool>用於標識集合中所有的Future都沒有異常,那么可以這么寫:

1 reduce(fs, true, [](bool b, Try<T> t){
2   return b && t.hasValue();
3 })
4 .then([](bool result){
5   // result is true if all inputs were non-exceptional
6 });

      最后一個例子來看一下求和的應用:

1 reduce(fs, 0, [](int a, int b){
2   return a + b;
3 })
4 .then([](int sum){
5   // ...
6 });           

六、多線程via()

      Promise/Future的核心操作都是線程安全的,如果被誤用就會拋異常(比如有些方法重復調用了兩次,包括在不同線程中同時調用),比如then()、onError()以及其他設置回調函數的函數,只要被重復調用就會拋出異常。同樣,Promise中的setValue()和setException()同樣不能調用兩次。

      下面先來看一段代碼:

1 // Thread A
2 Promise<Unit> p;
3 auto f = p.getFuture();
4 
5 // Thread B
6 f.then(x).then(y);
7 
8 // Thread A
9 p.setValue();

      上面的代碼中,x和y分別會在哪個線程執行?不幸的是,這個是不確定的。這里Promise的填充操作和設置回調函數的操作是存在競態的,如果設置回調函數的動作先發生,那么x和y就會在Promise被填充的線程執行(也就是線程A)。如果Promise的填充操作先發生,那么x和y會在設置回調函數的線程中執行(也就是線程B),而且是立即執行。如果恰好setValue發生在兩個then之間,那么x將在線程A中執行,而y會在線程B中執行。可以想象,這種不確定性會帶來很多的問題。幸運的是,我們有另一種方法可以解決這個問題。

      Future擁有一個via()函數,該函數需要一個Executor類型的參數。Executor是一個非常簡單的接口,它只存在一個線程安全的add(std::function<void()> func) 方法,它會在某個時候執行這個func,盡管不是立即執行。而via()可以確保被設置的回調函數在指定的Executor上執行。例如:

1 makeFutureWith(x)
2   .via(exe1).then(y)
3   .via(exe2).then(z);

      在上面的例子中,y將在exe1中執行,z將在exe2中執行,這是一個相當大的抽象,它不但解決了上文提到的競態現象,還給我們提供了一個清晰、簡潔可控的線程執行模型。比如可以使用不同類型的Executor來執行不同類型的工作(io密集型和cpu密集型)。

      為了便於使用,還存在一個static類型的via版本,它創建並返回一個處於completed狀態的Future<Unit> ,同時這個Future的回調被指定在Executor上執行,例如:

1 via(exe).then(a);
2 via(exe, a).then(b);

      via()的一個另類的用法是,把Executor作為第一個參數傳遞給then,也能保證回調函數在指定的Executor上執行,與via不同的是,使用then設置的Executor不具備粘滯性,也就是只對then本身設置的回調函數有效。

      那么folly都提供了哪些Executor實現呢?

  • ThreadPoolExecutor :是一個抽象的線程池實現,支持調整大小、自定義線程工廠、池和每個任務的統計信息、支持NUMA、用戶自定義的任務終結。它和它的子類正在積極的開發之中,當前它有兩個實現。CPUThreadPoolExecutor(是一個通用線程池,除了上述功能之外,它還支持任務優先級)、IOThreadPoolExecutor (類似CPUThreadPoolExecutor,但是每一個線程都在一個EventBase 事件循環上旋轉)。
  • EventBase :是一個Executor,把任務作為一個回調在事件循環上執行。
  • ManualExecutor : 僅在手動起動時執行工作。 這對測試非常有用。
  • InlineExecutor :以內聯的方式立刻執行。
  • QueuedImmediateExecutor :類似於InlineExecutor,但在其它回調執行期間添加的工作將被放入等待隊列,而不是立即執行。
  • ScheduledExecutor:是Executor接口的子接口,支持延遲執行。
  • FutureExecutor:包裝了其他Executor,並提供了Future<T> addFuture(F func)函數返回一個Future用於異步獲取函數的執行結果。這個和futures::async(executor, func) 是等價的。

 七、超時處理

 1、時間分辨率

      后面要提到的接收時間的函數和方法時間精度都為Duration類型(std::chrono::milliseconds的別名),但是不要直接使用Duration類型,相反的,應該適當的使std::chrono::duration,例如std::chrono::seconds 或 std::chrono::milliseconds。

2、TimeKeeper

      大多數時間相關的方法都有一個可選的TimeKeeper參數。如果你想自己控制Future底層的時間運行那么可以實現TimeKeeper接口,如果沒有提供,那么一個默認的單例TimeKeeper將被使用懶漢式創建出來,默認的實現使用folly::HHWheelTimer在一專門的EventBase線程管理超時。

3、within()

      Future<T>::within()將返回一個新的Future,如果這個Future沒有在指定的時間內變為completed狀態,那么將會以一個異常(默認為TimedOut異常)變為completed狀態。例如:

1 using std::chrono::milliseconds;
2 Future<int> foo();
3 
4 // f will complete with a TimedOut exception if the Future returned by foo()
5 // does not complete within 500 ms
6 f = foo().within(milliseconds(500));
7 
8 // Same deal, but a timeout will trigger the provided exception instead
9 f2 = foo().within(milliseconds(500), std::runtime_error("you took too long!"));

4、onTimeout()

       Future<T>::onTimeout() 允許你同時設置一個超時時間和超時處理函數,例如:

1 Future<int> foo();
2 foo()
3   .onTimeout(milliseconds(500), []{
4     // You must maintain the resultant future's type
5     // ... handle timeout ...
6     return -1;
7   })
8   .then(...);

        細心的你可能會發現上述代碼只是下面的一個語法糖。

1 foo()
2   .within(milliseconds(500))
3   .onError([](const TimedOut& e) {
4     // handle timeout
5     return -1;
6   })
7   .then(...);

5、get() and wait() with timeouts

      可以為get()和wait()設置超時參數,例如:

1 Future<int> foo();
2 // Will throw TimedOut if the Future doesn't complete within one second of
3 // the get() call
4 int result = foo().get(milliseconds(1000));
5 
6 // If the Future doesn't complete within one second, f will remain
7 // incomplete. That is, if a timeout occurs, it's as if wait() was
8 // never called.
9 Future<int> f = foo().wait(milliseconds(1000));

6、delayed() 

       Future<T>::delayed()返回一個新的Future,該Future將會被延遲一定時間變為completed狀態。例如:

1 makeFuture()
2   .delayed(milliseconds(1000))
3   .then([]{
4     // This will be executed when the original Future has completed or when
5     // 1000ms has elapsed, whichever comes last.
6   });

7、futures::sleep() 

        sleep() 返回一個Future<Unit>,該Future將會在指定時間間隔之后變為completed狀態。

1 futures::sleep(milliseconds(1000)).then([]{
2   // This will be executed after 1000ms
3 }); 

八、中斷機制

     中斷是一種future持有者向Promose發送信號的機制,假設你的Future代碼在另外一個線程中執行了一個耗時很長的操作,一段時間之后你可能不需要這個操作的結果了,那么此時就可以使用中斷機制。

      中斷機制允許Future機制以異常的形式向Promise發送信號,Promise可以自由的選擇異常的處理方式(甚至可以不處理)。例如:

 1 auto p = std::make_shared<Promise<int>>();
 2 p->setInterruptHandler([weakPromise = folly::to_weak_ptr(p)](
 3     const exception_wrapper& e) {
 4   auto promise = weakPromise.lock();
 5   // Handle the interrupt. For instance, we could just fulfill the Promise
 6   // with the given exception:
 7   if (promise) {
 8     promise->setException(e);
 9   }
10 
11   // Or maybe we want the Future to complete with some special value
12   if (promise) {
13     promise->setValue(42);
14   }
15 
16   // Or maybe we don't want to do anything at all! Including not setting
17   // this handler in the first place.
18 });
19 
20 auto f = p->getFuture();
21 // The Future holder can now send an interrupt whenever it wants via raise().
22 // If the interrupt beats out the fulfillment of the Promise and there is
23 // an interrupt handler set on the Promise, that handler will be called with
24 // the provided exception
25 f.raise(std::runtime_error("Something went awry! Abort!"));
26 
27 // cancel() is syntactic sugar for raise(FutureCancellation())
28 f.cancel();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM