應用OpenMP的一個簡單的設計模式


  小喵的嘮叨話:最近很久沒寫博客了,一是因為之前寫的LSoftmax后饋一直沒有成功,所以在等作者的源碼。二是最近沒什么想寫的東西。前兩天,在預處理圖片的時候,發現處理200w張圖片,跑了一晚上也才處理完一半。早上的時候,出於無奈,花半小時改寫了一個簡單調用OpenMP的處理程序,用了30個核心,然后一小時不到就處理完了。感慨在多核的時代,即使是簡單的程序,如果能支持多核,應該都能節省不少時間。

本文系原創,轉載請注明出處~

小喵的博客:http://www.miaoerduo.com

博客原文:http://www.miaoerduo.com/openmp/應用openmp的一個簡單的設計模式.html

 

一、寫在前面

對於OpenMP,小喵其實並不是了解很多,而且小喵本身也只用到了OpenMP的最簡單的功能。在這里主要是分享一個自己常用的寫簡單的並行程序的思路。希望能幫助到大家。

這個設計模式的主要特點如下:

1,處理的任務是獨立的;

2,可以在運行中輸出結果,而不是最終才輸出;

3,有限的資源占用;

4,在每次任務的執行時間不同的情況下,也能很好的工作;

5,在每次任務執行需要占用私有的數據時(依賴了線程不安全的庫),也可以很好的工作;

6,輸出是有序的

 

注意:本文中僅介紹小喵自己用到的幾個OpenMP的功能。既不深入也不完善。僅適合初學者。

小喵學習OpenMP主要是看了周明偉的博客:

OpenMP編程指南:http://blog.csdn.net/drzhouweiming/article/details/4093624

想要比較深入地學習的童鞋請看周老師的博客。

對於什么是OpenMP,OpenMP有什么優點等的問題。周老師的博客也很詳細的說明。這里小喵就不多廢話了。直奔主題。

二、如何使用OpenMP

小喵使用的開發環境是Linux,windows的童鞋可以看一下這個博客:http://www.cnblogs.com/yangyangcv/archive/2012/03/23/2413335.html。MAC上的GCC實際上是Clang,想要使用OpenMP的話比較麻煩。要額外裝一些東東,自己bing一下就有。

本喵的編譯環境是CentOS 7, GCC 4.8.5。大多數系統和編譯器都支持OpenMP了。

先舉個小栗子:

不使用OpenMP:

 1 #include <iostream>
 2 #define N 100000000
 3 
 4 int fun() {
 5     int a = 0;
 6     for (int i = 0; i < N; ++ i) {
 7         a += i;
 8     }
 9     return a;
10 }
11 
12 int main() {
13 
14     for (int i = 0; i < 100; ++ i) {
15         fun();
16     }
17     std::cout << "finish" << std::endl;
18     return 0;
19 }

之后使用g++編譯,並計時:

g++ sample_without_omp.cpp -o sample_without_omp.bin
time ./sample_with_omp.bin

運行結果:

./sample_without_omp.bin  24.42s user 0.00s system 100% cpu 24.417 total

這里可以看到用了100%的cpu,總時間是24.417 s。

使用OpenMP,調用2個線程:

 1 #include <iostream>
 2 #include <omp.h>
 3 
 4 #define N 100000000
 5 
 6 int fun() {
 7     int a = 0;
 8     for (int i = 0; i < N; ++ i) {
 9         a += i;
10     }
11     return a;
12 }
13 
14 int main() {
15 
16 #pragma omp parallel for num_threads(2) schedule(dynamic)
17     for (int i = 0; i < 100; ++ i) {
18         fun();
19     }
20     std::cout << "finish" << std::endl;
21     return 0;
22 }

這里源碼的差別是多了一個omp.h的頭文件,和一個奇怪的語句:

#pragma omp parallel for num_threads(2) schedule(dynamic)

編譯的時候,也有點小修改:

g++ sample_with_omp.cpp -o sample_with_omp.bin -fopenmp
time ./sample_with_omp.bin

運行結果如下:

./sample_with_omp.bin  24.32s user 0.01s system 199% cpu 12.182 total

可以看出,user的時間幾乎沒變,這表示CPU總的運行時間沒有變化。但是cpu的使用變成了199%,total的時間變成了12.182 s。這就表明了我們使用了2個cpu,使得運行時間成功減半了!

是不是很愉快,我們只添加了2行代碼,就使得程序的速度翻倍。可見OpenMP是多么的簡潔實用。

那么,現在是不是不用小喵說,我們也知道怎么給程序加入OpenMP的支持了呢?

歸納一下,主要有三點:

1,加入OpenMP的頭文件 omp.h

2,使用合適的編譯器指令修飾我們需要並行的部分(線程數、任務分配模式等等,后面會講到)

3,編譯的時候加入openmp的支持,編譯的時候加入參數 -fopenmp

三、fork/join的並行執行模式

我們之前看到了一個簡單的例子,可以看出,程序其實是有串行部分和並行部分兩個部分組成的。

在程序剛啟動的時候,只有一個主線程,當執行到並行部分的時候(上面的例子中就是pragma之后的for循環),並行的代碼會通過派生其他線程來執行。只有當並行的所有代碼執行完之后,才會繼續執行串行的部分。

因此主要的運行流程是這個樣子的:

理解這個流程是相當重要的,可以避免很多的不必要的錯誤。一個常見的錯誤就是資源訪問的沖突。比如文件,流對象等,如果在並行的代碼部分隨意訪問這些資源,就可能會導致不可預見的錯誤。這在多線程編程中也是最常出現的錯誤,我們在下面會具體說到。

四、OpenMP的常用指令和庫函數

在C/C++中,OpenMP的指令使用的格式如下:

#pragma omp 指令 [子句[子句]...]

指令用來指示下面的代碼的運行模式。子句是給出一些額外的信息。

這里主要介紹兩個指令:parallelfor

parallel:用在代碼段之前,表示下面的代碼段使用多線程運行。

for:用於for循環之前,將循環分配到多個線程中並行執行,必須保證每次循環之間無相關性。

parallel for:parallel 和for語句的結合,也是用在一個for循環之前,表示for循環的代碼將被多個線程並行執行。

小喵使用的時候都是直接使用了parallel for這個組合指令。用來對緊接着的for循環的代碼段進行並行。其他的指令請查閱之前提到的博客。

子句中主要是給出一些額外的設置,這里也主要介紹2個:num_threads,schedule。

num_threads:指定線程的數目(不設置該參數似乎會使用和cpu核心數相同的線程數)。

schedule:指定如何調度for循環迭代。有4種模式:static、dynamic、guided,runtime,后面會專門講到。

這里,我們再回顧一下之前寫的代碼:

#pragma omp parallel for num_threads(2) schedule(dynamic)

是不是豁然開朗。這句話的意思是,使用OpenMP(#pragma omp),將下面的for循環使用多線程去執行(parallel for),線程數為2(num_threads(2)),任務調度方式使用dynamic模式(schedule(dynamic))。

現在,讓我們趁熱打鐵,學習for循環的寫法。

這里,小喵直接復制了周老師的說法(解釋得實在太好了):

for 循環語句中,書寫是需要按照一定規范來寫才可以的,即for循環小括號內的語句要按照一定的規范進行書寫,for語句小括號里共有三條語句
for( i = start; i < end; i++)

i = start; 是for循環里的第一條語句,必須寫成 “變量=初值” 的方式。如 i=0
i < end; 是for循環里的第二條語句,這個語句里可以寫成以下4種形式之一:
變量 < 邊界值
變量 <= 邊界值
變量 > 邊界值
變量 >= 邊界值
如 i>10 i<10 i>=10 i<=10 等等
最后一條語句i++可以有以下9種寫法之一
i++
++i
i--
--i
i += inc
i -= inc
i = i + inc
i = inc + i
i = i – inc
例如i += 2; i -= 2;i = i + 2;i = i - 2;都是符合規范的寫法。

可見一般來說,我們的for循環的寫法OpenMP是支持的。那么有沒有OpenMP不支持的for循環呢?小喵沒試過,不過可以猜想,for (auto &v: arr) 這種寫法是不支持的。使用迭代器的話,不知道能不能使用,小喵沒有驗證過。喵粉如果好奇的話,可以自行驗證一下。

在介紹schedule之前,我們先學習幾個常用的庫函數,用來獲取和設置OpenMP的各種運行時狀態:

omp_get_num_procs, 返回運行本線程的多處理機的處理器個數。通常可以根據處理器的個數來合理設置並行的線程數。
omp_get_num_threads, 返回當前並行區域中的活動線程個數。比如上面的例子,應該就會返回2。
omp_get_thread_num, 返回線程號。並行區域的代碼會被多個線程執行,而每個線程都有一個自己的ID,也就是線程號。如果我們設置使用N個線程,那么線程號會是0,1,2,...,N-1。
omp_set_num_threads, 設置並行執行代碼時的線程個數。和num_threads功能相同。

五、OpenMP中的任務調度

那么接下來,我們開始學習任務調度的四種模式。使用的子句就是schedule。

schedule的使用格式:

schedule(type[, size])

type主要有4種:static,dynamic,guilded,runtime。

1、static(靜態調度)

表示靜態調度,當不設置schedule的時候,多數編譯器就是使用這種調度方式。它十分的簡單。給定N個任務,啟用t個線程,那么直接給每個線程分配N/t個任務,考慮到N可能不能整除t,所以每個線程的任務數會有極小的不同。

下面舉個例子:

 1 #include <omp.h>
 2 #include <iostream>
 3 
 4 int main() {
 5 
 6     const int task_num = 10;
 7 
 8 #pragma omp parallel for num_threads(2) schedule(static)
 9     for (int i = 0; i < task_num; ++ i) {
10         std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl;
11     }
12 
13     return 0;
14 }

輸出結果如下:

i = 0 thread_id = 0
i = 5 thread_id = 1
i = 6 thread_id = 1
i = 7 thread_id = 1
i = 8 thread_id = 1
i = 9 thread_id = 1
i = 1 thread_id = 0
i = 2 thread_id = 0
i = 3 thread_id = 0
i = 4 thread_id = 0

可以看出,0-4被分配給了0線程,5-9被分配給了1線程。由於是多線程,所以打印出來的順序並不能保證。

如果使用了size,則每次回分配給一個線程size次任務,依次迭代。

 1 #include <omp.h>
 2 #include <iostream>
 3 
 4 int main() {
 5 
 6     const int task_num = 10;
 7 
 8 #pragma omp parallel for num_threads(2) schedule(static, 2)
 9     for (int i = 0; i < task_num; ++ i) {
10         std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl;
11     }
12 
13     return 0;
14 }

運行結果和上面稍有不同:

i = 2 thread_id = 1
i = 0 thread_id = 0
i = 3 thread_id = 1
i = 6 thread_id = 1
i = 7 thread_id = 1
i = 1 thread_id = 0
i = 4 thread_id = 0
i = 5 thread_id = 0
i = 8 thread_id = 0
i = 9 thread_id = 0

可以看出,連續的2個任務會被分配到同一個線程。0、1給線程0,2、3給線程1,4、5給線程0,6、7給線程1。。。

static是一個十分簡單的策略,但同時會帶來一些問題。比如當任務的執行時間差異很大的時候,由於OpenMP的fork/join的機制,速度快的線程必須等待速度慢的線程,如果恰好分配的很不合理的話(耗時的任務集中在了某一個線程),其他的線程可能會等待較長的時間。這顯然不利於我們充分利用多核資源。

2、dynamic(動態調度)

動態調度會根據運行時的線程狀態來決定下一次的迭代。當一個線程執行完自己的任務之后,會再去領取任務。不設置size的話,一個線程一次會分配一個任務,當執行完了,會再領取一個任務。如果設置了size,線程則一次領取size個任務。

dynamic是小喵最愛的模式!是因為它和標准的生產者消費者模式很相似。這里生產者默認一次性生產所有的任務,然后每個線程都是一個消費者,當自己執行完了,會再次去領取任務。這樣,任務的分配會更加的有彈性,更好的適應了任務時間不同的情況。

下面也是一個小栗子,不使用size:

 1 #include <omp.h>
 2 #include <iostream>
 3 
 4 int main() {
 5 
 6     const int task_num = 10;
 7 
 8 #pragma omp parallel for num_threads(2) schedule(dynamic)
 9     for (int i = 0; i < task_num; ++ i) {
10         std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl;
11     }
12 
13     return 0;
14 }

運行結果:

i = 0 thread_id = 0
i = 1 thread_id = 1
i = 2 thread_id = 0
i = 3 thread_id = 0
i = 4 thread_id = 1
i = 5 thread_id = 1
i = 6 thread_id = 1
i = 7 thread_id = 1
i = 8 thread_id = 1
i = 9 thread_id = 1

可以看出任務的分配是不均勻的。

使用size之后:

 1 #include <omp.h>
 2 #include <iostream>
 3 
 4 int main() {
 5 
 6     const int task_num = 10;
 7 
 8 #pragma omp parallel for num_threads(2) schedule(dynamic, 2)
 9     for (int i = 0; i < task_num; ++ i) {
10         std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl;
11     }
12 
13     return 0;
14 
15 }

運行結果如下:

i = 0 thread_id = 0
i = 2 thread_id = 1
i = 3 thread_id = 1
i = 4 thread_id = 1
i = 5 thread_id = 1
i = 6 thread_id = 1
i = 7 thread_id = 1
i = 8 thread_id = 1
i = 9 thread_id = 1
i = 1 thread_id = 0

線程0先領取了任務0、1。線程1領取了2、3。線程1做完之后,又領取了4、5。。。

可以看出,每次的任務分配是以2個為單位的,分配的順序視運行時狀態動態調整。

3、guided(啟發式調度)

采用啟發式調度方法進行調度,每次分配給線程迭代次數不同,開始比較大,以后逐漸減小。

size表示每次分配的迭代次數的最小值,由於每次分配的迭代次數會逐漸減少,少到size時,將不再減少。如果不知道size的大小,那么默認size為1,即一直減少到1。具體采用哪一種啟發式算法,需要參考具體的編譯器和相關手冊的信息。

4、runtime

runtime調用,並不是一個真的調度方式。它是根據環境變量的OMP_SCHEDULE來確定調度模式。最終仍然是上述三種方式之一。具體用法可以查看相關文檔。

六、一個常用的設計模式

在做了前5個部分的鋪墊之后,相信喵粉們已經初步掌握了OpenMP的幾個基本的知識。那么,現在就開始講我們最重要的部分——小喵最常用的一個設計模式。

 

主要流程如下:

<1>初始化:
1,定義線程數為thread_num
2,定義平均每個線程上的任務數為task_per_thread
3,初始化處理器對象(handle_arr),大小為thread_num
4,初始化任務空間(task_arr),大小為thread_num * task_per_thread
5,初始化結果空間(result_arr),大小為thread_num * task_per_thread

<2>讀取任務(串行):
1,讀取thread_num * task_per_thread個任務,存入task_arr。
2,記錄讀取任務的數目task_num(task_num <= thread_num * task_per_thread)

<3>任務處理(並行):
1,任務的task_id就是for循環的下標
2,通過omp_get_thread_num獲取當前的線程id,根據線程id查找處理器對象。
3,使用處理器處理定義的task_id對應的任務task_arr[task_id]
4,將執行結果存入result_arr[task_id]的位置

<4>結果處理(串行):
根據task_num,處理完result_arr中的結果。

<5>程序狀態判斷
判斷task_num是否等於thread_num * task_per_thread。
如果相等,說明任務隊列沒有執行完,繼續<2>開始執行。
如果不相等,則說明任務隊列全部處理完,程序執行結束<6>。

<6>enjoy your programming

 

讓我們來一步一步的理解這個模式。

<1>初始化:

這里主要完成一些初始化的工作。

1)thread_num和task_per_thread

可以看到,這里初始化了兩個參數。那么為什么需要thread_num和task_per_thread這兩個參數呢?

為了更好的利用和控制資源。

根據機器的不同,我們可以自己設置需要開啟的線程數,這就是thread_num。

反派汪:我覺得你說的有問題。我們在程序中明明可以利用omp_get_num_procs獲取機器的所有的處理器的數目,然后就啟用這么多的線程的話,不就能最大限度的使用所有的計算能力了嗎?

喵座:其實不然。假如服務器的處理器數目為40,按照你的思路,則會啟用40個線程。這樣一是會造成其他人不能正常的工作,二是當服務器本來就有其他的程序在run的時候,你的40個線程亦不能很好的工作。不如自己在運行之前設置一下需要的計算資源數,會更方便一點。

那么為什么我們需要設置這個task_per_thread呢?

因為資源是有限的。

考慮到最高效的工作方式,就是讓所有的線程不間斷的工作。比如一次性讀完所有的任務列表,然后使用dynamic做完所有的任務。這樣在任務做完之前,每個線程都會無間歇的工作。

理想是完美的,現實是殘酷的。如果任務非常多,比如小喵需要處理的200w條數據。很難一次性全部載入內存。而且,即使這么做了,也必須得任務全部做完,才能得到運行結果,時效性很差。

那么我們不設置thask_per_thread不行嗎?或者就把這個設置成1。每次就讀取線程數相同的任務數,這樣代碼編寫不應該更簡單嗎?

這時候,讓我們回顧一下OpenMP的調度機制。如果每次只讀取thread_num這么多的個任務數,那么每次並行計算的時候,每個線程都會分配到一個任務。那么總的耗時將變成最慢的任務的執行時間。

舉個簡單的例子,比如有12個任務,耗時為2,1,2,1,2,1,2,1,2,1,2,1。我們使用2個線程。那么每處理2個任務,耗時都是2。總時間是12。

如果我們每6個一起執行,也是使用2個線程。需要的總時間會變成了10。

執行過程看下圖:

可以使用task_per_task這個策略,每次處理thread_num * task_per_task個任務的話,可以更好了利用多核的資源。(task_per_task設得越大,講道理效果應該越好。小喵自己喜歡設成10或20)

另一個好處是,當我們處理完這些任務之后,可以立刻將結果寫入結果文件。

2)處理器對象:

這是可選的。我們在實際處理任務的時候,有時候會使用到一些特殊的資源,而且必須保證這些資源是獨占的(比如網絡通信的套接字,文件對象,或是線程不安全的一些實例的對象)。最簡單高效的方法就是為每個線程都初始化一個自己的處理器(或是資源)對象。這樣在實際處理的時候,每個線程可以根據自己的線程id找到自己的處理器,從而避免了多線程中的各種問題。

3)task_arr和result_arr

這兩個空間是用來存放每次並行處理的任務和結果的。大小自然和每次並行的任務數(thread_num * task_per_thread)相等。考慮到每次並行都可以復用這些空間,所以提前申請好足夠的空間可以提高運行效率。

<2>讀取任務:

我們通常會將任務的內容保存在文件中。而文件的讀取是不能並行的。因此我們需要提前按串行的方式將任務讀取到任務隊列task_arr中。每次讀取thread_num * task_per_thread個。考慮到任務總數可能不是thread_num * task_per_thread的整數倍,因此最后一次讀取的任務數會稍小一點。我們將每次讀取的任務數記錄下來,命名為task_num。

<3>任務處理:

這里就是我們剛剛學習到的OpenMP的用武之地。

通常的寫法是:

1 #pragma omp parallel for num_threads(thread_num) schedule(dynamic)
2 for (int task_idx = 0; task_idx < task_num; ++ task_idx) {
3     int thread_id = omp_get_thread_num();                      // 獲取當前的線程id
4     handle_type handle = handle_arr[thread_id];                // 根據線程id,獲取處理器
5     result_type result = handle->process(task_arr[task_idx]);  // 處理指定的任務
6     result_arr[task_idx] = result;                             // 在指定位置寫回執行的結果
7 }

獲取當前的線程號,然后獲取處理器,然后處理對應的任務,並將結果存放進對應的位置。

注意,線程之間是獨立的,不能讀寫同一個線程不安全的資源。而且在並行區域不保證任何的線程間的順序。

這樣,我們就能安全且高效的執行完每次的任務了。

<4>結果處理:

這部分十分簡單,因為任務的結果已經按順序存進了result_arr中,有效的result是前task_num個,之后想怎么處理都是喵粉自己的事情了。

<5>程序狀態判斷:

正如我們在<2>中說到的,我們每次處理一批任務,最后的一批任務的個數將不是thread_num * task_per_thread這么多。因此需要與task_num比較一下。如果相等,就可能是我們還沒有處理完,回到<2>繼續執行。如果不相等,那就說明我們處理完了所有的任務了!你可以坐下來喝杯caffe,然后enjoy多線程帶來的快感了。

 

最后,附上一個簡單的demo。使用多線程,從文本上讀取圖片的list,讀取圖片的大小,並將結果存入一個新的文本中。

 1 #include <opencv2/opencv.hpp>
 2 #include <fstream>
 3 #include <iostream>
 4 #include <string>
 5 
 6 typedef struct {
 7     int width;
 8     int height;
 9 } Size;
10 
11 int main(int argc, char **argv) {
12 
13     if (argc < 3) {
14         std::cerr << "usage: get_size.bin input_list output_list"
15             " [thread_num] [task_per_thread]" << std::endl;
16             return 1;
17     }
18     const int thread_num = (argc > 3) ? atoi(argv[3]):1;
19     const int task_per_thread = (argc > 4) ? atoi(argv[4]): 10;
20 
21     const int total_task = thread_num * task_per_thread;
22 
23     std::string image_name_arr[total_task];    // task arr
24     Size image_size_arr[total_task]; // result arr
25 
26     std::ifstream is(argv[1]);
27     std::ofstream os(argv[2]);
28 
29     int count = 0;
30 
31     while (1) {
32 
33         // 讀取任務
34         int task_num = 0;
35         for (int task_idx = 0; task_idx < total_task; ++ task_idx) {
36             if (!(is >> image_name_arr[task_idx])) break;
37             ++ task_num;
38             ++ count;
39         }
40 
41         // 處理任務
42 #pragma omp parallel for num_threads(thread_num) schedule(dynamic)
43         for (int task_idx = 0; task_idx < task_num; ++ task_idx) {
44 
45             cv::Mat image = cv::imread(image_name_arr[task_idx]);
46             image_size_arr[task_idx].width = image.cols;
47             image_size_arr[task_idx].height = image.rows;
48 
49         }
50 
51         std::cout << "process #" << count << std::endl;
52 
53         // 處理結果
54         for (int task_idx = 0; task_idx < task_num; ++ task_idx) {
55             os << image_name_arr[task_idx] << " " 
56                 << image_size_arr[task_idx].width << " "
57                 << image_size_arr[task_idx].height << "\n";
58         }
59 
60         // 狀態判斷
61         if (task_num != total_task) break;
62 
63     }
64     return 0;
65 }

編譯和執行:

g++ get_image_size_with_omp.cpp -o get_image_size_with_omp -fopenmp -I/path/to/opencv/include -L/path/to/opencv/lib -lopencv_core -lopencv_highgui
./get_image_size_with_omp /path/to/image_list /path/to/save/result 2 20

怎么樣,使用這種模式來實現簡單的多線程程序是不是很簡單?

 

如果您覺得本文對您有幫助,那請小喵喝杯茶吧~~O(∩_∩)O~~

轉載請注明出處~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM