本文承接前文 現代 C++ 對多線程/並發的支持(上),翻譯自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅(A Tour of C++)一書的第 13 章 Concurrency。本文將繼續介紹 C++ 並發中的 future/promise,packaged_task 以及 async() 的用法。
13.7 通信任務
標准庫還在頭文件 <future> 中提供了一些機制,能夠讓編程人員基於更高的抽象層次任務來開發,而不是直接使用低層的線程、鎖:
future和promise:用於從任務(另一個線程)中返回一個值packaged_task:幫助啟動任務,封裝了future和promise,並且建立兩者之間的關聯async():像調用一個函數那樣啟動一個任務。形式最簡單,但也最強大!
13.7.1 future 和 promise
future 和 promise 可以在兩個任務之間傳值,而無需顯式地使用鎖,實現了高效地數據傳輸。其基本想法很簡單:當一個任務向另一個任務傳值時,把值放入 promise,通過特定的實現,使得值可以通過與之關聯的 future 讀出(一般誰啟動了任務,誰從 future 中取結果)。

假如有一個 future<X> 叫 fx,我們可以通過 get() 獲取類型 X 的值:
X v = fx.get(); // if necessary, wait for the value to get computed
如果值還沒有計算出,則調用 get() 的線程阻塞,直到有值返回。如果值無法計算出,get()可能拋出異常。
promise 的主要目的是提供一個簡單的“put”的操作(set_value 或 set_exception),和 future 的 get() 相呼應。
如果你有一個 promise,需要發送一個類型為 X 的結果到一個 future,你要么傳遞一個值,要么傳遞一個異常。舉個例子:
void f(promise<X>& px) // 一個任務:把結果放入 px
{
try {
X res;
// 計算 res 的值
px.set_value(res);
}
catch(...) { // 如果無法計算 res 的值
px.set_exception(current_exception()); // 傳異常到 future 的線程
}
}
current_exception() 即捕獲到的異常。
要處理通過 future 傳遞的異常,get() 的調用者必須在什么地方捕獲,例如:
void g(future<X>& fx) // 一個任務;從 fx 提取結果
{
try {
X v = fx.get(); // 如有必要,等待值計算完成
// 使用 v
}
catch(...){ // 無法計算 v
// 錯誤處理
}
}
如果 g() 不需要自己處理錯誤,代碼可以進一步簡化:
void g(future<X>& fx) // 一個任務;從 fx 提取結果
{
X v = fx.get(); // 如有必要,等待值計算完成
// 使用 v
}
思考:
future和promise是怎么關聯起來的?
13.7.2 packaged_task
如何把 future 放入一個需要結果的任務,並且把與之關聯的、產生結果的 promise 放入線程?packaged_task 可以簡化任務的設置,關聯 future/promise。packaged_task 封裝了把返回值或異常放入 promise 的操作,並且調用 packaged_task 的 get_future() 方法,可以得到一個與 promise 關聯的 future。舉個例子,我們可以設置兩個任務,借助標准庫的 accumulate() 分別累加 vector<double> 的前后部分:
double accum (double* beg, double* end, double init) // 計算以 init 為初值,[beg,end) 的和
{
return accumulate(beg,end,init);
}
double comp2(vector<double>& v)
{
using Task_type = double(double*,double*,double); // 任務的類型
packaged_task<Task_type> pt0 {accum}; // 打包任務(即 accum)
packaged_task<Task_type> pt1 {accum};
future<double> f0 {pt0.get_future()}; // 取得 pt0 的 future
future<double> f1 {pt1.get_future()}; // 取得 pt1 的 future
double* first = &v[0];
thread t1{move(pt0),first,first+v.size()/2,0}; // 為 pt0 啟動線程
thread t2{move(pt1),first+v.size()/2,first+v.size(),0}; // 為 pt1 啟動線程
return f0.get() + f1.get();
}
packaged_task 模板以任務的類型(Task_type,double(double*,double*,double) 的別名)作為其模板參數,以任務(accum)作為其構造函數的參數。move() 操作是必要的,因為 packaged_task 不可拷貝(只能移動)。packaged_task 不可拷貝是因為它是一個資源處理程序(resource handler),擁有 promise 的所有權,並且(間接地)負責與之關聯的任務可能擁有的資源。
請注意,這里的代碼沒有顯式地使用鎖:我們能夠專注於要完成的任務,而不是來管理它們通信的機制。這兩個任務在不同的線程中執行,具有了潛在的並發性。
13.7.3 async()
我在本章所追求的思路,最簡單,但也非常強大:把任務看成是一個恰巧可能和其他任務同時運行的函數。這並不是 C++ 標准庫所支持的唯一模型,但它能很好地滿足各類廣泛的需求。其他更微妙、棘手的模型,如依賴於共享內存的編程風格也可以根據實際需要使用。
要啟動潛在異步執行的任務,我們可以用 async():
double comp4(vector<double>& v) // 如果 v 足夠大,派生多個任務
{
if(v.size()<10000) // 犯得着用並發嗎?
return accum(v.begin(),v.end(),0);
auto v0 = &v[0];
auto sz = v.size();
auto f0 = async(accum,v0,v0+sz/4,0.0);
auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0);
auto f2 = async(accum,v0+sz/2,v0+sz*3/4,0.0);
auto f3 = async(accum,v0+sz*3/4,v0+sz,0.0);
return f0.get()+f1.get()+f2.get()+f3.get(); // 收集 4 部分的結果,求和
}
大體上,async() 把“調用部分”和“獲取結果部分“分離開來,並且將兩者和實際執行的任務分離。使用 async() 你不需要考慮線程、鎖;你只要從任務(潛在地、異步地計算結果)的角度去考慮就可以了。async() 也有明顯的限制:使用了共享資源、需要上鎖的任務無法使用 async(),你甚至不知道會用到多少線程,這完全是由 async() 決定的,它會根據調用時系統可用資源的情況,決定使用多少線程。例如,async() 在決定使用幾個線程前,會檢查有多少核心(處理器)空閑。
示例代碼中的猜測計算開銷和啟動線程的相對開銷(v.size()<10000)只是一個很原始、粗略的性能估計。這里不適合展開討論怎么去管理線程,但這個估計僅僅是一個簡單(可能很爛)的猜測。
請注意,async()不僅僅是專門用於並行計算、提高性能的機制。例如,它也能用於派生任務,從用戶獲取輸入,讓“主程序”忙其他事情。
13.8 建議
- 使用並發改善響應性和吞吐量
- 盡可能在最高級別的抽象上工作(比如優先考慮 async、packaged_task 而不是 thread、mutex)
- 考慮使用進程作為線程的替代方案
- 標准庫的並發支持是類型安全的
- 內存模型把多數程序員從考慮機器架構的工作中解放出來
- 內存模型使得內存的表現和我們的預期基本一致
- 原子操作為無鎖編程提供了可能性
- 把無鎖編程留給專家
- 有時順序操作比起並發更簡單、更快
- 避免數據競爭(不受控地同時訪問可變數據)
std::thread是類型安全的系統線程接口- 用
join()等待一個線程結束 - 盡量避免顯式共享數據
- 用
unique_lock管理 mutexes - 用
lock()一次性獲取多個鎖 - 用
condition_variable管理線程之間的通信 - 從(可以並行執行的)任務的角度思考,而非線程
- 不要低估“簡單性”的價值
- 選擇
packaged_task和future,而不是直接使用thread和mutex - 用
promise返回結果,從future獲取結果 - 用
packaged_task處理任務拋出的異常或返回值 - 用
packaged_task和future來表示對外部服務的請求,以及等待其回復 - 用
async()啟動簡單的任務
