Intel Thread Building Blocks (TBB) 入門篇


一、什么是TBB

       TBB(Thread Building Blocks)是英特爾發布的一個庫,全稱為 Threading Building Blocks。TBB 獲得過 17 屆 Jolt Productivity Awards,是一套 C++ 模板庫,和直接利用 OS API 寫程序的 raw thread 比,在並行編程方面提供了適當的抽象,當然還包括更多其他內容,比如 task 概念,常用算法的成熟實現,自動負載均衡特 性還有不綁定 CPU 數量的靈活的可擴展性等等。STL 之父,Alexander Stepanov 對此評價不錯,他說“Threading Building Blocks… could become a basis for the concurrency dimension of the C++ standard library”。其他 TBB 的早期用戶,包括 Autodesk,Sun,Red Hat, Turbo Linux 等亦然。現在 O’Reilly 已經出版了一本 Intel Threading Building Blocks: Outfitting C++ for Multi-core Processor Parallelism。

二、為什么要TBB

        在多核的平台上開發並行化的程序,必須合理地利用系統的資源 - 如與內核數目相匹配的線程,內存的合理訪問次序,最大化重用緩存。有時候用戶使用(系統)低級的應用接口創建、管理線程,很難保證是否程序處於最佳狀態。 

       而 Intel Thread Building Blocks (TBB) 很好地解決了上述問題: 
  1)TBB提供C++模版庫,用戶不必關注線程,而專注任務本身。 
  2)抽象層僅需很少的接口代碼,性能上毫不遜色。 
  3)靈活地適合不同的多核平台。 
  4)線程庫的接口適合於跨平台的移植(Linux, Windows, Mac) 
  5)支持的C++編譯器 – Microsoft, GNU and Intel  

三、TBB庫包含的內容

        TBB包含了 Algorithms、Containers、Memory Allocation、Synchronization、Timing、Task Scheduling這六個模塊。TBB的結構:

1、通用的並行算法 
      1) 循環的並行: 
       ① parallel_for
       parallel_for是在一個值域執行並行迭代操作的模板函數(如對數組求和),
           
parallel_for(range, body, partitioner)提供了並行迭代的泛型形式。它表示在區域的每個值,並行執行body。partitioner選項指定了分割策略。Range類型必須符合Range概念模型。body必須符合下表的要求:
                
例子:
 1 #include <iostream>
 2 #include <vector>
 3 #include <tbb/tbb.h>
 4 #include <tbb/blocked_range.h>
 5 #include <tbb/parallel_for.h>
 6  
 7 using namespace std;  8 using namespace tbb;  9  
10 typedef vector<int>::iterator IntVecIt; 11  
12 struct body 13 { 14    void operator()(const blocked_range<IntVecIt>&r)const
15  { 16       for(auto i = r.begin(); i!=r.end(); i++) 17  
18         cout<<*i<<' '; 19  } 20 }; 21  
22 int main() 23 { 24    vector<int> vec; 25    for(int i=0; i<10; i++) 26  vec.push_back(i); 27  
28    parallel_for(blocked_range< IntVecIt>(vec.begin(), vec.end()) 29  , body()); 30    return 0; 31 }
       ②parallel_reduce
       parallel_reduce模板在一個區域迭代,將由各個任務計算得到的部分結果合並,得到最終結果。parallel_reduce對區域(range)類型的要求與parallel_for一樣。body類型需要分割構造函數以及一個join方法。body的分割構造函數拷貝運行循環體需要的只讀數據,並分配並歸操作中初始化並歸變量的標志元素。join方法會組合並歸操作中各任務的結果。
parallel_reduce使用分割構造函數來為每個線程生成一個或多個body的拷貝。當它拷貝body的時候,也許body的operator()或者join()正在並發運行。要確保這種並發運行下的安全。典型應用中,這種安全要求不會消耗你太多的精力。 
 1 #include <iostream>
 2 #include <tbb/parallel_reduce.h>
 3 #include <tbb/blocked_range.h>
 4 #include <vector> 
 5  
 6 using namespace std;  7 using namespace tbb;  8  
 9 int main() 10 { 11    vector<int> vec; 12    for(int i=0; i<100; i++) 13  vec.push_back(i); 14  
15 int result = parallel_reduce(blocked_range<vector<int>::iterator>(vec.begin(), vec.end()), 16       0,[](const blocked_range<vector<int>::iterator>& r, int init)->int{ 17  
18         for(auto a = r.begin(); a!=r.end(); a++) 19            init+=*a; 20         return init; 21  }, 22  
23       [](int x, int y)->int{ 24         return x+y; 25  } 26  ); 27       cout<<"result:"<<result<<endl; 28    return 0; 29  
30 }
       ③parallel_scan 

        並行計算前束(prefix)的函數模板。即輸入一個數組,生成一個數組,其中每個元素的值都是原數組中在此元素之前的元素的某個運算符的結果的累積。比如求和:
      輸入:[2, 8,  9, -4,   1, 3, -2,  7]
     生成:[0, 2, 10, 19,  15, 16, 19, 17]

  例子:

 1 #include <tbb/parallel_scan.h>
 2 #include <tbb/blocked_range.h>
 3 #include <iostream>
 4 using namespace tbb;  5 using namespace std;  6  
 7 template<typename T>
 8 class Body  9 { 10  T _sum; 11    T* const _y; 12    const T* const _x; 13 public: 14    Body(T y[], const T x[]):_sum(0), _x(x), _y(y){} 15    T get_sum() const 
16  { 17       return _sum; 18  } 19  
20    template<typename Tag>
21    void operator()(const blocked_range<int>& r, Tag) 22  { 23       T temp = _sum; 24       for(int i = r.begin(); i< r.end(); i++) 25  { 26         temp+=_x[i]; 27         if(Tag::is_final_scan()) 28            _y[i] = temp; 29  } 30  
31       _sum = temp; 32  } 33  
34    Body(Body&b, split):_x(b._x), _y(b._y), _sum(0){} 35    void reverse_join(Body& a) 36  { 37      _sum+=a._sum; 38  } 39    void assign(Body& b) 40  { 41       _sum = b._sum; 42  } 43  
44 }; 45  
46 int main() 47 { 48    int x[10] = {0,1,2,3,4,5,6,7,8,9}; 49    int y[10]; 50    Body<int> body(y,x); 51    parallel_scan(blocked_range<int>(0, 10), body); 52    cout<<"sum:"<<body.get_sum()<<endl; 53    return 0; 54 }
  ④parallel_do 
  並行處理工作項的模板函數。

       如果所有來自輸入流的元素不能隨機訪問,那么parallel_do中的並行就不具備可擴展性。為達到可擴展性,可按如下方式之一處理:
      為了提高速度,B::operator()的粒度至少要約10萬個時鍾周期。否則,parallel_do的內在開銷就會影響有效工作。算法可以傳遞一個task_group_context對象,這樣它的任務可以在此組內執行。默認情況下,算法在它自己的有界組中執行。
      例子:
 1 #include <tbb/parallel_do.h>
 2 #include <iostream>
 3 #include <vector>
 4 using namespace std;  5 using namespace tbb;  6  
 7 struct t_test  8 {  9        string msg; 10        int ref; 11        void operator()()const
12  { 13            cout<<msg<<endl; 14  } 15 }; 16  
17 template <typename T>
18 struct body_test 19 { 20        void operator()(T* t, parallel_do_feeder<T*>& feeder) const
21  { 22               (*t)(); 23               if(t->ref == 0) 24  { 25                    t->msg = "added msg"; 26  feeder.add(t); 27                    t->ref++; 28  } 29  } 30 }; 31  
32 int main() 33 { 34        t_test *pt = new t_test; 35        pt->ref = 0; 36        pt->msg = "original msg"; 37  
38        vector<t_test*> vec; 39  vec.push_back(pt); 40        parallel_do(vec.begin(), vec.end(), body_test<t_test>()); 41        delete pt; 42        return 0; 43 }
  2)流的並行算法 
      ① pipeline
       其定義:
 1 class pipeline  2 {  3 public:  4  pipeline();  5     ~pipeline();  6     void add_filter( filter& f );  7     void run( size_t max_number_of_live_tokens  8         [,task_group_context& group] );  9     void clear(); 10 };
可按以下步驟使用pipeline類:
        1、從filter繼承類f,f的構造函數傳遞給基類filter的構造函數一個參數,來指定它的模式
        2、重載虛方法filter::operator()來實現過濾器對元素處理,並返回一個將被下一個過濾器處理的元素指針。如果流里沒有其他的要處理的元素,返回空值。最后一個過濾器的返回值將被忽略。
       3、生成pipeline類的實例
       4、生成過濾器f的實例,並將它們按先后順序加給pipeline。一個過濾器的實例一次只能加給一個pipeline。同一時間,一個過濾器禁止成為多個pipeline的成員。
       5、調用pipeline::run方法。參數max_number_of_live_tokens指定了能並發運行的階段數量上限。較高的值會以更多的內存消耗為代價來增加並發性。 
   ② 過濾器基類 filter 
 1 class filter  2 {  3 public:  4   enum mode  5   {  6     parallel = implementation-defined,  7     serial_in_order = implementation-defined,  8     serial_out_of_order =implementation-defined  9   }; 10   bool is_serial() const; 11   bool is_ordered() const; 12   virtual void* operator()( void* item ) = 0; 13   virtual void finalize( void* item ) {} 14   virtual ~filter(); 15 protected: 16   filter( mode ); 17 };
 
        由於parallel過濾器支持並行加速,所以推薦使用。如果必須使用serial過濾器,那么serial_out_of_order類型的過濾器是優先考慮的,因為他在處理順序上的約束較少。
  ③ 線程綁定過濾器thread_bound_filter
 1 classthread_bound_filter: public filter  2 {  3 protected:  4   thread_bound_filter(mode filter_mode);  5 public:  6   enum result_type  7   {  8     success,  9     item_not_available, 10     end_of_stream 11   }; 12   result_type try_process_item(); 13   result_type process_item(); 14 };
管道中過濾器的抽象基類,線程必須顯式為其提供服務。當一個過濾器必須由某個指定線程執行的時候會派上用場。服務於thread_bound_filter的線程不能是調用pipeline::run()的線程。例如:
 1 #include<iostream>
 2  
 3 #include <tbb/pipeline.h>
 4  
 5 #include<tbb/compat/thread>
 6  
 7 #include<tbb/task_scheduler_init.h>
 8  
 9 using namespacestd; 10 using namespacetbb; 11 char input[] ="abcdefg\n"; 12  
13 classinputfilter:public filter 14 { 15        char *_ptr; 16 public: 17        void *operator()(void *) 18  { 19               if(*_ptr) 20  { 21                      cout<<"input:"<<*_ptr<<endl; 22                      return _ptr++; 23  } 24               else   return 0; 25  
26  } 27  inputfilter():filter(serial_in_order),_ptr(input){} 28 }; 29  
30 classoutputfilter: public thread_bound_filter 31 { 32 public: 33        void *operator()(void *item) 34  { 35               cout<<*(char*)item; 36               return 0; 37  } 38  outputfilter():thread_bound_filter(serial_in_order){} 39 }; 40  
41 voidrun_pipeline(pipeline *p) 42 { 43     p->run(8); 44 } 45  
46 int main() 47 { 48  inputfilter inf; 49  outputfilter ouf; 50  pipeline p; 51  p.add_filter(inf); 52  p.add_filter(ouf); 53         //由於主線程服務於繼承自thread_bound_filter的outputfilter,所以pipeline要運行在另一個單獨的線程
54        thread t(run_pipeline, &p); 55        while(ouf.process_item()!=thread_bound_filter::end_of_stream) 56               continue; 57  t.join(); 58        return 0; 59 }
      ⑤ parallel_pipeline
         函數parallel_pipeline提供了一種強類型的面向lambda的方式來建立並運行管道。parallel_while – 用於非結構化的流或堆 
      pipeline - 對流水線的每一階段並行,有效使用緩存 
        3)並行排序 
        parallel_sort – 並行快速排序,調用了parallel_for 

2)任務調度者 
管理線程池,及隱藏本地線程復雜度 
並行算法的實現由任務調度者的接口完成 
任務調度者的設計考慮到本地線程的並行所引起的性能問題 

3)並行容器 
concurrent_hash_map 
concurrent_vector 
concurrent_queue 

4)同步原語 
atomic 
mutex 
spin_mutex – 適合於較小的敏感區域 
queuing_mutex – 線程按次序等待(獲得)一個鎖 
spin_rw_mutex 
queuing_rw_mutex 
說明:使用read-writer mutex允許對多線程開放”讀”操作 


5)高性能的內存申請 
使用TBB的allocator 代替 C語言的 malloc/realloc/free 調用 
使用TBB的allocator 代替 C++語言的 new/delete 操作 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM