▶ 八個常用的集合通信函數
▶ 規約函數 MPI_Reduce(),將通信子內各進程的同一個變量參與規約計算,並向指定的進程輸出計算結果
● 函數原型
1 MPI_METHOD MPI_Reduce( 2 _In_range_(!= , recvbuf) _In_opt_ const void* sendbuf, // 指向輸入數據的指針 3 _When_(root != MPI_PROC_NULL, _Out_opt_) void* recvbuf, // 指向輸出數據的指針,即計算結果存放的地方 4 _In_range_(>= , 0) int count, // 數據尺寸,可以進行多個標量或多個向量的規約 5 _In_ MPI_Datatype datatype, // 數據類型 6 _In_ MPI_Op op, // 規約操作類型 7 _mpi_coll_rank_(root) int root, // 目標進程號,存放計算結果的進程 8 _In_ MPI_Comm comm // 通信子 9 );
● 使用范例
1 { 2 int size, rank, data, dataCollect; 3 MPI_Init(NULL, NULL); 4 MPI_Comm_size(MPI_COMM_WORLD, &size); 5 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 6 7 data = rank;// 參與計算的數據 8 MPI_Reduce((void *)&data, (void *)&dataCollect, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);// 所有的進程都要調用,而不是只在目標進程中調用 9 10 MPI_Finalize(); 11 }
● 操作類型,定義於 mpi.h
1 #define MPI_OP_NULL ((MPI_Op)0x18000000) 2 3 #define MPI_MAX ((MPI_Op)0x58000001) 4 #define MPI_MIN ((MPI_Op)0x58000002) 5 #define MPI_SUM ((MPI_Op)0x58000003) 6 #define MPI_PROD ((MPI_Op)0x58000004) 7 #define MPI_LAND ((MPI_Op)0x58000005)// 邏輯與 8 #define MPI_BAND ((MPI_Op)0x58000006)// 按位與 9 #define MPI_LOR ((MPI_Op)0x58000007) 10 #define MPI_BOR ((MPI_Op)0x58000008) 11 #define MPI_LXOR ((MPI_Op)0x58000009) 12 #define MPI_BXOR ((MPI_Op)0x5800000a) 13 #define MPI_MINLOC ((MPI_Op)0x5800000b)// 求最小值所在位置 14 #define MPI_MAXLOC ((MPI_Op)0x5800000c)// 求最大值所在位置 15 #define MPI_REPLACE ((MPI_Op)0x5800000d)
▶ 規約並廣播函數 MPI_Allreduce(),在計算規約的基礎上,將計算結果分發到每一個進程中,相比於 MPI_Reduce(),只是少了一個 root 參數。除了簡單的先規約再廣播的方法,書中介紹了蝶形結構全局求和的方法。
● 函數原型
1 _Pre_satisfies_(recvbuf != MPI_IN_PLACE) MPI_METHOD MPI_Allreduce( 2 _In_range_(!= , recvbuf) _In_opt_ const void* sendbuf, 3 _Out_opt_ void* recvbuf, 4 _In_range_(>= , 0) int count, 5 _In_ MPI_Datatype datatype, 6 _In_ MPI_Op op, 7 _In_ MPI_Comm comm 8 );
● 使用范例
1 { 2 int size, rank, data, dataCollect; 3 MPI_Init(NULL, NULL); 4 MPI_Comm_size(MPI_COMM_WORLD, &size); 5 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 6 7 data = rank; 8 MPI_Reduce((void *)&data, (void *)&dataCollect, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);// 所有的進程都要調用 9 10 MPI_Finalize(); 11 }
▶ 廣播函數 MPI_Bcast(),將某個進程的某個變量的值廣播到該通信子中所有進程的同名變量中
● 函數原型
1 MPI_METHOD MPI_Bcast( 2 _Pre_opt_valid_ void* buffer, // 指向輸入 / 輸出數據的指針 3 _In_range_(>= , 0) int count, // 數據尺寸 4 _In_ MPI_Datatype datatype, // 數據類型 5 _mpi_coll_rank_(root) int root, // 廣播源進程號 6 _In_ MPI_Comm comm // 通信子 7 );
● 使用范例
1 { 2 int size, rank, data; 3 MPI_Init(NULL, NULL); 4 MPI_Comm_size(MPI_COMM_WORLD, &size); 5 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 6 7 data = rank; 8 MPI_Bcast((void *)&data, 1, MPI_INT, 0, MPI_COMM_WORLD);// 所有的進程都要調用,調用后所有 data 均被廣播源進程的值覆蓋 9 10 MPI_Finalize(); 11 }
▶ 散射函數 MPI_Scatter(),將向量數據分段發送到各進程中
● 函數原型和宏定義
1 _Pre_satisfies_(sendbuf != MPI_IN_PLACE) MPI_METHOD MPI_Scatter( 2 _In_range_(!= , recvbuf) _In_opt_ const void* sendbuf, // 指向需要分發的數據的指針 3 _In_range_(>= , 0) int sendcount, // 分發到每一個進程的數據量,注意不是分發的數據總量 4 _In_ MPI_Datatype sendtype, // 分發數據類型 5 _When_(root != MPI_PROC_NULL, _Out_opt_) void* recvbuf, // 指向接收的數據的指針 6 _In_range_(>= , 0) int recvcount, // 接受數據量,不小於上面分發到每一個進程的數據量 7 _In_ MPI_Datatype recvtype, // 接收數據類型 8 _mpi_coll_rank_(root) int root, // 分發數據源進程號 9 _In_ MPI_Comm comm // 通信子 10 ); 11 12 // 宏定義,mpi.h 13 #define MPI_IN_PLACE ((void*)(MPI_Aint)-1 // MPI_Aint 為 __int64 類型,表示地址
▶ 聚集函數 MPI_Gather(),將各進程中的向量數據分段聚集到一個進程的大向量中
● 函數原型
1 _Pre_satisfies_(recvbuf != MPI_IN_PLACE) MPI_METHOD MPI_Gather( 2 _In_opt_ _When_(sendtype == recvtype, _In_range_(!= , recvbuf)) const void* sendbuf,// 指向需要聚集的數據的指針 3 _In_range_(>= , 0) int sendcount, // 每個進程中進行聚集的數據量,不是聚集的數據總量 4 _In_ MPI_Datatype sendtype, // 發送數據類型 5 _When_(root != MPI_PROC_NULL, _Out_opt_) void* recvbuf, // 指向接收數據的指針 6 _In_range_(>= , 0) int recvcount, // 從每個進程接收的接收數據量,不是聚集的數據總量 7 _In_ MPI_Datatype recvtype, // 接收數據類型 8 _mpi_coll_rank_(root) int root, // 聚集數據匯進程號 9 _In_ MPI_Comm comm // 通信子 10 );
● 函數 MPI_Scatter() 和 MPI_Gather() 的范例
1 { 2 const int dataSize = 8 * 8; 3 const int localSize = 8; 4 int globalData[dataSize], localData[localSize], globalSum, i, comSize, comRank; 5 6 MPI_Init(&argc, &argv); 7 MPI_Comm_size(MPI_COMM_WORLD, &comSize); 8 MPI_Comm_rank(MPI_COMM_WORLD, &comRank); 9 10 if (comRank == 0) // 初始化 11 for (i = 0; i < dataSize; globalData[i] = i, i++); 12 for (i = 0; i < localSize; localData[i++] = 0); 13 14 MPI_Scatter((void *)&globalData, localSize, MPI_INT, (void *)&localData, localSize, MPI_INT, 0, MPI_COMM_WORLD); // 分發數據 15 for (i = 0; i < localSize; localData[i++]++); 16 MPI_Barrier(MPI_COMM_WORLD); // 進程同步 17 MPI_Gather((void *)&localData, localSize, MPI_INT, (void *)&globalData, localSize, MPI_INT, 0, MPI_COMM_WORLD); // 聚集數據 18 for (i = globalSum = 0; i < dataSize; globalSum += globalData[i++]); 19 20 if (comRank == 0) 21 printf("\nSize = %d, Rank = %d, result = %d\n", comSize, comRank, globalSum); 22 MPI_Finalize(); 23 24 return 0;// 輸出結果:Size = 8, Rank = 0, result = 2080,表示 0 + 1 + 2 + …… + 63 25 }
▶ 全局聚集函數 MPI_Allgather(),將各進程的向量數據聚集為一個大向量,並分發到每個進程中,相當於各進程同步該大向量的各部分分量。相比於 MPI_Gather(),只是少了一個 root 參數。
● 函數原型
1 _Pre_satisfies_(recvbuf != MPI_IN_PLACE) MPI_METHOD MPI_Allgather( 2 _In_opt_ _When_(sendtype == recvtype, _In_range_(!= , recvbuf)) const void* sendbuf, 3 _In_range_(>= , 0) int sendcount, 4 _In_ MPI_Datatype sendtype, 5 _Out_opt_ void* recvbuf, 6 _In_range_(>= , 0) int recvcount, 7 _In_ MPI_Datatype recvtype, 8 _In_ MPI_Comm comm 9 );
● 函數 MPI_Scatter() 和 MPI_Allgather() 的范例,相當於從上面的范例中修改了一部分
1 { 2 const int dataSize = 8 * 8; 3 const int localSize = 8; 4 int globalData[dataSize], localData[localSize], globalSum, i, comSize, comRank; 5 6 MPI_Init(&argc, &argv); 7 MPI_Comm_size(MPI_COMM_WORLD, &comSize); 8 MPI_Comm_rank(MPI_COMM_WORLD, &comRank); 9 10 for (i = 0; i < dataSize; globalData[i] = i, i++);// 改動 11 for (i = 0; i < localSize; localData[i++] = 0); 12 13 MPI_Scatter((void *)&globalData, localSize, MPI_INT, (void *)&localData, localSize, MPI_INT, 0, MPI_COMM_WORLD); // 分發數據 14 for (i = 0; i < localSize; localData[i++]++); 15 MPI_Barrier(MPI_COMM_WORLD); 16 MPI_Allgather((void *)&localData, localSize, MPI_INT, (void *)&globalData, localSize, MPI_INT, MPI_COMM_WORLD); // 聚集數據,改動 17 for (i = globalSum = 0; i < dataSize; globalSum += globalData[i++]); 18 19 printf("\nSize = %d, rank = %d, result = %d\n", comSize, comRank, globalSum);// 改動 20 MPI_Finalize(); 21 22 return 0;// 輸出結果,八個進程亂序輸出 2080 23 }
▶ 前綴和函數 MPI_Scan(),將通信子內各進程的同一個變量參與前綴規約計算,並將得到的結果發送回每個進程,使用與函數 MPI_Reduce() 相同的操作類型
● 函數原型
1 _Pre_satisfies_(recvbuf != MPI_IN_PLACE) MPI_METHOD MPI_Scan( 2 _In_opt_ _In_range_(!= , recvbuf) const void* sendbuf, // 指向參與規約數據的指針 3 _Out_opt_ void* recvbuf, // 指向接收規約結果的指針 4 _In_range_(>= , 0) int count, // 每個進程中參與規約的數據量 5 _In_ MPI_Datatype datatype, // 數據類型 6 _In_ MPI_Op op, // 規約操作類型 7 _In_ MPI_Comm comm // 通信子 8 );
● 范例代碼
1 int main(int argc, char **argv) 2 { 3 const int nProcess = 8, localSize = 8, globalSize = localSize * nProcess; 4 int globalData[globalSize], localData[localSize], sumData[localSize]; 5 int comRank, comSize, i; 6 7 MPI_Init(&argc, &argv); 8 MPI_Comm_rank(MPI_COMM_WORLD, &comRank); 9 MPI_Comm_size(MPI_COMM_WORLD, &comSize); 10 11 if (comRank == 0) 12 for (i = 0; i < globalSize; globalData[i] = i, i++); 13 14 MPI_Scatter(globalData, localSize, MPI_INT, localData, localSize, MPI_INT, 0, MPI_COMM_WORLD); 15 16 for (i = 0; i < localSize; i++) 17 printf("%2d, ", localData[i]); 18 19 MPI_Scan(localData, sumData, localSize, MPI_INT, MPI_SUM, MPI_COMM_WORLD); 20 21 for (i = 0; i < localSize; i++) 22 printf("%2d, ", sumData[i]); 23 24 MPI_Finalize(); 25 return 0; 26 }
● 輸出結果,分別展示了 localSize 取 1 和 8 的結果,每個進程的輸出中,前一半(分別為 1 個和 8 個元素)為個進程的原始數據,后一半為進行完前綴求和后的結果。注意到 localSize 取 8 時,程序將各進程保存向量的每一個元素分別進行前綴和,但同一進程中各元素之間不相互影響。
D:\Code\MPI\MPIProjectTemp\x64\Debug>mpiexec -n 8 -l MPIProjectTemp.exe [1] 1, 1, [6] 6, 21, [5] 5, 15, [4] 4, 10, [3] 3, 6, [0] 0, 0, [2] 2, 3, [7] 7, 28, D:\Code\MPI\MPIProjectTemp\x64\Debug>mpiexec -n 8 -l MPIProjectTemp.exe [4] 32, 33, 34, 35, 36, 37, 38, 39, 80, 85, 90, 95, 100, 105, 110, 115, [1] 8, 9, 10, 11, 12, 13, 14, 15, 8, 10, 12, 14, 16, 18, 20, 22, [0] 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, [7] 56, 57, 58, 59, 60, 61, 62, 63, 224, 232, 240, 248, 256, 264, 272, 280, [3] 24, 25, 26, 27, 28, 29, 30, 31, 48, 52, 56, 60, 64, 68, 72, 76, [6] 48, 49, 50, 51, 52, 53, 54, 55, 168, 175, 182, 189, 196, 203, 210, 217, [5] 40, 41, 42, 43, 44, 45, 46, 47, 120, 126, 132, 138, 144, 150, 156, 162, [2] 16, 17, 18, 19, 20, 21, 22, 23, 24, 27, 30, 33, 36, 39, 42, 45,
▶ 規約分發函數 MPI_Reduce_Scatter(),將數據進行規約計算,結果分段分發到各進程中
● 函數原型
1 _Pre_satisfies_(recvbuf != MPI_IN_PLACE) MPI_METHOD MPI_Reduce_scatter( 2 _In_opt_ _In_range_(!= , recvbuf) const void* sendbuf, // 指向輸入數據的指針 3 _Out_opt_ void* recvbuf, // 指向接收數據的指針 4 _In_ const int recvcounts[], // 各進程接收規約結果的元素個數 5 _In_ MPI_Datatype datatype, // 數據類型 6 _In_ MPI_Op op, // 規約操作類型 7 _In_ MPI_Comm comm // 通信子 8 );
● 使用范例
1 { 2 const int nProcess = 8, localSize = 8, globalSize = nProcess * localSize, countValue = 1; 3 int globalData[globalSize], localData[localSize], count[localSize], localSum[countValue], i, comSize, comRank; 4 5 MPI_Init(&argc, &argv); 6 MPI_Comm_size(MPI_COMM_WORLD, &comSize); 7 MPI_Comm_rank(MPI_COMM_WORLD, &comRank); 8 9 if (comRank == 0) 10 for (i = 0; i < globalSize; globalData[i] = i, i++); 11 12 MPI_Scatter(globalData, localSize, MPI_INT, localData, localSize, MPI_INT, 0, MPI_COMM_WORLD); 13 14 for (i = 0; i < localSize; count[i++] = 1); 15 16 for (i = 0; i < localSize; i++) 17 printf("%3d, ", localData[i]); 18 19 MPI_Reduce_scatter(localData, localSum, count, MPI_INT, MPI_SUM, MPI_COMM_WORLD); 20 21 for (i = 0; i < countValue; i++) 22 printf("%3d, ", localSum[i]); 23 24 MPI_Finalize(); 25 return 0; 26 }
● 輸出結果,這里取定 localSize 為 8,輸出結果的前 8 個元素為分發到各進程中參與規約計算的原始數據,后面元素為規約計算結果。程序將各進程保存向量的每一個元素分別進行前綴和,但同一進程中各元素之間不相互影響,通過修改 countValue(即參數 count 各元素的值),可以將規約計算的結果分發到各進程中
■ countValue == 1(count == { 1, 1, 1, 1, 1, 1, 1, 1 })情況,每個進程分得一個結果(注意與上面的函數 MPI_Scan() 作對比)
■ countValue == 2(count == { 2, 2, 2, 2, 2, 2, 2, 2 })情況,前 4 個進程每個進程分得 2 個結果,后 4 的進程訪問越界,得到無意義的值
■ count == { 2, 0, 2, 0, 2, 0, 2, 0 } 情況,偶數號進程每個進程分得 2 個結果,奇數號進程分得 0 個結果,表現為無意義的值
■ 思考,這列每個 localData 長度為 8,所以規約計算的結果為一個長度為 8 的向量,可以在不同進程中進行分發(注意數據尺寸大小 localSize 與運行程序的進程數 nProcess 沒有任何關系,只是在范例中恰好相等),而函數 MPI_Scan() 則相當於在此基礎上保留了所有中間結果(部分前綴結果),所以其輸出為一個長為 localSize,寬度為 nProcess 的矩陣,並且自動按照進程號均分。
D:\Code\MPI\MPIProjectTemp\x64\Debug>mpiexec -n 8 -l MPIProjectTemp.exe // countValue = 1 [6] 48, 49, 50, 51, 52, 53, 54, 55, 272, [0] 0, 1, 2, 3, 4, 5, 6, 7, 224, [2] 16, 17, 18, 19, 20, 21, 22, 23, 240, [4] 32, 33, 34, 35, 36, 37, 38, 39, 256, [3] 24, 25, 26, 27, 28, 29, 30, 31, 248, [1] 8, 9, 10, 11, 12, 13, 14, 15, 232, [5] 40, 41, 42, 43, 44, 45, 46, 47, 264, [7] 56, 57, 58, 59, 60, 61, 62, 63, 280, D:\Code\MPI\MPIProjectTemp\x64\Debug>mpiexec -n 8 -l MPIProjectTemp.exe // countValue = 2 [0] 0, 1, 2, 3, 4, 5, 6, 7, 224, 232, [6] 48, 49, 50, 51, 52, 53, 54, 55, 1717986912, 1717986912, [1] 8, 9, 10, 11, 12, 13, 14, 15, 240, 248, [3] 24, 25, 26, 27, 28, 29, 30, 31, 272, 280, [4] 32, 33, 34, 35, 36, 37, 38, 39, 1717986912, 1717986912, [5] 40, 41, 42, 43, 44, 45, 46, 47, 1717986912, 1717986912, [2] 16, 17, 18, 19, 20, 21, 22, 23, 256, 264, [7] 56, 57, 58, 59, 60, 61, 62, 63, 1717986912, 1717986912, D:\Code\MPI\MPIProjectTemp\x64\Debug>mpiexec -n 8 -l MPIProjectTemp.exe // countValue = 2,count[i] = (i + 1) % 2 * 2 [4] 32, 33, 34, 35, 36, 37, 38, 39, 256, 264, [2] 16, 17, 18, 19, 20, 21, 22, 23, 240, 248, [3] 24, 25, 26, 27, 28, 29, 30, 31, -858993460, -858993460, [1] 8, 9, 10, 11, 12, 13, 14, 15, -858993460, -858993460, [7] 56, 57, 58, 59, 60, 61, 62, 63, -858993460, -858993460, [0] 0, 1, 2, 3, 4, 5, 6, 7, 224, 232, [5] 40, 41, 42, 43, 44, 45, 46, 47, -858993460, -858993460, [6] 48, 49, 50, 51, 52, 53, 54, 55, 272, 280,