std::thread::hardware_concurrency()
在新版C++標准庫中是一個很有用的函數。這個函數會返回能並發在一個程序中的線程數量。例如,多核系統中,返回值可以是CPU核芯的數量。返回值也僅僅是一個提示,當系統信息無法獲取時,函數也會返回0。但是,這也無法掩蓋這個函數對啟動線程數量的幫助。
下面的代碼實現了一個並行版的std::accumulate
。代碼中將整體工作拆分成小任務交給每個線程去做,其中設置最小任務數,是為了避免產生太多的線程。程序可能會在操作數量為0的時候拋出異常。比如,std::thread
構造函數無法啟動一個執行線程,就會拋出一個異常。
template<typename Iterator,typename T> struct accumulate_block { void operator()(Iterator first,Iterator last,T& result) { result=std::accumulate(first,last,result); } }; template<typename Iterator,typename T> T parallel_accumulate(Iterator first,Iterator last,T init) { unsigned long const length=std::distance(first,last); if(!length) // 1 return init; unsigned long const min_per_thread=25; unsigned long const max_threads= (length+min_per_thread-1)/min_per_thread; // 2 unsigned long const hardware_threads= std::thread::hardware_concurrency(); unsigned long const num_threads= // 3 std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); unsigned long const block_size=length/num_threads; // 4 std::vector<T> results(num_threads); std::vector<std::thread> threads(num_threads-1); // 5 Iterator block_start=first; for(unsigned long i=0; i < (num_threads-1); ++i) { Iterator block_end=block_start; std::advance(block_end,block_size); // 6 threads[i]=std::thread( // 7 accumulate_block<Iterator,T>(), block_start,block_end,std::ref(results[i])); block_start=block_end; // #8 } accumulate_block<Iterator,T>()( block_start,last,results[num_threads-1]); // 9 std::for_each(threads.begin(),threads.end(), std::mem_fn(&std::thread::join)); // 10 return std::accumulate(results.begin(),results.end(),init); // 11 }
函數看起來很長,但不復雜。如果輸入的范圍為空①,就會得到init的值。反之,如果范圍內多於一個元素時,都需要用范圍內元素的總數量除以線程(塊)中最小任務數,從而確定啟動線程的最大數量②,這樣能避免無謂的計算資源的浪費。比如,一台32芯的機器上,只有5個數需要計算,卻啟動了32個線程。計算量的最大值和硬件支持線程數中,較小的值為啟動線程的數量③。因為上下文頻繁的切換會降低線程的性能,所以你肯定不想啟動的線程數多於硬件支持的線程數量。當std::thread::hardware_concurrency()
返回0,你可以選擇一個合適的數作為你的選擇;在本例中,我選擇了”2”。你也不想在一台單核機器上啟動太多的線程,因為這樣反而會降低性能,有可能最終讓你放棄使用並發。
每個線程中處理的元素數量,是范圍中元素的總量除以線程的個數得出的④。對於分配是否得當,我們會在后面討論。
現在,確定了線程個數,通過創建一個std::vector<T>
容器存放中間結果,並為線程創建一個std::vector<std::thread>
容器 #5。這里需要注意的是,啟動的線程數必須比num_threads少1個,因為在啟動之前已經有了一個線程(主線程)。
使用簡單的循環來啟動線程:block_end迭代器指向當前塊的末尾⑥,並啟動一個新線程為當前塊累加結果⑦。當迭代器指向當前塊的末尾時,啟動下一個塊⑧。
啟動所有線程后,⑨中的線程會處理最終塊的結果。對於分配不均,因為知道最終塊是哪一個,那么這個塊中有多少個元素就無所謂了。
當累加最終塊的結果后,可以等待std::for_each
⑩創建線程的完成(如同在清單2.8中做的那樣),之后使用std::accumulate
將所有結果進行累加⑪。
T類型的加法運算不滿足結合律(比如,對於float型或double型,在進行加法操作時,系統很可能會做截斷操作),因為對范圍中元素的分組,會導致parallel_accumulate得到的結果可能與std::accumulate
得到的結果不同。同樣的,這里對迭代器的要求更加嚴格:必須都是向前迭代器,而std::accumulate
可以在只傳入迭代器的情況下工作。對於創建出results容器,需要保證T有默認構造函數。對於算法並行,通常都要這樣的修改;不過,需要根據算法本身的特性,選擇不同的並行方式。
當線程運行時,所有必要的信息都需要傳入到線程中去,包括存儲計算結果的位置。不過,並非總需如此:有時候這是識別線程的可行方案,可以傳遞一個標識數,不過,當需要標識的函數在調用棧的深層,同時其他線程也可調用該函數,那么標識數就會變的捉襟見肘。好消息是在設計C++的線程庫時,就有預見了這種情況,在之后的實現中就給每個線程附加了唯一標識符。
標識線程
線程標識類型為std::thread::id
,並可以通過兩種方式進行檢索。第一種,可以通過調用std::thread
對象的成員函數get_id()
來直接獲取。如果std::thread
對象沒有與任何執行線程相關聯,get_id()
將返回std::thread::type
默認構造值,這個值表示“無線程”。第二種,當前線程中調用std::this_thread::get_id()
(這個函數定義在<thread>
頭文件中)也可以獲得線程標識。std::thread::id
對象可以自由的拷貝和對比,因為標識符就可以復用。如果兩個對象的std::thread::id
相等,那它們就是同一個線程,或者都“無線程”。如果不等,那么就代表了兩個不同線程,或者一個有線程,另一沒有線程。
C++線程庫不會限制你去檢查線程標識是否一樣,std::thread::id
類型對象提供相當豐富的對比操作;比如,提供為不同的值進行排序。這意味着允許程序員將其當做為容器的鍵值,做排序,或做其他方式的比較。按默認順序比較不同值的std::thread::id
,所以這個行為可預見的:當a<b
,b<c
時,得a<c
,等等。標准庫也提供std::hash<std::thread::id>
容器,所以std::thread::id
也可以作為無序容器的鍵值。
std::thread::id
實例常用作檢測線程是否需要進行一些操作,比如:當用線程來分割一項工作,主線程可能要做一些與其他線程不同的工作。這種情況下,啟動其他線程前,它可以將自己的線程ID通過std::this_thread::get_id()
得到,並進行存儲。就是算法核心部分(所有線程都一樣的),每個線程都要檢查一下,其擁有的線程ID是否與初始線程的ID相同。
std::thread::id master_thread; void some_core_part_of_algorithm() { if(std::this_thread::get_id()==master_thread) { do_master_thread_work(); } do_common_work(); }
另外,當前線程的std::thread::id
將存儲到一個數據結構中。之后在這個結構體中對當前線程的ID與存儲的線程ID做對比,來決定操作是被“允許”,還是“需要”(permitted/required)。
同樣,作為線程和本地存儲不適配的替代方案,線程ID在容器中可作為鍵值。例如,容器可以存儲其掌控下每個線程的信息,或在多個線程中互傳信息。
std::thread::id
可以作為一個線程的通用標識符,當標識符只與語義相關(比如,數組的索引)時,就需要這個方案了。也可以使用輸出流(std::cout
)來記錄一個std::thread::id
對象的值。
std::cout<<std::this_thread::get_id();