(原創)發布一個c++11開發的輕量級的並行Task庫TaskCpp


TaskCpp簡介

  TaskCpp是c++11開發的一個跨平台的並行task庫,它的設計思路來源於微軟的並行計算庫ppl和intel的並行計算庫tbb,關於ppl和tbb我在前面有介紹。既然已經有了這兩個大公司開發的並行計算庫,我為什么還要開發自己的並行計算庫。有兩個原因:

  1. ppl只能在windows上用不能跨平台,tbb能跨平台,但是受限於原始設計,tbb的task比較弱沒有ppl的強大,所以他們不能完全滿足我的要求;
  2. 我覺得可以用c++11可以開發出一個輕量級的好用的並行task庫。

  TaskCpp在接口設計上盡量和ppl保持一致,因為我覺得ppl的接口很好很強大。因此,TaskCpp的接口用法和語義和ppl基本是一致的。因為TaskCpp是一個輕量級的task庫,總共也不過三百多行代碼,本着簡單夠用的原則,只提供了一些和ppl類似的常用用法, 有些不常用的特性不考慮支持。比如,不支持任務的取消,因為加入任務的取消會導致增加很多復雜性,而實際中用得比較少,所以不考慮支持,夠用就好。

支持的平台

需要支持c++11的編譯器,建議編譯器:

  • linux: GCC4.7+
  • windows: vs2012 nov ctp+, 最好是vs2013

庫的使用

  使用TaskCpp僅僅需要包含頭文件即可,在程序中使用只需要包含#include <TaskCpp.h>和少量的boost頭文件即可。

TaskCpp的功能

TaskCpp提供一下功能:

  • 並行任務:一種並行執行若干工作任務的機制。
    • 基本的異步任務
    • 延續的任務
    • 組合任務
      • WhenAll
      • WhenAny
    • 任務組
  • 並行算法:並行作用於數據集合的泛型算法。
    • ParallelForeach算法
    • ParallelInvoke算法
    • ParallelReduce算法

TaskCpp用法介紹

並行任務

基本的異步任務Task

  Task會創建一個異步操作,這個異步操作發起方式是延遲加載方式發起的,即在調用Task的Wait或者Get時才真正發起異步操作。Task可以通過std::function或者lambda表達式去創建,不支持直接原生函數創建,如果要用原生函數需要先通過lambda或者std::function包裝一下。Task的Wait接口只是等待異步操作結束。Task的Get接口接收參數並等待異步操作結束並返回結果。PPL中的get接口是不能接收參數的,TaskCpp的Get接口是可以接受任意參數的,更靈活一點,算是較PPL的一個小優點吧。下面是Task的基本用法:

#include <TaskCpp.h>
using namespace Cosmos;

void TestTask()
{
    Task<void()> task([]{cout << 1 << endl; });
    task.Wait();

    Task<void()> task1 = []{cout << 1 << endl; };
    task1.Wait();

    Task<int()> task2 = []{cout << 1 << endl; return 1; };
    cout << task2.Get() << endl;

    Task<int(int)> task3 = [](int i){cout << i << endl; return i; };
    cout << task3.Get(3) << endl;
}
View Code

組合任務--WhenAll

  WhenAll保證一個任務集合中所有的任務完成。WhenAll函數會生成一個任務,該任務可在完成一組任務之后完成。 此函數可返回一個std::vector 對象,該對象包含集合中每個任務的結果。 以下基本示例使用WhenAll創建表示完成其他三個任務的任務。下面是WhenAll的基本用法:

void PrintThread()
{
    cout << std::this_thread::get_id() << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
}

void TestWhenAll()
{
    vector<Task<int()>> v = {
        Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }),
        Task<int()>([]{PrintThread(); return 2; }),
        Task<int()>([]{PrintThread(); return 3; }),
        Task<int()>([]{PrintThread(); return 4; })
    };

    cout << "when all " << endl;
    WhenAll(v).Get();

}
View Code

  注意:WhenAll是非阻塞的,它只是創建一個任務,在Wait或Get時才發起異步操作。傳遞給WhenAll的任務必須是統一的。 換言之,它們必須都返回相同類型。

組合任務--WhenAny

  WhenAny在任務集合中任意一個任務結束之后就返回。函數會生成一個任務,該任務可在完成一組任務的第一個任務之后完成。 此函數可返回一個 std::pair 對象,該對象包含已完成任務的結果和集合中任務的索引。下面是WhenAny的基本用法:

void TestWhenAny()
{
    vector<Task<int()>> v = {
        Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }),
        Task<int()>([]{PrintThread(); return 2; }),
        Task<int()>([]{PrintThread(); return 3; }),
        Task<int()>([]{PrintThread(); return 4; })
    };

    cout << "when any " << endl;
    WhenAny(v).Then([](std::pair<int, int>& result)
    {
        cout << " index " << result.first << " result " << result.second << endl;
        return result.second;
    }).Then([](int result){cout << "any result: " << result << endl; }).Get();
}
View Code


  注意:WhenAny是非阻塞的,它只是創建一個任務,在Wait或Get時才發起異步操作。傳遞給WhenAny的任務必須是統一的。 換言之,它們必須都返回相同類型。

任務組--TaskGroup

  TaskGroup可以並行的處理一組任務,TaskGroup可以接受多個task或者function,TaskGroup的Wait等待所有任務完成。下面是TaskGroup的基本用法:

void TestTaskGroup()
{
    Task<int()> t1([]{PrintThread(); return 1; });
    Task<double()> t2([]{PrintThread(); return 2.123; });
    Task<void()> t3([]{PrintThread(); });
    Task<string()> t4([]{PrintThread(); return "ok"; });

    TaskGroup group;
    group.Run(t1); 
    group.Run(t2); 
    group.Run(t3);  
    group.Run(t4); 
    
    //如果你覺得這樣一個一個Run加入任務,你也可以一起Run
    group.Run(t1, t2, t3, []{PrintThread(); return 1; });

    group.Wait();
}
View Code

  PPL的task_group的任務只能是void()形式的,TaskCpp允許一些簡單類型的任務如int()、double()、string()等,其實任務的返回類型沒有實際意義,因為Wait沒有返回值,這里支持多種返回類型的任務只不過是為了減少一點限制,用起來稍微方便一點罷了。PPL加入任務只能一個一個Run,要加入多個任務時有點繁瑣,TaskCpp可以一次Run多個任務,比PPL要方便一些。這兩點算是較PPL的兩個小優點吧。

並行算法

ParallelForeach算法

  ParallelForeach算法與 STL std::for_each 算法類似,只是 parallel_for_each 算法並發執行任務。用法比較簡單:

bool check_prime(int x) // 為了體現效果, 該函數故意沒有優化.
{
    for (int i = 2; i < x; ++i)
    if (x % i == 0)
        return false;
    return true;
}

void TestParallelFor()
{
    vector<int> v;
    for (int i = 0; i < 100000; i++)
    {
        v.push_back(i + 1);
    }

    boost::timer t;

    ParallelForeach(v.begin(), v.end(), check_prime);
    ParallelForeach(v.begin(), v.end(), check_prime);

    cout << "taskcpp: " << t.elapsed() << endl;
}
View Code

ParallelInvoke算法

  ParallelInvoke算法並行執行一組任務。 在完成所有任務之前,此算法不會返回。 當您需要同時執行多個獨立的任務時,此算法很有用。ParallelInvoke和TaskGroup的作用是一樣的。用法比較簡單:

void TestParaInvoke()
{
    auto f = []{cout << "1" << endl; return 1; };
    ParallelInvoke(f, []{cout << "2" << endl; });
}

ParallelReduce算法

  ParallelReduce算法在實際應用中比較常用,有點類似於map-reduce,可以並行的對一個集合進行reduce操作。ParallelReduce的用法稍微復雜一點,它的原型:

    • ParallelReduce(range,init, reduceFunc);
    • ParallelReduce(range,init, rangeFunc, reduceFunc);

  第一個參數是集合,第二個參數是算法的初始值,第三個參數rangeFunc是一個聲稱中間結果的函數,第四個參數是中間結果的匯聚函數。如果調用ParallelReduce(range,init, reduceFunc),則表示rangeFunc和reduceFunc是一個函數。

  下面的例子是並行的計算100000000個整數的和:

void TestParallelSum()
{
    vector<int> v;
    const int Size = 100000000;
    v.reserve(Size);
    for (int i = 0; i < Size; i++)
    {
        v.push_back(i + 1);
    }

    int i = 0;

    boost::timer t;
    auto r = ParallelReduce(v, i, [](const vector<int>::iterator& begin, 

vector<int>::iterator&end, int val)
    {
        return std::accumulate(begin, end, val);
    });
    cout << t.elapsed() << " " << r << endl;
}
View Code

  下面是並行查找最長的字符串的例子:

void TestFindString()
{
    vector<string> v;
    v.reserve(10000000);
    for (int i = 0; i < 10000000; i++)
    {
        v.emplace_back(std::to_string(i + 1));
    }

    string init = "";

    auto f = [](const vector<string>::iterator& begin, vector<string>::iterator&end, string& val)
    {
        return *std::max_element(begin, end, [](string& str1, string& str2){return str1.length()<str2.length(); });
    };

    boost::timer t;
    auto r = ParallelReduce(v, init, f, f);
    cout << t.elapsed() << " " << r << endl;
}
View Code

性能

  用四個測試用例對比測試了tbb、ppl、TaskCpp和單線程的性能。下圖是測試對比的結果:

  可以看到TaskCpp的性能比單線程效率要高,總體上也優於ppl和tbb,其中ppl和tbb在某些場景下性能還不如單線程高,所以在使用時要以實際測試數據為准,並不是一用並行庫效率就能提高。

下載地址

TaskCpp 更新版本TaskCppV1.1

TaskCpp開源協議

  遵循LGPL(GNU General Public License)協議。

注意

TaskCpp是一個任務庫,不是線程池,每啟動一個task就會創建一個線程,如果需要線程池可以看這里

聯系我

  如果發現問題或者有什么建議請給我留言或者發郵件qicosmos@163.com . 

  c++11 boost技術交流群:296561497,歡迎大家來交流技術。

后記

  TaskCpp的開發和測試花費了我兩三周的時間,開發之初我就計划將其開源,我希望更多的人能用起來並推廣TaskCpp,促進它的發展。曾經有人問我,為什么堅持發原創文章分享技術,是不是有什么好處。我一下子還真答不上來,因為我根本就沒有想過有啥好處,現在再想一下,好處嘛,分享的術也許對別人學習有幫助吧。再想想我這樣做的原因,一個原因是興趣,這要感謝c++11,是c++11讓我覺得c++語言是非常有意思和有魅力的語言,總能帶給人驚喜,沒有c++11我也不可能完成TaskCpp庫。還有就是一點點分享快樂的精神,我分享我快樂。最重要的原因是一點點夢想,c++中開源庫太少了,很多框架和基礎庫都還不夠,遠遠趕不上java,所以在使用和推廣上不如java。但是我有一點夢想:我希望通過自己的一點努力能讓c++的世界變得更加美好,能讓c++開發者的日子變得美好。是的,正是這個夢想促使我將我開發的大部分代碼都開源出來!也正是這個夢想促使我堅持寫原創博客分享技術!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM