相關章節:第13章組通信MPI程序設計。
MPI組通信與點到點通信的一個重要區別就是:組通信需要特定組內所有成員參與,而點對點通信只涉及到發送方和接收方。
由於需要組內所有成員參與,因此也是一種比較復雜的通信方式。程序員在設計組通信語句的時候,需要同時考慮兩點:
a. 程序運行起來之后,當前正在運行的進程的行為方式
b. 將組通信作為一個整體,考慮所有進程的行為方式
(1)概述
組通信從功能上實現了三個方面:
a. 通信:完成組內數據傳輸(廣播、收集、散發、組收集、全互換各種數據交換傳輸方式)
b. 同步:能夠讓組內所有進程在特定的位置上執行進度上取得一致,組通信雖然可以有同步功能,但是並不意味這同步一定發生(MPI_Barrier同步函數)
c. 計算:通過給定數據的OP歸約操作完成(分為MPI預定義OP歸約操作、用戶自定義OP歸約操作)
下面分別闡述上述三個方面的相關代碼示例,采用的方法是把書上的代碼段擴充成完整的可執行的程序來體會。
(2)各部分代碼
下面闡述幾個代碼示例,把通信、同步、計算的例子都融入到具體的代碼中。
示例1(Bcast廣播通信)
從root進程讀入一個數,並Bcast廣播給組內所有進程並打印到終端上,代碼如下:
#include "mpi.h" #include <stdio.h> #include <stdlib.h>
int main(int argc, char *argv[]) { int rank, value=0; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); while(value>=0) { if (0==rank) { scanf("%d", &value); } MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD); printf("Process %d got %d\n", rank, value); } MPI_Finalize(); return 0; }
代碼執行結果如下:
上述代碼中涉及到了Bcast函數的基本用法,在每個進程中都要出現,然后通過函數參數中的root進程號,進而區分哪個進程是發送方。
示例2(Alltoall全互換通信)
全互換。全互換是形式上最復雜的一種組通信方式,內容上是各種通信形式的全集。
全互換通信模式中,在組內所有通信涉及到的進程中:
a. 每個進程發送給其他進程的數據是不同的
b. 每個進程從其他進程獲得數據也是不同的
這樣描述還是太抽象,通過具體的圖來解釋一下:
上圖描述了全互換發生前和發生后,數據變化的情況:
縱軸代表不同進程編號;左圖的橫軸代表發送緩沖區的段位大小,右圖的橫軸代表接受緩沖區的大小。
a. 第i個進程的發送緩沖區中的第j段數據(即左圖Aij)代表的意義是:第i個進程發送給第j個進程的數據
b. 第j個進程的接受緩沖區中的第i段數據(即右圖的Aji)代表的意義是:第j個進程從第i個進程中接收到的數據
結合a、b兩點,我們可以知道全互換這種方式類似:將“進程-發送緩沖區”矩陣,轉置成“進程-接受緩沖區”矩陣
對於同一對(i,j),左圖的Aij的大小要求一定與右圖的Aji的大小一樣。
比如第0個進程發送給第2個進程的數據(即A02),必須與第2個進程給第0個進程留出來的接受緩沖區大小一致(即A20),滿足這樣的條件,就實現了全互換。
每個進程在全通信中既向所有進程發送數據,又同時從所有進程接受數據,但是需要注意上述的發送、接受緩沖區大小匹配。
看如下的代碼:
1 #include "mpi.h"
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <errno.h>
6
7 int main(int argc, char *argv[]) 8 { 9 int rank, size; 10 int chunk = 2; // 發送到一個進程的數據塊大小
11 int i,j; 12 int *sbuf, *rbuf; 13
14 MPI_Init(&argc, &argv); 15 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 16 MPI_Comm_size(MPI_COMM_WORLD, &size); 17 // 申請發送緩沖區 和 接收緩沖區
18 sbuf = (int*)malloc(size*chunk*sizeof(int)); // 申請發送緩沖區
19 if (!sbuf) { 20 perror("can't allocate send buffer"); 21 MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); 22 } 23 rbuf = (int*)malloc(size*chunk*sizeof(int)); // 申請接收緩沖區
24 if (!rbuf) { 25 perror("can't allocate recv buffer"); 26 free(sbuf); 27 MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); 28 } 29 // 設置每個進程的發送緩沖區 和 接收緩沖區的數據
30 for(i=0; i<size; i++) // i代表進程編號
31 { 32 for(j=0; j<chunk; j++) // j代表myid發送給進程i的第j個數據
33 { 34 sbuf[i*chunk+j] = rank+i*chunk+j; // 設置發送緩沖區的數據
35 printf("myid = %d, send to id = %d, data[%d] = %d\n",rank ,i ,j, sbuf[i*chunk+j]); 36 rbuf[i*chunk+j] = 0; // 接收緩沖區清零
37 } 38 } 39 // 執行all to all調用
40 MPI_Alltoall(sbuf, chunk, MPI_INT, rbuf, chunk, MPI_INT, MPI_COMM_WORLD); 41 // 打印接收緩沖區從其他進程接收的數據
42 for(i=0; i<size; i++) 43 { 44 for(j=0; j<chunk; j++) 45 { 46 printf("myid = %d, recv from id = %d, data[%d] = %d\n",rank ,i ,j, rbuf[i*chunk+j]); 47 } 48 } 49 free(rbuf); 50 free(sbuf); 51 MPI_Finalize(); 52 return 0; 53 }
程序執行結果如下:
一共有2個進程,每個進程共有4個數據;可以看到執行全互換后的效果。
示例3
利用近似積分公式求圓周率π的大小。(Bcast廣播、Barrier同步、REDUCE歸約)
圓周率的大小可以用一個積分表達式來求解,求這個積分的過程可以用MPI組通信技術實現
代碼如下:
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4
5 double f(double); 6
7 int main(int argc, char *argv[]) 8 { 9 int rank, size; 10 int n; // 細分出來的小矩陣個數
11 double pi; // 存放pi的計算值
12
13 MPI_Init(&argc, &argv); 14 MPI_Comm_size(MPI_COMM_WORLD, &size); 15 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 16 if (0==rank) { 17 printf("number of small rectangle:\n"); 18 scanf("%d",&n); 19 } 20 MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD); // 將n值廣播到各個進程中
21 int i; 22 double x, h, sum=0.0; 23 for (i=rank+1; i<=n; i+=size) 24 { 25 x = (i-0.5)/(double)n; 26 sum += f(x); 27 } 28 sum = sum/n; 29 MPI_Reduce(&sum, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); 30 if (0==rank) { 31 printf("Approximately of pi is : %.16f\n",pi); 32 fflush(stdout); 33 } 34 MPI_Finalize(); 35 } 36
37 double f(double x) { return 4.0/(1.0+x*x); }
上述代碼是按照如下思路實現的:
比如總共要把積分區間按X軸划分成1000份,即計算任務是求1000個小矩形的面積,
而MPI計算進程共有5個;則為了負載均衡,考慮讓每個計算節點平均搞定200個小矩形的面積。
代碼技巧:
這里有個需要處理的地方是:如何告訴每個計算節點(進程),需要計算哪些小矩形的面積呢?
一種直觀的思路是,讓第0個進程處理0~199小矩形,讓第1個進程處理200~399小矩形...,以此類推。
上述的思路沒有錯誤,但是對邊界條件的處理可能麻煩一些:因為用這種連續區間的任務分配方式,就需要知道提前知道頭和尾各在哪里,需要額外處理。
書上給的代碼技巧是通過間隔划分的方式,比如有5個計算進程,第0個進程計算0,5,10,...這些矩形面積,第1個進程計算1,6,11,...這些矩形面積。
這樣一來,任務分配的代碼就簡單了,一個循環直接搞定,而且不同進程不用區別對待。
上述代碼執行結果如下:
小矩形個數越多,計算的值約逼近真實值。
示例4
有一個大數據集,每個計算進程中各有數據集中的一部分,現在想讓每個進程都有全部的數據集(Bcast廣播、Barrier同步)
代碼如下:
1 #include "mpi.h"
2 #include <stdlib.h>
3 #include <stdio.h>
4
5 int main(int argc, char *argv[]) 6 { 7 int rank,size,i; 8 int *table; 9 int errors = 0; 10 MPI_Aint address; 11 MPI_Datatype type, newtype; 12 int lens; 13
14 MPI_Init(&argc, &argv); 15 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 16 MPI_Comm_size(MPI_COMM_WORLD, &size); 17 table = (int*)calloc(size, sizeof(int)); 18 table[rank] = rank + 1; // 准備要廣播的數據
19 MPI_Barrier(MPI_COMM_WORLD); 20 for(i=0; i<size; i++) MPI_Bcast(&table[i], 1, MPI_INT, i, MPI_COMM_WORLD); // 每個進程只當一次root
21 for(i=0; i<size; i++) 22 { 23 if (table[i]!=i+1) { 24 errors++; 25 } 26 } 27 MPI_Barrier(MPI_COMM_WORLD); 28 if (0==rank) { 29 for(i=0;i<size;i++) printf("table[%d]:%d\n",i,table[i]); 30 } 31 MPI_Finalize(); 32 }
代碼執行結果如下:
代碼分析:
a. line19執行了第一次Barrier同步,目的是讓各個進程都准備好各自擁有數據集的一部分(這里同步的目的是,讓各個進程一方面table分配好空間,並且准備好數據)
b. line27執行了第二次Barrier同步,目的是讓Bcast都完成,然后在root進程中查看table數據是否都發送過去了
其中line20的代碼技巧也值得學習,每個進程只當一次root,而其余的時候全都是當配角,通信中的接收一方
示例5
組通信中的死鎖。組通信的相同函數有可能是同步的,也有可能是異步的,這個受軟硬件平台影響;為了構建可移植性好的代碼,在設計組通信程序的時候,無論是異步還是同步的,都應該保證程序不出現死鎖。下面用幾個例子解釋死鎖出現的情況。
代碼1. 同一個通信域中的死鎖示例
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5
6 #define LEN 10
7
8 int main(int argc, char *argv[]) 9 { 10 int rank,size; 11
12 MPI_Init(&argc, &argv); 13 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 14 MPI_Comm_size(MPI_COMM_WORLD, &size); 15
16 if(2!=size) MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); 17 int* buf1 = (int*)malloc(sizeof(int)*LEN); 18 int* buf2 = (int*)malloc(sizeof(int)*LEN); 19 buf1[0] = 1; 20 buf2[0] = 1; 21 if (0==rank) { 22 MPI_Bcast(buf1,LEN,MPI_INT,0,MPI_COMM_WORLD); 23 MPI_Bcast(buf2,LEN,MPI_INT,1,MPI_COMM_WORLD); 24 printf("0 done\n"); 25 } 26 if (1==rank) { 27 MPI_Bcast(buf2,LEN,MPI_INT,1,MPI_COMM_WORLD); 28 MPI_Bcast(buf1,LEN,MPI_INT,0,MPI_COMM_WORLD); 29 printf("1 done\n"); 30 } 31 free(buf1); 32 free(buf2); 33 MPI_Finalize(); 34 }
代碼執行結果如下:
可以看到並沒有出現deadlock。原因是由於LEN定義的發送和接收數據長度太小,MPI給Bcast執行方式選擇的是緩存非同步方式,即Bcast不必等到所有需要參加組通信的進程都完成了再返回。
如果我們把LEN定義的長度改為100000,則執行結果如下:
執行結果表明deadlock出現了。原因是由於LEN定義的發送和接受的數據長度較大,MPI給Bcast選擇的執行方式是同步方式,即Bcast必須等到所有需要參加廣播通信的進程都完成了,才能夠正確返回。
書上並沒有提Bcast是異步還是同步的事情,我在stackoverflow上提問才知道了上面的道理(http://stackoverflow.com/questions/35628524/why-this-mpi-bcast-related-code-not-deadlock)。感謝stackoverflow上的認真回答。
代碼2. 不同通信域中的死鎖示例
這部分內容與后面第15章的進程組和通信域知識相關,需要調換順序提前補充。
總共有3個進程,三個進程在MPI_COMM_WORLD中的rank分別是{0,1,2}
現在構造三個新的通信域comm0 comm1 comm2
comm0中包含MPI_COMM_WORLD中rank為0,1的進程
comm1中包含MPI_COMM_WORLD中rank為1,2的進程
comm2中包含MPI_COMM_WORLD中rank為2,0的進程
需要注意是,雖然進程客觀上只有3個,但是相同的進程在不同的通信域中的rank號是不同的。
下面的代碼構造了一個循環依賴的deadlock
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4
5 #define LEN 100000
6
7 int main(int argc, char *argv[]) 8 { 9 MPI_Init(&argc, &argv); 10 int rank, size; 11 MPI_Comm_size(MPI_COMM_WORLD, &size); 12 if (3!=size) { 13 printf("wrong proc num\n"); 14 MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); 15 } 16 const int ranks[3] = {0,1,2}; 17 MPI_Group world_group; 18 MPI_Comm_group(MPI_COMM_WORLD, &world_group); 19 // 構造三個新的進程組對象
20 MPI_Group group0, group1, group2; 21 const int ranks0[2] = {0,1}; 22 const int ranks1[2] = {1,2}; 23 const int ranks2[2] = {2,0}; 24 MPI_Group_incl(world_group, 2, ranks0, &group0); 25 MPI_Group_incl(world_group, 2, ranks1, &group1); 26 MPI_Group_incl(world_group, 2, ranks2, &group2); 27
28 // 利用三個進程對象構造通信域
29 MPI_Comm comm0, comm1, comm2; 30 MPI_Comm_create(MPI_COMM_WORLD, group0, &comm0); 31 MPI_Comm_create(MPI_COMM_WORLD, group1, &comm1); 32 MPI_Comm_create(MPI_COMM_WORLD, group2, &comm2); 33 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 34 int *buf1 = (int*)malloc(sizeof(int)*LEN); 35 int *buf2 = (int*)malloc(sizeof(int)*LEN); 36 buf1[0] = 1; 37 buf2[0] = 1; 38 int sub_rank; 39 if (0==rank) { 40 MPI_Group_translate_ranks(world_group,1,&ranks[0],group0,&sub_rank); 41 MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm0); 42 MPI_Group_translate_ranks(world_group,1,&ranks[2],group2,&sub_rank); 43 MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm2); 44 } 45 if (1==rank) { 46 MPI_Group_translate_ranks(world_group,1,&ranks[1],group1,&sub_rank); 47 MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm1); 48 MPI_Group_translate_ranks(world_group,1,&ranks[0],group0,&sub_rank); 49 MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm0); 50 } 51 if (2==rank) { 52 MPI_Group_translate_ranks(world_group,1,&ranks[2],group2,&sub_rank); 53 MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm2); 54 MPI_Group_translate_ranks(world_group,1,&ranks[1],group1,&sub_rank); 55 MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm1); 56 } 57 MPI_Finalize(); 58 }
代碼執行結果是deadlock,這個deadlock本身不難理解,就是comm0依賴comm1,comm1依賴comm2,comm2依賴comm0,循環依賴造成了死鎖,互相都等着。
書上只給出來了關鍵代碼的原型,並沒有給出來通信域是怎么構造的,Bcast中的sub_rank是怎么得來的,需要補充一些進程組和通信域的知識。
進程組和通信域使得MPI程序與傳統的串行程序的設計思路上有很大不同,初學甚至有些別扭,我也沒有全部領會。
額外參考了https://www.rc.usf.edu/tutorials/classes/tutorial/mpi/chapter9.html
只能先把與這個例子相關的知識記錄下來:
a. 通信域(Communicator)是大的概念,進程組是通信域的一個屬性
b. 一個通信域對應一個進程組
c. 一個進程可以在多個進程組中,因此也可以在多個通信域中
d. 通過進程組來構造通信域是通信域的一種生成方式
e. 相同的進程在不同的通信域中的rank是不同的,可以通過函數來查閱這種對應關系
f. 比如,調用MPI_Comm_create函數,利用進程組{0,1}構造一個通信域comm0;那么進程2也會執行這條語句,則進程2中對應的comm0就是MPI_COMM_NULL
關於上面提到的f,我用http://mpitutorial.com/tutorials/introduction-to-groups-and-communicators/里面的一個代碼示例進行了驗證,加深理解:
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4
5 int main(int argc, char *argv[]) 6 { 7 MPI_Init(&argc, &argv); 8 int world_rank, world_size; 9 MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); 10 MPI_Comm_rank(MPI_COMM_WORLD, &world_size); 11
12 MPI_Group world_group; 13 MPI_Comm_group(MPI_COMM_WORLD, &world_group); 14
15 int n = 7; 16 const int ranks[7] = {1,2,3,5,7,11,13}; 17
18 MPI_Group prime_group; 19 // 構造新的進程組 這個進程組是客觀的 不管當前進程是哪個
20 MPI_Group_incl(world_group, 7, ranks, &prime_group); 21
22 MPI_Comm prime_comm; 23 // 如果當前進程不在prime_group中 則prime_com就是MPI_COMM_NULL
24 MPI_Comm_create_group(MPI_COMM_WORLD, prime_group, 0, &prime_comm); 25
26 int prime_rank = -1, prime_size = -1; 27 if (MPI_COMM_NULL!=prime_comm) { 28 MPI_Comm_rank(prime_comm, &prime_rank); 29 MPI_Comm_size(prime_comm, &prime_size); 30 } 31
32 printf("WORLD RANK/SIZE: %d/%d \t PRIME RANK/SIZE: %d/%d\n",world_rank, world_size, prime_rank, prime_size); 33 if(MPI_COMM_NULL!=prime_comm) MPI_Barrier(prime_comm); 34 MPI_Barrier(MPI_COMM_WORLD); 35 if(MPI_GROUP_NULL!=world_group) MPI_Group_free(&world_group); 36 if(MPI_GROUP_NULL!=prime_group) MPI_Group_free(&prime_group); 37 if(MPI_COMM_NULL!=prime_comm) MPI_Comm_free(&prime_comm); 38 MPI_Finalize(); 39 }
代碼執行結果如下:
從原來進程中扣出來幾個進程構造新進程;對於沒被扣出來的那些進程,執行到MPI_Comm_create這里的時候,prime_comm就是MPI_COMM_NULL。
示例6
歸約操作。這部分代碼都是書上的內容,認真看書即可。
代碼1 MINLOC和MAXLOC
有時候不僅需要知道一堆數據中的最小(最大)值,而且需要知道最小最大值對應的編號(或進程號)是多少。
通過MPI提供的MPI_REDUCE歸約函數,以及預定義的歸約操作MPI_MINLOC活MPI_MAXLOC即可方便實現。
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <time.h>
5
6 int main(int argc, char *argv[]) 7 { 8 MPI_Init(&argc, &argv); 9 int i, myrank, root; 10 root = 0; 11 double ain[3]; // 各個進程的輸入數據
12 double aout[3]; // 只有root進程有用 用於存放規約后的數字的具體值
13 int ind[3]; // 只有root進程有用 用於存放規約后的數字的編號
14 struct{ 15 double val; 16 int rank; 17 } in[3], out[3]; // 規約操作的發送緩沖區和接收緩沖區
18 MPI_Comm_rank(MPI_COMM_WORLD, &myrank); 19 srand((unsigned int)(myrank+time(NULL))); 20 for(i=0;i<3;i++) ain[i] = (1.0*rand())/RAND_MAX; 21 for(i=0; i<3; i++){ 22 in[i].val = ain[i]; 23 in[i].rank = myrank; 24 } 25 printf("myrank:%d %.3f,%.3f,%.3f\n",myrank, ain[0],ain[1],ain[2]); 26 MPI_Reduce(in, out, 3, MPI_DOUBLE_INT, MPI_MAXLOC, root, MPI_COMM_WORLD); 27 if (myrank==root) { 28 printf("reduce result:\n"); 29 for(i=0; i<3; i++){ 30 aout[i] = out[i].val; 31 ind[i] = out[i].rank; 32 printf("%.3f of rank %d\n", aout[i], ind[i]); 33 } 34 } 35 MPI_Finalize(); 36 }
代碼執行結果如下:
可以看到這種歸約操作:
a. 要求每個進程中的輸入數據的大小是一樣的(在這個例子中都是長度為3的double數組),
b. 歸約的對象是不同進程的數組中相同位置的數據
另外,也允許用戶自定義歸約操作,如下面代碼定義了復數相乘的歸約操作(書上只給了代碼框架,具體細節中的問題參考http://stackoverflow.com/questions/9285442/mpi-get-processor-with-minimum-value)。
1 #include "mpi.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <time.h>
5
6 typedef struct{ 7 double real; 8 double imag; 9 }Complex; 10
11 void multiplyOP(Complex *, Complex *, int *, MPI_Datatype *); 12
13 int main(int argc, char *argv[]) 14 { 15 MPI_Init(&argc, &argv); 16 int root = 0; 17 int rank; 18 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 19 Complex input[2], output[2]; 20 if (0==rank) { 21 input[0].real = 1; input[0].imag = 1; 22 input[1].real = 1; input[1].imag = 2; 23 } 24 if (1==rank) { 25 input[0].real = 1; input[0].imag = -1; 26 input[1].real = 1; input[1].imag = 2; 27 } 28 // 在MPI中注冊和構造復數類型
29 MPI_Datatype ctype; 30 MPI_Type_contiguous(2, MPI_DOUBLE, &ctype); // 在MPI中構造復數結構體 連續的兩個DOUBLE類型
31 MPI_Type_commit(&ctype); // 在MPI中注冊剛構造的復數結構體 32 // 在MPI中構造自定義歸約操作
33 MPI_Op myOp; 34 MPI_Op_create((MPI_User_function*)multiplyOP, 1, &myOp); // 生成用戶定義的乘積操作
35 MPI_Reduce(input, output, 2, ctype, myOp, root, MPI_COMM_WORLD); // 執行復數乘積操作 36 // 在root中打印結果
37 if (root==rank) { 38 printf("reduce result of 0 is : %1.0f+(%1.0f)i\n",output[0].real, output[0].imag); 39 printf("reduce result of 1 is : %1.0f+(%1.0f)i\n",output[1].real, output[1].imag); 40 } 41 MPI_Finalize(); 42 } 43
44 // 計算復數的乘積
45 void multiplyOP(Complex *in, Complex *inout, int *len, MPI_Datatype *datatype) 46 { 47 int i; 48 Complex c; 49 for(i=0; i<*len; i++){ 50 c.real = inout->real*in->real - inout->imag*in->imag; 51 c.imag = inout->real*in->imag + inout->imag*in->real; 52 *inout = c; // 把計算結果存回到inout的位置
53 in++; 54 inout++; 55 } 56 }
代碼的執行結果如下: