一、什么是TBB
TBB(Thread Building Blocks)是英特爾發布的一個庫,全稱為 Threading Building Blocks。TBB 獲得過 17 屆 Jolt Productivity Awards,是一套 C++ 模板庫,和直接利用 OS API 寫程序的 raw thread 比,在並行編程方面提供了適當的抽象,當然還包括更多其他內容,比如 task 概念,常用算法的成熟實現,自動負載均衡特 性還有不綁定 CPU 數量的靈活的可擴展性等等。STL 之父,Alexander Stepanov 對此評價不錯,他說“Threading Building Blocks… could become a basis for the concurrency dimension of the C++ standard library”。其他 TBB 的早期用戶,包括 Autodesk,Sun,Red Hat, Turbo Linux 等亦然。現在 O’Reilly 已經出版了一本 Intel Threading Building Blocks: Outfitting C++ for Multi-core Processor Parallelism。
二、為什么要TBB
在多核的平台上開發並行化的程序,必須合理地利用系統的資源 - 如與內核數目相匹配的線程,內存的合理訪問次序,最大化重用緩存。有時候用戶使用(系統)低級的應用接口創建、管理線程,很難保證是否程序處於最佳狀態。
而 Intel Thread Building Blocks (TBB) 很好地解決了上述問題:
1)TBB提供C++模版庫,用戶不必關注線程,而專注任務本身。
2)抽象層僅需很少的接口代碼,性能上毫不遜色。
3)靈活地適合不同的多核平台。
4)線程庫的接口適合於跨平台的移植(Linux, Windows, Mac)
5)支持的C++編譯器 – Microsoft, GNU and Intel
三、TBB庫包含的內容
TBB包含了 Algorithms、Containers、Memory Allocation、Synchronization、Timing、Task Scheduling這六個模塊。TBB的結構:

1) 循環的並行:
① parallel_for


1 #include <iostream>
2 #include <vector>
3 #include <tbb/tbb.h>
4 #include <tbb/blocked_range.h>
5 #include <tbb/parallel_for.h>
6
7 using namespace std; 8 using namespace tbb; 9
10 typedef vector<int>::iterator IntVecIt; 11
12 struct body 13 { 14 void operator()(const blocked_range<IntVecIt>&r)const
15 { 16 for(auto i = r.begin(); i!=r.end(); i++) 17
18 cout<<*i<<' '; 19 } 20 }; 21
22 int main() 23 { 24 vector<int> vec; 25 for(int i=0; i<10; i++) 26 vec.push_back(i); 27
28 parallel_for(blocked_range< IntVecIt>(vec.begin(), vec.end()) 29 , body()); 30 return 0; 31 }

1 #include <iostream>
2 #include <tbb/parallel_reduce.h>
3 #include <tbb/blocked_range.h>
4 #include <vector>
5
6 using namespace std; 7 using namespace tbb; 8
9 int main() 10 { 11 vector<int> vec; 12 for(int i=0; i<100; i++) 13 vec.push_back(i); 14
15 int result = parallel_reduce(blocked_range<vector<int>::iterator>(vec.begin(), vec.end()), 16 0,[](const blocked_range<vector<int>::iterator>& r, int init)->int{ 17
18 for(auto a = r.begin(); a!=r.end(); a++) 19 init+=*a; 20 return init; 21 }, 22
23 [](int x, int y)->int{ 24 return x+y; 25 } 26 ); 27 cout<<"result:"<<result<<endl; 28 return 0; 29
30 }
並行計算前束(prefix)的函數模板。即輸入一個數組,生成一個數組,其中每個元素的值都是原數組中在此元素之前的元素的某個運算符的結果的累積。比如求和:
輸入:[2, 8, 9, -4, 1, 3, -2, 7]
生成:[0, 2, 10, 19, 15, 16, 19, 17]
例子:
1 #include <tbb/parallel_scan.h>
2 #include <tbb/blocked_range.h>
3 #include <iostream>
4 using namespace tbb; 5 using namespace std; 6
7 template<typename T>
8 class Body 9 { 10 T _sum; 11 T* const _y; 12 const T* const _x; 13 public: 14 Body(T y[], const T x[]):_sum(0), _x(x), _y(y){} 15 T get_sum() const
16 { 17 return _sum; 18 } 19
20 template<typename Tag>
21 void operator()(const blocked_range<int>& r, Tag) 22 { 23 T temp = _sum; 24 for(int i = r.begin(); i< r.end(); i++) 25 { 26 temp+=_x[i]; 27 if(Tag::is_final_scan()) 28 _y[i] = temp; 29 } 30
31 _sum = temp; 32 } 33
34 Body(Body&b, split):_x(b._x), _y(b._y), _sum(0){} 35 void reverse_join(Body& a) 36 { 37 _sum+=a._sum; 38 } 39 void assign(Body& b) 40 { 41 _sum = b._sum; 42 } 43
44 }; 45
46 int main() 47 { 48 int x[10] = {0,1,2,3,4,5,6,7,8,9}; 49 int y[10]; 50 Body<int> body(y,x); 51 parallel_scan(blocked_range<int>(0, 10), body); 52 cout<<"sum:"<<body.get_sum()<<endl; 53 return 0; 54 }
並行處理工作項的模板函數。

1 #include <tbb/parallel_do.h>
2 #include <iostream>
3 #include <vector>
4 using namespace std; 5 using namespace tbb; 6
7 struct t_test 8 { 9 string msg; 10 int ref; 11 void operator()()const
12 { 13 cout<<msg<<endl; 14 } 15 }; 16
17 template <typename T>
18 struct body_test 19 { 20 void operator()(T* t, parallel_do_feeder<T*>& feeder) const
21 { 22 (*t)(); 23 if(t->ref == 0) 24 { 25 t->msg = "added msg"; 26 feeder.add(t); 27 t->ref++; 28 } 29 } 30 }; 31
32 int main() 33 { 34 t_test *pt = new t_test; 35 pt->ref = 0; 36 pt->msg = "original msg"; 37
38 vector<t_test*> vec; 39 vec.push_back(pt); 40 parallel_do(vec.begin(), vec.end(), body_test<t_test>()); 41 delete pt; 42 return 0; 43 }
1 class pipeline 2 { 3 public: 4 pipeline(); 5 ~pipeline(); 6 void add_filter( filter& f ); 7 void run( size_t max_number_of_live_tokens 8 [,task_group_context& group] ); 9 void clear(); 10 };
1、從filter繼承類f,f的構造函數傳遞給基類filter的構造函數一個參數,來指定它的模式
2、重載虛方法filter::operator()來實現過濾器對元素處理,並返回一個將被下一個過濾器處理的元素指針。如果流里沒有其他的要處理的元素,返回空值。最后一個過濾器的返回值將被忽略。
3、生成pipeline類的實例
4、生成過濾器f的實例,並將它們按先后順序加給pipeline。一個過濾器的實例一次只能加給一個pipeline。同一時間,一個過濾器禁止成為多個pipeline的成員。
5、調用pipeline::run方法。參數max_number_of_live_tokens指定了能並發運行的階段數量上限。較高的值會以更多的內存消耗為代價來增加並發性。
1 class filter 2 { 3 public: 4 enum mode 5 { 6 parallel = implementation-defined, 7 serial_in_order = implementation-defined, 8 serial_out_of_order =implementation-defined 9 }; 10 bool is_serial() const; 11 bool is_ordered() const; 12 virtual void* operator()( void* item ) = 0; 13 virtual void finalize( void* item ) {} 14 virtual ~filter(); 15 protected: 16 filter( mode ); 17 };

由於parallel過濾器支持並行加速,所以推薦使用。如果必須使用serial過濾器,那么serial_out_of_order類型的過濾器是優先考慮的,因為他在處理順序上的約束較少。
1 classthread_bound_filter: public filter 2 { 3 protected: 4 thread_bound_filter(mode filter_mode); 5 public: 6 enum result_type 7 { 8 success, 9 item_not_available, 10 end_of_stream 11 }; 12 result_type try_process_item(); 13 result_type process_item(); 14 };
1 #include<iostream>
2
3 #include <tbb/pipeline.h>
4
5 #include<tbb/compat/thread>
6
7 #include<tbb/task_scheduler_init.h>
8
9 using namespacestd; 10 using namespacetbb; 11 char input[] ="abcdefg\n"; 12
13 classinputfilter:public filter 14 { 15 char *_ptr; 16 public: 17 void *operator()(void *) 18 { 19 if(*_ptr) 20 { 21 cout<<"input:"<<*_ptr<<endl; 22 return _ptr++; 23 } 24 else return 0; 25
26 } 27 inputfilter():filter(serial_in_order),_ptr(input){} 28 }; 29
30 classoutputfilter: public thread_bound_filter 31 { 32 public: 33 void *operator()(void *item) 34 { 35 cout<<*(char*)item; 36 return 0; 37 } 38 outputfilter():thread_bound_filter(serial_in_order){} 39 }; 40
41 voidrun_pipeline(pipeline *p) 42 { 43 p->run(8); 44 } 45
46 int main() 47 { 48 inputfilter inf; 49 outputfilter ouf; 50 pipeline p; 51 p.add_filter(inf); 52 p.add_filter(ouf); 53 //由於主線程服務於繼承自thread_bound_filter的outputfilter,所以pipeline要運行在另一個單獨的線程
54 thread t(run_pipeline, &p); 55 while(ouf.process_item()!=thread_bound_filter::end_of_stream) 56 continue; 57 t.join(); 58 return 0; 59 }
3)並行排序
parallel_sort – 並行快速排序,調用了parallel_for
2)任務調度者
管理線程池,及隱藏本地線程復雜度
並行算法的實現由任務調度者的接口完成
任務調度者的設計考慮到本地線程的並行所引起的性能問題
3)並行容器
concurrent_hash_map
concurrent_vector
concurrent_queue
4)同步原語
atomic
mutex
spin_mutex – 適合於較小的敏感區域
queuing_mutex – 線程按次序等待(獲得)一個鎖
spin_rw_mutex
queuing_rw_mutex
說明:使用read-writer mutex允許對多線程開放”讀”操作
5)高性能的內存申請
使用TBB的allocator 代替 C語言的 malloc/realloc/free 調用
使用TBB的allocator 代替 C++語言的 new/delete 操作