C++並發之運行時決定線程的數量


std::thread::hardware_concurrency()在新版C++標准庫中是一個很有用的函數。這個函數會返回能並發在一個程序中的線程數量。例如,多核系統中,返回值可以是CPU核芯的數量。返回值也僅僅是一個提示,當系統信息無法獲取時,函數也會返回0。但是,這也無法掩蓋這個函數對啟動線程數量的幫助。

下面的代碼實現了一個並行版的std::accumulate。代碼中將整體工作拆分成小任務交給每個線程去做,其中設置最小任務數,是為了避免產生太多的線程。程序可能會在操作數量為0的時候拋出異常。比如,std::thread構造函數無法啟動一個執行線程,就會拋出一個異常。

template<typename Iterator,typename T>
struct accumulate_block
{
  void operator()(Iterator first,Iterator last,T& result)
  {
    result=std::accumulate(first,last,result);
  }
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);
  if(!length) // 1
    return init;
  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
      (length+min_per_thread-1)/min_per_thread; // 2
  unsigned long const hardware_threads=
      std::thread::hardware_concurrency();
  unsigned long const num_threads=  // 3
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  unsigned long const block_size=length/num_threads; // 4
  std::vector<T> results(num_threads);
  std::vector<std::thread> threads(num_threads-1);  // 5
  Iterator block_start=first;
  for(unsigned long i=0; i < (num_threads-1); ++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);  // 6
    threads[i]=std::thread(     // 7
        accumulate_block<Iterator,T>(),
        block_start,block_end,std::ref(results[i]));
    block_start=block_end;  // #8
  }
  accumulate_block<Iterator,T>()(
      block_start,last,results[num_threads-1]); // 9
  std::for_each(threads.begin(),threads.end(),
       std::mem_fn(&std::thread::join));  // 10
  return std::accumulate(results.begin(),results.end(),init); // 11
}

  函數看起來很長,但不復雜。如果輸入的范圍為空①,就會得到init的值。反之,如果范圍內多於一個元素時,都需要用范圍內元素的總數量除以線程(塊)中最小任務數,從而確定啟動線程的最大數量②,這樣能避免無謂的計算資源的浪費。比如,一台32芯的機器上,只有5個數需要計算,卻啟動了32個線程。計算量的最大值和硬件支持線程數中,較小的值為啟動線程的數量③。因為上下文頻繁的切換會降低線程的性能,所以你肯定不想啟動的線程數多於硬件支持的線程數量。當std::thread::hardware_concurrency()返回0,你可以選擇一個合適的數作為你的選擇;在本例中,我選擇了”2”。你也不想在一台單核機器上啟動太多的線程,因為這樣反而會降低性能,有可能最終讓你放棄使用並發。

每個線程中處理的元素數量,是范圍中元素的總量除以線程的個數得出的④。對於分配是否得當,我們會在后面討論。

現在,確定了線程個數,通過創建一個std::vector<T>容器存放中間結果,並為線程創建一個std::vector<std::thread>容器 #5。這里需要注意的是,啟動的線程數必須比num_threads少1個,因為在啟動之前已經有了一個線程(主線程)。

使用簡單的循環來啟動線程:block_end迭代器指向當前塊的末尾⑥,並啟動一個新線程為當前塊累加結果⑦。當迭代器指向當前塊的末尾時,啟動下一個塊⑧。

啟動所有線程后,⑨中的線程會處理最終塊的結果。對於分配不均,因為知道最終塊是哪一個,那么這個塊中有多少個元素就無所謂了。

當累加最終塊的結果后,可以等待std::for_each ⑩創建線程的完成(如同在清單2.8中做的那樣),之后使用std::accumulate將所有結果進行累加⑪。

T類型的加法運算不滿足結合律(比如,對於float型或double型,在進行加法操作時,系統很可能會做截斷操作),因為對范圍中元素的分組,會導致parallel_accumulate得到的結果可能與std::accumulate得到的結果不同。同樣的,這里對迭代器的要求更加嚴格:必須都是向前迭代器,而std::accumulate可以在只傳入迭代器的情況下工作。對於創建出results容器,需要保證T有默認構造函數。對於算法並行,通常都要這樣的修改;不過,需要根據算法本身的特性,選擇不同的並行方式。

當線程運行時,所有必要的信息都需要傳入到線程中去,包括存儲計算結果的位置。不過,並非總需如此:有時候這是識別線程的可行方案,可以傳遞一個標識數,不過,當需要標識的函數在調用棧的深層,同時其他線程也可調用該函數,那么標識數就會變的捉襟見肘。好消息是在設計C++的線程庫時,就有預見了這種情況,在之后的實現中就給每個線程附加了唯一標識符。

標識線程

線程標識類型為std::thread::id,並可以通過兩種方式進行檢索。第一種,可以通過調用std::thread對象的成員函數get_id()來直接獲取。如果std::thread對象沒有與任何執行線程相關聯,get_id()將返回std::thread::type默認構造值,這個值表示“無線程”。第二種,當前線程中調用std::this_thread::get_id()(這個函數定義在<thread>頭文件中)也可以獲得線程標識。std::thread::id對象可以自由的拷貝和對比,因為標識符就可以復用。如果兩個對象的std::thread::id相等,那它們就是同一個線程,或者都“無線程”。如果不等,那么就代表了兩個不同線程,或者一個有線程,另一沒有線程。

C++線程庫不會限制你去檢查線程標識是否一樣,std::thread::id類型對象提供相當豐富的對比操作;比如,提供為不同的值進行排序。這意味着允許程序員將其當做為容器的鍵值,做排序,或做其他方式的比較。按默認順序比較不同值的std::thread::id,所以這個行為可預見的:當a<bb<c時,得a<c,等等。標准庫也提供std::hash<std::thread::id>容器,所以std::thread::id也可以作為無序容器的鍵值。

std::thread::id實例常用作檢測線程是否需要進行一些操作,比如:當用線程來分割一項工作,主線程可能要做一些與其他線程不同的工作。這種情況下,啟動其他線程前,它可以將自己的線程ID通過std::this_thread::get_id()得到,並進行存儲。就是算法核心部分(所有線程都一樣的),每個線程都要檢查一下,其擁有的線程ID是否與初始線程的ID相同。

std::thread::id master_thread;
void some_core_part_of_algorithm()
{
  if(std::this_thread::get_id()==master_thread)
  {
    do_master_thread_work();
  }
  do_common_work();
}

  

另外,當前線程的std::thread::id將存儲到一個數據結構中。之后在這個結構體中對當前線程的ID與存儲的線程ID做對比,來決定操作是被“允許”,還是“需要”(permitted/required)。

同樣,作為線程和本地存儲不適配的替代方案,線程ID在容器中可作為鍵值。例如,容器可以存儲其掌控下每個線程的信息,或在多個線程中互傳信息。

std::thread::id可以作為一個線程的通用標識符,當標識符只與語義相關(比如,數組的索引)時,就需要這個方案了。也可以使用輸出流(std::cout)來記錄一個std::thread::id對象的值。

std::cout<<std::this_thread::get_id();

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM