一. std::async函數模板
(一)std::async和std::thread的區別
1. 兩者最明顯的區別在於async采用默認啟動策略時並不一定創建新的線程。如果系統資源緊張,那么std::thread創建線程可能失敗,系統報告異常,整個程序可能崩潰。而std::async一般則不會,它在無法創建新線程時,會將任務分配給后續調用future.get()函數的線程,並以同步的方式執行(即不創建新線程)。
2. std::async表現為更高階的抽象,它把用戶從線程管理的細節解放出來,將這些責任轉交給C++標准庫的實現者。而std::thread要求自行處理線程耗盡、超訂、負載均衡以及新平台適配問題。
3. std::thread未提供直接獲取線程函數返回值的方法。但std::async可以通過future對象來獲取。
(二)std::async函數模板及分析
1. “共享狀態”對象,用於保存線程函數(一般是可調用對象)及其參數、返回值以及新線程狀態等信息。該對象保存在堆中,由std::async、std::promise或std::package_task提供,並交由future或shared_future管理其生命期。被調方(通常指調用promise.set_value()的一方)將計算所得的結果寫入“共享狀態”,而調用方通過std::future的get()讀取該結果。
2. 調用std::async是會創建一個“_Deferred_async_state”或_“Task_async_state”類的“共享狀態”對象,該對象是_Packaged_state的子類。注意,直接創建std::promise時,生成的是“_associated_state”類的共享狀態對象,而std::package_task創建的是“_Packaged_state”類的共享狀態對象。
3. _Get_associated_state是個工廠函數,通過不同的策略創建不同的“共享狀態”對象,並將其交由future管理,負責其生命周期。future類似於std::unique_ptr,對“共享狀態”對象“獨占”所有權。
4. 與std::thread一樣,傳入std::async中的可調用對象及其參數會被按值以副本形成保存成一個tuple對象,然后再以右值的方式傳入線程函數中對應的參數。
【編程實驗】創建異步任務
#include <iostream> #include <thread> #include <future> #include <mutex> #include <vector> #include <numeric> //for std::accumulate using namespace std; std::mutex mtx; class Widget { public: void foo(int x, const std::string& s) { std::lock_guard<std::mutex> lk(mtx); cout << "thread id = "<<std::this_thread::get_id()<< " void Foo::foo(int, const std::string&): x = " << x << ", s = " << s<< endl; } void bar(const std::string& s) { std::lock_guard<std::mutex> lk(mtx); cout << "thread id = " << std::this_thread::get_id() <<" void Widget::bar(const std::string&): s = " << s << endl; } void operator()(double val) { std::lock_guard<std::mutex> lk(mtx); cout << "thread id = " << std::this_thread::get_id() << " void Widget::operator(): val = " << val << endl; } }; class NonCopyable //只移動對象 { public: NonCopyable() {}; NonCopyable(const NonCopyable&) = delete; NonCopyable& operator=(const NonCopyable&) = delete; NonCopyable(NonCopyable&&) = default; NonCopyable& operator=(NonCopyable&&) = default; double operator()(double d) { std::lock_guard<std::mutex> lk(mtx); cout << "thread id = " << std::this_thread::get_id() << " void NonCopyable::operator(): d = " << d << endl; return d; } }; //並行計算 template<typename RandomIt> int parallel_sum(RandomIt beg, RandomIt end) { auto len = end - beg; if (len < 1000) { std::lock_guard<std::mutex> lk(mtx); cout << "thread id = " << std::this_thread::get_id() << " invoke parallel_sum()" << endl; return std::accumulate(beg, end, 0); //遍歷[beg,end)區別的每個元素並累加。初始值為0 } RandomIt mid = beg + len / 2; auto handle = std::async(std::launch::async, //子線程將[mid,end)元素進行累加 parallel_sum<RandomIt>, mid, end); int sum = parallel_sum(beg, mid);//本線程將[begin,mid)區間元素進行累加 return sum + handle.get(); //返回兩個區間結果的累加和 } int main() { Widget w; cout << "main thread id = " << std::this_thread::get_id() << endl; //1. 參數傳遞 auto fut1 = std::async(&Widget::foo, &w, 42, "hello"); //傳入this指針:&w auto fut2 = std::async(&Widget::bar, w, "goodbye"); //傳入x的副本如tmp。 tmp.bar(...) auto fut3 = std::async(Widget(), 3.14159); //傳入Widget臨時對象,調用operator() auto fut4 = std::async(std::ref(w), 2.718); //傳入w的引用,調用operator(); NonCopyable mo; //只移動對象 auto fut5 = std::async(std::move(mo),3.14159); //mo是只移動對象,必須被轉為右值 //2. 同步、異步 auto fut6 = std::async(std::launch::async, Widget(), 1.2); //在新線程上運行,operator() auto fut7 = std::async(std::launch::deferred, &Widget::bar, &w, "deferred"); //線程延遲到調用get或wait才執行 auto fut8 = std::async(std::launch::async | std::launch::deferred, //等價於默認啟動策略 &Widget::bar, &w, "async | deferred"); fut7.get(); //主線程阻塞,等待fut7子線程。(子線程延遲到這時才執行)。 //3. 並行計算 std::vector<int> vec(10000, 1); //10000個1 int res = parallel_sum(vec.begin(), vec.end()); { std::lock_guard<std::mutex> lk(mtx); cout << "The sum is: " << res << endl; cout << "main thread end." << endl; } return 0; } /*輸出結果 main thread id = 16756 thread id = 1928 void Foo::foo(int, const std::string&): x = 42, s = hello thread id = 16756 void Widget::bar(const std::string&): s = deferred //注意,由主線程執行 thread id = 13216 void Widget::bar(const std::string&): s = goodbye thread id = 7940 void Widget::operator(): val = 3.14159 thread id = 16080 void Widget::operator(): val = 2.718 thread id = 11492 void NonCopyable::operator(): d = 3.14159 thread id = 1928 void Widget::operator(): val = 1.2 thread id = 13216 void Widget::bar(const std::string&): s = async | deferred thread id = 16756 invoke parallel_sum() thread id = 7940 invoke parallel_sum() thread id = 16080 invoke parallel_sum() thread id = 11492 invoke parallel_sum() thread id = 1928 invoke parallel_sum() thread id = 13216 invoke parallel_sum() thread id = 1928 invoke parallel_sum() thread id = 7636 invoke parallel_sum() thread id = 5816 invoke parallel_sum() thread id = 15856 invoke parallel_sum() thread id = 15832 invoke parallel_sum() thread id = 7636 invoke parallel_sum() thread id = 15400 invoke parallel_sum() thread id = 16968 invoke parallel_sum() thread id = 15856 invoke parallel_sum() thread id = 15476 invoke parallel_sum() The sum is: 10000 main thread end. */
二. std::async的啟動策略
(一)std::async的啟動策略
1. 三種啟動策略(std::async通過指定不同的啟動策略來決定創建是“共享狀態”對象)
(1)異步方式(std::launch::async):會創建一個“_Task_async_state”類的共享狀態對象。使用該策略時異味着線程函數必須以異步的方式運行,即在另一個線程之上執行。
(2)同步方式(std::launch::deferred):會創建一個“_Deferred_async_state”類的共享狀態對象。使用該策略意味着線程函數延遲到調用future的get/wait時才得以運行,而且兩者是在同一線程上以同步的方式運行。即調用future的一方會阻塞至線程函數運行結束為止。如果get/wait沒有得到調用,則線程函數不會被執行。
(3)默認啟動策略(std::launch::async|std::launch::deferred):即兩者或運算的結果,這意味着任務可能以異步或同步的方式被運行。也就是說是否創建新線程來運行任務,取決於系統資源是否緊張,由標准庫的線程管理組件承擔線程創建和銷毀、避免超訂以及負載均衡的責任。
(二)默認啟動策略
1. 帶來的問題
(1)用戶無法預知是異步還是同步運行,因為線程函數可能被調度為延遲執行。
(2)無法預知線程函數是否與調用future的get/wait函數線程是否在同一線程運行。如果此時線程函數會讀取線程局部存儲(thread_local storage, TLS),那么也就無法預知會取到哪個線程的局部存儲。
(3)有時甚至連線程函數是否會運行,這件起碼的事情都是無法預知的。這是因此無法保證在程序的每條路徑上future的get或wait都會得以調用。
2. 注意事項:
(1)默認啟動策略能正常工作需要滿足以下所有條件。
①任務不需要與調用get/wait的線程並發執行。
②讀/寫哪個線程的thread_local變量無關緊要。
③可以保證在std::async返回的future上調用get/wait,或者可以接受任務可能永不執行。
④用戶已清楚使用wait_for或wait_unitil的代碼任務可能被推遲執行,這種可能性己被納入考量。
(2)只要其中一個條件不滿足,就必須手動指定啟動策略以保證任務以異步或同步的方式運行。
【編程實驗】默認啟動策略問題的解決
#include <iostream> #include <future> using namespace std; using namespace literals; //for duration suffixes(時長后綴,如1s) void func() { std::this_thread::sleep_for(1s); } //reallyAsync函數模板:用於保證任務被異步執行 template<typename Func, typename ...Args> inline auto reallyAsync(Func&& f, Args... args) { return std::async(std::launch::async, std::forward<Func>(func), std::forward<Args>(args)...); } int main() { //wait_for函數必須可慮任務是同步或異步運行 auto fut1 = std::async(func); //默認啟動策略,無法預估任務是被同步還是異步運行 //解決方案1:wait_for(0s) if (fut1.wait_for(0s) == std::future_status::deferred){ //同步運行,wait_for(0s) fut1.get(); //等待結果 }else { //異步運行 while (fut1.wait_for(100ms) != std::future_status::ready) { //輪詢子線程是否結束 //... //並發做其他任務 } //... //fut is ready } //解決方案2:確實以異步運行任務 auto fut2 = reallyAsync(func); while (fut2.wait_for(100ms) != std::future_status::ready) //異步方式,確保wait_for返回ready的結果 { //從而消除future_status::deferred的可能 } return 0; }