線程同步主要是為了解決對共享數據的競爭訪問問題,所以線程同步主要是對共享數據的訪問同步化(按照既定的先后次序,一個訪問需要阻塞等待前一個訪問完成后才能開始)。這篇文章談到的異步編程主要是針對任務或線程的執行順序,也即一個任務不需要阻塞等待上一個任務執行完成后再開始執行,程序的執行順序與任務的排列順序是不一致的。下面從任務執行順序的角度解釋下同步與異步的區別:
同步:就是在發出一個調用時,在沒有得到結果之前,該調用就不返回。但是一旦調用返回,就得到返回值了。換句話說,就是由調用者主動等待這個調用的結果。
異步:調用在發出之后,這個調用就直接返回了,所以沒有返回結果。換句話說,當一個異步過程調用發出后,調用者不會立刻得到結果。而是在調用發出后,被調用者通過狀態、通知來通知調用者,或通過回調函數處理這個調用。
二、如何使用異步編程
在線程庫< thread >中並沒有獲得線程執行結果的方法,通常情況下,線程調用者需要獲得線程的執行結果或執行狀態,以便后續任務的執行。那么,通過什么方式獲得被調用者的執行結果或狀態呢?
2.1 使用全局變量與條件變量傳遞結果
前面談到的條件變量具有“通知–喚醒”功能,可以把執行結果或執行狀態放入一個全局變量中,當被調用者執行完任務后,通過條件變量通知調用者結果或狀態已更新,可以使用了。下面給出一個程序示例:
//future1.cpp 使用全局變量傳遞被調用線程返回結果,使用條件變量通知調用線程已獲得結果 #include <vector> #include <numeric> #include <iostream> #include <chrono> #include <thread> #include <mutex> #include <condition_variable> int res = 0; //保存結果的全局變量 std::mutex mu; //互斥鎖全局變量 std::condition_variable cond; //全局條件變量 void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last) { int sum = std::accumulate(first, last, 0); //標准庫求和函數 std::unique_lock<std::mutex> locker(mu); res = sum; locker.unlock(); cond.notify_one(); // 向一個等待線程發出“條件已滿足”的通知 } int main() { std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::thread work_thread(accumulate, numbers.begin(), numbers.end()); std::unique_lock<std::mutex> locker(mu); cond.wait(locker, [](){ return res;}); //如果條件變量被喚醒,檢查結果是否被改變,為真則直接返回,為假則繼續等待 std::cout << "result=" << res << '\n'; locker.unlock(); work_thread.join(); //阻塞等待線程執行完成 getchar(); return 0; }
函數執行結果如下:
從上面的代碼可以看出,雖然也實現了獲取異步任務執行結果的功能,但需要的全局變量較多,多線程間的耦合度也較高,編寫復雜程序時容易引入bug。有沒有更好的方式實現異步編程呢?C++ 11新增了一個< future >庫函數為異步編程提供了很大的便利。
2.2 使用promise與future傳遞結果
< future >頭文件功能允許對特定提供者設置的值進行異步訪問,可能在不同的線程中。
這些提供程序(要么是promise 對象,要么是packaged_task對象,或者是對異步的調用async)與future對象共享共享狀態:提供者使共享狀態就緒的點與future對象訪問共享狀態的點同步。< future >頭文件的結構如下:
注意前面提到的共享狀態,多線程間傳遞的返回值或拋出的異常都是在共享狀態中交流的。我們知道多線程間並發訪問共享數據是需要保持同步的,這里的共享狀態是保證返回值或異常在線程間正確傳遞的關鍵,被調用線程可以通過改變共享狀態通知調用線程返回值或異常已寫入完畢,可以訪問或操作了。future的狀態(future_status)有以下三種:
deferred:異步操作還沒開始;
ready:異步操作已經完成;
timeout:異步操作超時。
既然線程間傳遞返回值或異常是通過共享狀態進行的,就涉及到共享狀態的提供方與獲取方,只有該任務或線程擁有包含共享狀態的對象,其他任務或線程才能夠通過共享狀態的通知機制同步獲取到該人物或線程的返回值或異常。我們通常使用的< thread >創建線程並不擁有共享狀態,我們需要為該線程提供一個共享狀態,以便后續對其返回值或異常的訪問。那么,怎么為一個線程提供一個包含共享狀態的對象呢?這就需要借助std::promise< T >類模板實現了,其具體用法如下:
std::promise< T >構造時,產生一個未就緒的共享狀態(包含存儲的T值和是否就緒的狀態)。可設置T值,並讓狀態變為ready。也可以通過產生一個future對象獲取到已就緒的共享狀態中的T值。繼續使用上面的程序示例,改為使用promise傳遞結果,修改后的代碼如下:
//future2.cpp 使用promise傳遞被調用線程返回結果,通過共享狀態變化通知調用線程已獲得結果 #include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono> void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise) { int sum = std::accumulate(first, last, 0); accumulate_promise.set_value(sum); // 將結果存入,並讓共享狀態變為就緒以提醒future } int main() { // 演示用 promise<int> 在線程間傳遞結果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::promise<int> accumulate_promise; std::future<int> accumulate_future = accumulate_promise.get_future(); std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise)); accumulate_future.wait(); //等待結果 std::cout << "result=" << accumulate_future.get() << '\n'; work_thread.join(); //阻塞等待線程執行完成 getchar(); return 0; }
std::promise< T >對象的成員函數get_future()產生一個std::future< T >對象,代碼示例中已經展示了future對象的兩個方法:wait()與get(),下面給出更多操作函數供參考:
值得注意的是,std::future< T >在多個線程等待時,只有一個線程能獲取等待結果。當需要多個線程等待相同的事件的結果(即多處訪問同一個共享狀態),需要用std::shared_future< T >來替代std::future < T >,std::future< T >也提供了一個將future轉換為shared_future的方法f.share(),但轉換后原future狀態失效。這有點類似於智能指針std::unique_ptr< T >與std::shared_ptr< T >的關系,使用時需要留心。
2.3使用packaged_task與future傳遞結果
除了為一個任務或線程提供一個包含共享狀態的變量,還可以直接把共享狀態包裝進一個任務或線程中。這就需要借助std::packaged_task< Func >來實現了,其具體用法如下:
std::packaged_task< Func >構造時綁定一個函數對象,也產生一個未就緒的共享狀態。通過thread啟動或者仿函數形式啟動該函數對象。但是相比promise,沒有提供set_value()公用接口,而是當執行完綁定的函數對象,其執行結果返回值或所拋異常被存儲於能通過 std::future 對象訪問的共享狀態中。繼續使用上面的程序示例,改為使用packaged_task傳遞結果,修改后的代碼如下:
//future3.cpp 使用packaged_task傳遞被調用線程返回結果,通過共享狀態變化通知調用線程已獲得結果 #include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono> int accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last) { int sum = std::accumulate(first, last, 0); return sum; } int main() { // 演示用 packaged_task 在線程間傳遞結果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::packaged_task<int(std::vector<int>::iterator,std::vector<int>::iterator)> accumulate_task(accumulate); std::future<int> accumulate_future = accumulate_task.get_future(); std::thread work_thread(std::move(accumulate_task), numbers.begin(), numbers.end()); accumulate_future.wait(); //等待結果 std::cout << "result=" << accumulate_future.get() << '\n'; work_thread.join(); //阻塞等待線程執行完成 getchar(); return 0; }
一般不同函數間傳遞數據時,主要是借助全局變量、返回值、函數參數等來實現的。上面第一種方法使用全局變量傳遞數據,會使得不同函數間的耦合度較高,不利於模塊化編程。后面兩種方法分別通過函數參數與返回值來傳遞數據,可以降低函數間的耦合度,使編程和維護更簡單快捷。
2.4 使用async傳遞結果
前面介紹的std::promise< T >與std::packaged_task< Func >已經提供了較豐富的異步編程工具,但在使用時既需要創建提供共享狀態的對象(promise與packaged_task),又需要創建訪問共享狀態的對象(future與shared_future),還是覺得使用起來不夠方便。有沒有更簡單的異步編程工具呢?future頭文件也確實封裝了更高級別的函數std::async,其具體用法如下:
std::future std::async(std::launch policy, Func, Args…)
std::async是一個函數而非類模板,其函數執行完后的返回值綁定給使用std::async的std::futrue對象(std::async其實是封裝了thread,packged_task的功能,使異步執行一個任務更為方便)。Func是要調用的可調用對象(function, member function, function object, lambda),Args是傳遞給Func的參數,std::launch policy是啟動策略,它控制std::async的異步行為,我們可以用三種不同的啟動策略來創建std::async:
std::launch::async參數 保證異步行為,即傳遞函數將在單獨的線程中執行;
std::launch::deferred參數 當其他線程調用get()/wait()來訪問共享狀態時,將調用非異步行為;
std::launch::async | std::launch::deferred參數 是默認行為(可省略)。有了這個啟動策略,它可以異步運行或不運行,這取決於系統的負載。
繼續使用上面的程序示例,改為使用std::async傳遞結果,修改后的代碼如下:
//future4.cpp 使用async傳遞被調用線程返回結果 #include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono> int accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last) { int sum = std::accumulate(first, last, 0); return sum; } int main() { // 演示用 async 在線程間傳遞結果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; auto accumulate_future = std::async(std::launch::async, accumulate, numbers.begin(), numbers.end()); //auto可以自動推斷變量的類型 std::cout << "result=" << accumulate_future.get() << '\n'; getchar(); return 0; }
從上面的代碼可以看出使用std::async能在很大程度上簡少編程工作量,使我們不用關注線程創建內部細節,就能方便的獲取異步執行狀態和結果,還可以指定線程創建策略。所以,我們可以使用std::async替代線程的創建,讓它成為我們做異步操作的首選。