TaskCpp簡介
TaskCpp是c++11開發的一個跨平台的並行task庫,它的設計思路來源於微軟的並行計算庫ppl和intel的並行計算庫tbb,關於ppl和tbb我在前面有介紹。既然已經有了這兩個大公司開發的並行計算庫,我為什么還要開發自己的並行計算庫。有兩個原因:
- ppl只能在windows上用不能跨平台,tbb能跨平台,但是受限於原始設計,tbb的task比較弱沒有ppl的強大,所以他們不能完全滿足我的要求;
- 我覺得可以用c++11可以開發出一個輕量級的好用的並行task庫。
TaskCpp在接口設計上盡量和ppl保持一致,因為我覺得ppl的接口很好很強大。因此,TaskCpp的接口用法和語義和ppl基本是一致的。因為TaskCpp是一個輕量級的task庫,總共也不過三百多行代碼,本着簡單夠用的原則,只提供了一些和ppl類似的常用用法, 有些不常用的特性不考慮支持。比如,不支持任務的取消,因為加入任務的取消會導致增加很多復雜性,而實際中用得比較少,所以不考慮支持,夠用就好。
支持的平台
需要支持c++11的編譯器,建議編譯器:
- linux: GCC4.7+
- windows: vs2012 nov ctp+, 最好是vs2013
庫的使用
使用TaskCpp僅僅需要包含頭文件即可,在程序中使用只需要包含#include <TaskCpp.h>和少量的boost頭文件即可。
TaskCpp的功能
TaskCpp提供一下功能:
- 並行任務:一種並行執行若干工作任務的機制。
- 基本的異步任務
- 延續的任務
- 組合任務
- WhenAll
- WhenAny
- 任務組
- 並行算法:並行作用於數據集合的泛型算法。
- ParallelForeach算法
- ParallelInvoke算法
- ParallelReduce算法
TaskCpp用法介紹
並行任務
基本的異步任務Task
Task會創建一個異步操作,這個異步操作發起方式是延遲加載方式發起的,即在調用Task的Wait或者Get時才真正發起異步操作。Task可以通過std::function或者lambda表達式去創建,不支持直接原生函數創建,如果要用原生函數需要先通過lambda或者std::function包裝一下。Task的Wait接口只是等待異步操作結束。Task的Get接口接收參數並等待異步操作結束並返回結果。PPL中的get接口是不能接收參數的,TaskCpp的Get接口是可以接受任意參數的,更靈活一點,算是較PPL的一個小優點吧。下面是Task的基本用法:
#include <TaskCpp.h> using namespace Cosmos; void TestTask() { Task<void()> task([]{cout << 1 << endl; }); task.Wait(); Task<void()> task1 = []{cout << 1 << endl; }; task1.Wait(); Task<int()> task2 = []{cout << 1 << endl; return 1; }; cout << task2.Get() << endl; Task<int(int)> task3 = [](int i){cout << i << endl; return i; }; cout << task3.Get(3) << endl; }
組合任務--WhenAll
WhenAll保證一個任務集合中所有的任務完成。WhenAll函數會生成一個任務,該任務可在完成一組任務之后完成。 此函數可返回一個std::vector 對象,該對象包含集合中每個任務的結果。 以下基本示例使用WhenAll創建表示完成其他三個任務的任務。下面是WhenAll的基本用法:
void PrintThread() { cout << std::this_thread::get_id() << endl; std::this_thread::sleep_for(std::chrono::milliseconds(5)); } void TestWhenAll() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when all " << endl; WhenAll(v).Get(); }
注意:WhenAll是非阻塞的,它只是創建一個任務,在Wait或Get時才發起異步操作。傳遞給WhenAll的任務必須是統一的。 換言之,它們必須都返回相同類型。
組合任務--WhenAny
WhenAny在任務集合中任意一個任務結束之后就返回。函數會生成一個任務,該任務可在完成一組任務的第一個任務之后完成。 此函數可返回一個 std::pair 對象,該對象包含已完成任務的結果和集合中任務的索引。下面是WhenAny的基本用法:
void TestWhenAny() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when any " << endl; WhenAny(v).Then([](std::pair<int, int>& result) { cout << " index " << result.first << " result " << result.second << endl; return result.second; }).Then([](int result){cout << "any result: " << result << endl; }).Get(); }
注意:WhenAny是非阻塞的,它只是創建一個任務,在Wait或Get時才發起異步操作。傳遞給WhenAny的任務必須是統一的。 換言之,它們必須都返回相同類型。
任務組--TaskGroup
TaskGroup可以並行的處理一組任務,TaskGroup可以接受多個task或者function,TaskGroup的Wait等待所有任務完成。下面是TaskGroup的基本用法:
void TestTaskGroup() { Task<int()> t1([]{PrintThread(); return 1; }); Task<double()> t2([]{PrintThread(); return 2.123; }); Task<void()> t3([]{PrintThread(); }); Task<string()> t4([]{PrintThread(); return "ok"; }); TaskGroup group; group.Run(t1); group.Run(t2); group.Run(t3); group.Run(t4); //如果你覺得這樣一個一個Run加入任務,你也可以一起Run group.Run(t1, t2, t3, []{PrintThread(); return 1; }); group.Wait(); }
PPL的task_group的任務只能是void()形式的,TaskCpp允許一些簡單類型的任務如int()、double()、string()等,其實任務的返回類型沒有實際意義,因為Wait沒有返回值,這里支持多種返回類型的任務只不過是為了減少一點限制,用起來稍微方便一點罷了。PPL加入任務只能一個一個Run,要加入多個任務時有點繁瑣,TaskCpp可以一次Run多個任務,比PPL要方便一些。這兩點算是較PPL的兩個小優點吧。
並行算法
ParallelForeach算法
ParallelForeach算法與 STL std::for_each 算法類似,只是 parallel_for_each 算法並發執行任務。用法比較簡單:
bool check_prime(int x) // 為了體現效果, 該函數故意沒有優化. { for (int i = 2; i < x; ++i) if (x % i == 0) return false; return true; } void TestParallelFor() { vector<int> v; for (int i = 0; i < 100000; i++) { v.push_back(i + 1); } boost::timer t; ParallelForeach(v.begin(), v.end(), check_prime); ParallelForeach(v.begin(), v.end(), check_prime); cout << "taskcpp: " << t.elapsed() << endl; }
ParallelInvoke算法
ParallelInvoke算法並行執行一組任務。 在完成所有任務之前,此算法不會返回。 當您需要同時執行多個獨立的任務時,此算法很有用。ParallelInvoke和TaskGroup的作用是一樣的。用法比較簡單:
void TestParaInvoke() { auto f = []{cout << "1" << endl; return 1; }; ParallelInvoke(f, []{cout << "2" << endl; }); }
ParallelReduce算法
ParallelReduce算法在實際應用中比較常用,有點類似於map-reduce,可以並行的對一個集合進行reduce操作。ParallelReduce的用法稍微復雜一點,它的原型:
-
- ParallelReduce(range,init, reduceFunc);
- ParallelReduce(range,init, rangeFunc, reduceFunc);
第一個參數是集合,第二個參數是算法的初始值,第三個參數rangeFunc是一個聲稱中間結果的函數,第四個參數是中間結果的匯聚函數。如果調用ParallelReduce(range,init, reduceFunc),則表示rangeFunc和reduceFunc是一個函數。
下面的例子是並行的計算100000000個整數的和:
void TestParallelSum() { vector<int> v; const int Size = 100000000; v.reserve(Size); for (int i = 0; i < Size; i++) { v.push_back(i + 1); } int i = 0; boost::timer t; auto r = ParallelReduce(v, i, [](const vector<int>::iterator& begin, vector<int>::iterator&end, int val) { return std::accumulate(begin, end, val); }); cout << t.elapsed() << " " << r << endl; }
下面是並行查找最長的字符串的例子:
void TestFindString() { vector<string> v; v.reserve(10000000); for (int i = 0; i < 10000000; i++) { v.emplace_back(std::to_string(i + 1)); } string init = ""; auto f = [](const vector<string>::iterator& begin, vector<string>::iterator&end, string& val) { return *std::max_element(begin, end, [](string& str1, string& str2){return str1.length()<str2.length(); }); }; boost::timer t; auto r = ParallelReduce(v, init, f, f); cout << t.elapsed() << " " << r << endl; }
性能
用四個測試用例對比測試了tbb、ppl、TaskCpp和單線程的性能。下圖是測試對比的結果:

可以看到TaskCpp的性能比單線程效率要高,總體上也優於ppl和tbb,其中ppl和tbb在某些場景下性能還不如單線程高,所以在使用時要以實際測試數據為准,並不是一用並行庫效率就能提高。
下載地址
TaskCpp開源協議
遵循LGPL(GNU General Public License)協議。
注意
TaskCpp是一個任務庫,不是線程池,每啟動一個task就會創建一個線程,如果需要線程池可以看這里。
聯系我
如果發現問題或者有什么建議請給我留言或者發郵件qicosmos@163.com .
c++11 boost技術交流群:296561497,歡迎大家來交流技術。
后記
TaskCpp的開發和測試花費了我兩三周的時間,開發之初我就計划將其開源,我希望更多的人能用起來並推廣TaskCpp,促進它的發展。曾經有人問我,為什么堅持發原創文章分享技術,是不是有什么好處。我一下子還真答不上來,因為我根本就沒有想過有啥好處,現在再想一下,好處嘛,分享的術也許對別人學習有幫助吧。再想想我這樣做的原因,一個原因是興趣,這要感謝c++11,是c++11讓我覺得c++語言是非常有意思和有魅力的語言,總能帶給人驚喜,沒有c++11我也不可能完成TaskCpp庫。還有就是一點點分享快樂的精神,我分享我快樂。最重要的原因是一點點夢想,c++中開源庫太少了,很多框架和基礎庫都還不夠,遠遠趕不上java,所以在使用和推廣上不如java。但是我有一點夢想:我希望通過自己的一點努力能讓c++的世界變得更加美好,能讓c++開發者的日子變得美好。是的,正是這個夢想促使我將我開發的大部分代碼都開源出來!也正是這個夢想促使我堅持寫原創博客分享技術!
