C++98
標准中並沒有線程庫的存在,直到C++11
中才終於提供了多線程的標准庫,提供了管理線程、保護共享數據、線程間同步操作、原子操作等類。多線程庫對應的頭文件是#include <thread>
,類名為std::thread
。
然而線程畢竟是比較貼近系統的東西,使用起來仍然不是很方便,特別是線程同步及獲取線程運行結果上就更加麻煩。我們不能簡單的通過thread.join()
得到結果,必須定義一個線程共享的變量來傳遞結果,同時還要考慮線程間的互斥問題。好在C++11
中提供了一個相對簡單的異步接口std::async
,通過這個接口可以簡單的創建線程並通過std::future
中獲取結果。以往都是自己去封裝線程實現自己的async,現在有線程的跨平台接口可以使用就極大的方便了C++多線程編程。
先看一下std::async
的函數原型
//(C++11 起) (C++17 前)
template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async( Function&& f, Args&&... args );
//(C++11 起) (C++17 前)
template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async( std::launch policy, Function&& f, Args&&... args );
第一個參數是線程的創建策略,有兩種策略可供選擇:
std::launch::async
:在調用async就開始創建線程。std::launch::deferred
:延遲加載方式創建線程。調用async時不創建線程,直到調用了future的get或者wait時才創建線程。
默認策略是:std::launch::async | std::launch::deferred
也就是兩種策略的合集,具體什么意思后面詳細再說
第二個參數是線程函數
線程函數可接受function, lambda expression, bind expression, or another function object
第三個參數是線程函數的參數
不再說明
返回值std::future
std::future
是一個模板類,它提供了一種訪問異步操作結果的機制。從字面意思上看它表示未來,這個意思就非常貼切,因為她不是立即獲取結果但是可以在某個時候以同步的方式來獲取結果。我們可以通過查詢future的狀態來獲取異步操作的結構。future_status有三種狀態:
- deferred:異步操作還未開始
- ready:異步操作已經完成
- timeout:異步操作超時,主要用於std::future
.wait_for()
示例:
//查詢future的狀態
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred" << std::endl;
} else if (status == std::future_status::timeout) {
std::cout << "timeout" << std::endl;
} else if (status == std::future_status::ready) {
std::cout << "ready!" << std::endl;
}
} while (status != std::future_status::ready);
std::future
獲取結果的方式有三種:
- get:等待異步操作結束並返回結果
- wait:等待異步操作結束,但沒有返回值
- waite_for:超時等待返回結果,上面示例中就是對超時等待的使用展示
介紹完了std::async
的函數原型,那么它到底該如何使用呢?
std::async
的基本用法:示例鏈接
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}};
template <typename RandomIt>int parallel_sum(RandomIt beg, RandomIt end){
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main(){
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// 以默認策略調用 x.foo(42, "Hello") :
// 可能同時打印 "Hello 42" 或延遲執行
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// 以 deferred 策略調用 x.bar("world!")
// 調用 a2.get() 或 a2.wait() 時打印 "world!"
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// 以 async 策略調用 X()(43) :
// 同時打印 "43"
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // 打印 "world!"
std::cout << a3.get() << '\n'; // 打印 "53"
} // 若 a1 在此點未完成,則 a1 的析構函數在此打印 "Hello 42"
可能的結果
The sum is 10000
43
world!
53
Hello 42
由此可見,std::async
是異步操作做了一個很好的封裝,使我們不用關注線程創建內部細節,就能方便的獲取異步執行狀態和結果,還可以指定線程創建策略。
深入理解線程創建策略
- std::launch::async調度策略意味着函數必須異步執行,即在另一線程執行。
- std::launch::deferred調度策略意味着函數可能只會在std::async返回的future對象調用get或wait時執行。那就是,執行會推遲到其中一個調用發生。當調用get或wait時,函數會同步執行,即調用者會阻塞直到函數運行結束。如果get或wait沒有被調用,函數就絕對不會執行。
兩者策略都很明確,然而該函數的默認策略卻很有趣,它不是你顯示指定的,也就是第一個函數原型中所用的策略即std::launch::async | std::launch::deferred
,c++標准中給出的說明是:
進行異步執行還是惰性求值取決於實現
auto future = std::async(func); // 使用默認發射模式執行func
這種調度策略我們沒有辦法預知函數func是否會在哪個線程執行,甚至無法預知會不會被執行,因為func可能會被調度為推遲執行,即調用get或wait的時候執行,而get或wait是否會被執行或者在哪個線程執行都無法預知。
同時這種調度策略的靈活性還會混淆使用thread_local變量,這意味着如果func寫或讀這種線程本地存儲(Thread Local Storage,TLS),預知取到哪個線程的本地變量是不可能的。
它也影響了基於wait循環中的超時情況,因為調度策略可能為deferred
的,調用wait_for或者wait_until會返回值std::launch::deferred。這意味着下面的循環,看起來最終會停止,但是,實際上可能會一直運行:
void func() // f睡眠1秒后返回
{
std::this_thread::sleep_for(1);
}
auto future = std::async(func); // (概念上)異步執行f
while(fut.wait_for(100ms) != // 循環直到f執行結束
std::future_status::ready) // 但這可能永遠不會發生
{
...
}
為避免陷入死循環,我們必須檢查future是否把任務推遲,然而future無法獲知任務是否被推遲,一個好的技巧就是通過wait_for(0)來獲取future_status是否是deferred:
auto future = std::async(func); // (概念上)異步執行f
if (fut.wait_for(0) == std::future_status::deferred) // 如果任務被推遲
{
... // fut使用get或wait來同步調用f
} else { // 任務沒有被推遲
while(fut.wait_for(100ms) != std::future_status::ready) { // 不可能無限循環
... // 任務沒有被推遲也沒有就緒,所以做一些並發的事情直到任務就緒
}
... // fut就緒
}
有人可能會說既然有這么多缺點為啥還要用它,因為畢竟我們考慮的極限情況下的可能,有時候我不要求它是並發還是同步執行,也不需要考慮修改那個線程thread_local變量,同時也能接受可能任務永遠不會執行,那么這種方式就是一種方便且高效的調度策略。
綜上所述,我們總結出以下幾點:
- std::async的默認調度策略既允許任務異步執行,又允許任務同步執行。
- 默認策略靈活性導致了使用thread_local變量時的不確定性,它隱含着任務可能不會執行,它還影響了基於超時的wait調用的程序邏輯。
- 如果異步執行是必需的,指定std::launch::async發射策略。
參考文章: