第26課 std::async異步任務


一. std::async函數模板

(一)std::async和std::thread的區別

  1. 兩者最明顯的區別在於async采用默認啟動策略時並不一定創建新的線程。如果系統資源緊張,那么std::thread創建線程可能失敗,系統報告異常,整個程序可能崩潰。而std::async一般則不會,它在無法創建新線程時,會將任務分配給后續調用future.get()函數的線程,並以同步的方式執行(即不創建新線程)。

  2. std::async表現為更高階的抽象,它把用戶從線程管理的細節解放出來,將這些責任轉交給C++標准庫的實現者。而std::thread要求自行處理線程耗盡、超訂、負載均衡以及新平台適配問題

  3. std::thread未提供直接獲取線程函數返回值的方法。但std::async可以通過future對象來獲取

(二)std::async函數模板及分析 

  1. “共享狀態”對象,用於保存線程函數(一般是可調用對象)及其參數、返回值以及新線程狀態等信息。該對象保存在堆中,由std::async、std::promise或std::package_task提供,並交由future或shared_future管理其生命期。被調方(通常指調用promise.set_value()的一方)將計算所得的結果寫入“共享狀態”,而調用方通過std::future的get()讀取該結果。

 

  2. 調用std::async是會創建一個“_Deferred_async_state”或_“Task_async_state”類的“共享狀態”對象,該對象是_Packaged_state的子類。注意,直接創建std::promise時,生成的是“_associated_state”類的共享狀態對象,而std::package_task創建的是“_Packaged_state”類的共享狀態對象

  3. _Get_associated_state是個工廠函數,通過不同的策略創建不同的“共享狀態”對象,並將其交由future管理,負責其生命周期。future類似於std::unique_ptr,對“共享狀態”對象“獨占”所有權。

  4. 與std::thread一樣,傳入std::async中的可調用對象及其參數會被按值以副本形成保存成一個tuple對象,然后再以右值的方式傳入線程函數中對應的參數。

【編程實驗】創建異步任務

#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <vector>
#include <numeric> //for std::accumulate

using namespace std;

std::mutex mtx;

class Widget
{
public:
    void foo(int x, const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = "<<std::this_thread::get_id()<<
            " void Foo::foo(int, const std::string&): x = " <<  x << ", s = " << s<< endl;
    }

    void bar(const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()  
             <<" void Widget::bar(const std::string&): s = " << s << endl;
    }

    void operator()(double val)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void Widget::operator(): val = " << val << endl;
    }
};

class NonCopyable //只移動對象
{
public:
    NonCopyable() {};

    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator=(const NonCopyable&) = delete;

    NonCopyable(NonCopyable&&) = default;
    NonCopyable& operator=(NonCopyable&&) = default;

    double operator()(double d)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void NonCopyable::operator(): d = " << d << endl;
        return d;
    }
};

//並行計算
template<typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
            << " invoke parallel_sum()" << endl;
        return std::accumulate(beg, end, 0); //遍歷[beg,end)區別的每個元素並累加。初始值為0
    }
        

    RandomIt mid = beg + len / 2;
    auto handle = std::async(std::launch::async,  //子線程將[mid,end)元素進行累加
                            parallel_sum<RandomIt>, mid, end);

    int sum = parallel_sum(beg, mid);//本線程將[begin,mid)區間元素進行累加

    return sum + handle.get(); //返回兩個區間結果的累加和
}

int main()
{
    Widget w;

    cout << "main thread id = " << std::this_thread::get_id() << endl;
    //1. 參數傳遞
    auto fut1 = std::async(&Widget::foo, &w, 42, "hello"); //傳入this指針:&w
    auto fut2 = std::async(&Widget::bar, w, "goodbye"); //傳入x的副本如tmp。 tmp.bar(...)

    auto fut3 = std::async(Widget(), 3.14159); //傳入Widget臨時對象,調用operator()
    auto fut4 = std::async(std::ref(w), 2.718);  //傳入w的引用,調用operator();

    NonCopyable mo;    //只移動對象
    auto fut5 = std::async(std::move(mo),3.14159); //mo是只移動對象,必須被轉為右值

    //2. 同步、異步
    auto fut6 = std::async(std::launch::async, Widget(), 1.2); //在新線程上運行,operator()
    auto fut7 = std::async(std::launch::deferred, &Widget::bar, &w, "deferred"); //線程延遲到調用get或wait才執行

    auto fut8 = std::async(std::launch::async | std::launch::deferred, //等價於默認啟動策略
                         &Widget::bar, &w, "async | deferred");

    fut7.get(); //主線程阻塞,等待fut7子線程。(子線程延遲到這時才執行)。

    //3. 並行計算
    std::vector<int> vec(10000, 1); //10000個1
    int res = parallel_sum(vec.begin(), vec.end());
    
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "The sum is: " << res << endl;

        cout << "main thread  end." << endl;
    }

    return 0;
}
/*輸出結果
main thread id = 16756
thread id = 1928 void Foo::foo(int, const std::string&): x = 42, s = hello
thread id = 16756 void Widget::bar(const std::string&): s = deferred  //注意,由主線程執行
thread id = 13216 void Widget::bar(const std::string&): s = goodbye
thread id = 7940 void Widget::operator(): val = 3.14159
thread id = 16080 void Widget::operator(): val = 2.718
thread id = 11492 void NonCopyable::operator(): d = 3.14159
thread id = 1928 void Widget::operator(): val = 1.2
thread id = 13216 void Widget::bar(const std::string&): s = async | deferred
thread id = 16756 invoke parallel_sum()
thread id = 7940 invoke parallel_sum()
thread id = 16080 invoke parallel_sum()
thread id = 11492 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 13216 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 5816 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15832 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 15400 invoke parallel_sum()
thread id = 16968 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15476 invoke parallel_sum()
The sum is: 10000
main thread  end.
*/

二. std::async的啟動策略

(一)std::async的啟動策略

  1. 三種啟動策略(std::async通過指定不同的啟動策略來決定創建是“共享狀態”對象)

  (1)異步方式(std::launch::async):會創建一個“_Task_async_state”類的共享狀態對象。使用該策略時異味着線程函數必須以異步的方式運行,即在另一個線程之上執行

  (2)同步方式(std::launch::deferred):會創建一個“_Deferred_async_state”類的共享狀態對象。使用該策略意味着線程函數延遲到調用future的get/wait時才得以運行,而且兩者是在同一線程上以同步的方式運行。即調用future的一方會阻塞至線程函數運行結束為止。如果get/wait沒有得到調用,則線程函數不會被執行。

  (3)默認啟動策略std::launch::async|std::launch::deferred)即兩者或運算的結果,這意味着任務可能以異步或同步的方式被運行。也就是說是否創建新線程來運行任務,取決於系統資源是否緊張,由標准庫的線程管理組件承擔線程創建和銷毀、避免超訂以及負載均衡的責任。

(二)默認啟動策略

  1. 帶來的問題

  (1)用戶無法預知是異步還是同步運行,因為線程函數可能被調度為延遲執行。

  (2)無法預知線程函數是否與調用future的get/wait函數線程是否在同一線程運行。如果此時線程函數會讀取線程局部存儲(thread_local storage, TLS),那么也就無法預知會取到哪個線程的局部存儲

  (3)有時甚至連線程函數是否會運行,這件起碼的事情都是無法預知的這是因此無法保證在程序的每條路徑上future的get或wait都會得以調用。

  2. 注意事項:

  (1)默認啟動策略能正常工作需要滿足以下所有條件

    ①任務不需要與調用get/wait的線程並發執行。

    ②讀/寫哪個線程的thread_local變量無關緊要。

    ③可以保證在std::async返回的future上調用get/wait,或者可以接受任務可能永不執行。

    ④用戶已清楚使用wait_for或wait_unitil的代碼任務可能被推遲執行,這種可能性己被納入考量。

  (2)只要其中一個條件不滿足,就必須手動指定啟動策略以保證任務以異步或同步的方式運行。

【編程實驗】默認啟動策略問題的解決

#include <iostream>
#include <future>

using namespace std;
using namespace literals; //for duration suffixes(時長后綴,如1s)

void func()
{
    std::this_thread::sleep_for(1s);
}

//reallyAsync函數模板:用於保證任務被異步執行
template<typename Func, typename ...Args>
inline auto reallyAsync(Func&& f, Args... args)
{
    return std::async(std::launch::async,
                      std::forward<Func>(func),
                      std::forward<Args>(args)...);
}

int main()
{
    //wait_for函數必須可慮任務是同步或異步運行

    auto fut1 = std::async(func); //默認啟動策略,無法預估任務是被同步還是異步運行

    //解決方案1:wait_for(0s)
    if (fut1.wait_for(0s) == std::future_status::deferred){ //同步運行,wait_for(0s)
        fut1.get(); //等待結果
    }else { //異步運行
        while (fut1.wait_for(100ms) != std::future_status::ready) { //輪詢子線程是否結束
            //...   //並發做其他任務
        }

        //... //fut is ready
    }

    //解決方案2:確實以異步運行任務
    auto fut2 = reallyAsync(func);
    while (fut2.wait_for(100ms) != std::future_status::ready) //異步方式,確保wait_for返回ready的結果
    {                                                         //從而消除future_status::deferred的可能

    }

    return 0;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM