【MPI學習4】MPI並行程序設計模式:非阻塞通信MPI程序設計


這一章講了MPI非阻塞通信的原理和一些函數接口,最后再用非阻塞通信方式實現Jacobi迭代,記錄學習中的一些知識。

 

(1)阻塞通信與非阻塞通信

阻塞通信調用時,整個程序只能執行通信相關的內容,而無法執行計算相關的內容;

非阻塞調用的初衷是盡量讓通信和計算重疊進行,提高程序整體執行效率。

整體對比見下圖:

 

(2)非阻塞通信的要素

非阻塞通信調用返回意味着通信開始啟動;而非阻塞通信完成則需要調用其他的接口來查詢。

要素1:非阻塞通信的調用接口

要素2:非阻塞通信的完成查詢接口

理想的非阻塞通信設計應該如下:

非阻塞通信的 發送 和 接受 過程都需要同時具備以上兩個要素,“調用+完成”

“調用”按照通信方式的不同(標准、緩存、同步、就緒),有各種函數接口,具體用到哪個就查手冊的性質。

這里“完成”是重點,因為程序員需要知道非阻塞調用是否執行完成了,來做下一步的操作。

MPI為“完成”定義了一個內部變量MPI_Request request,每個request與一個在非阻塞調用發生時與該調用發生關聯(這里的調用包括發送和接收)。

“完成”不區分通信方式的不同,統一用MPI_Wait系列函數來完成,這里對MPI_Wait函數做一點說明:

1)MPI_Wait(MPI_Request *request),均等着request執行完畢了,再往下進行

2)對於非重復非阻塞通信,MPI_Wait系列函數調用的返回,還意味着request對象被釋放了,程序員不用再顯式釋放request變量。

3)對於重復非阻塞通信,MPI_Wait系列函數調用的返回,意味着將於request對象關聯的非阻塞通信處於不激活狀態,並不釋放request

關於2)3)看后面的代碼示例就了解了

 

(3)非阻塞調用實現Jacobi迭代

有了非阻塞調用的技術,可以再將Jacobi迭代的程序效率提升,其總體的實現思路如下:

1)先計算Jacobi迭代下次計算所需要的邊界數據,這些數據與每個計算節點中的計算無關,可以先獨立計算好

2)啟動非阻塞通信,將邊界數據在進程間傳遞

3)計算每個計算節點可以獨立計算的部分;此時,2)中啟動的非阻塞通信也在進行中,這時通信和計算就重疊了

4)等着非阻塞通信完成,再進行下一次迭代

再回顧一下之前用阻塞通信實現Jacobi迭代的思路:

1)先傳遞邊界數據

2)等着數據都傳遞完了,再進行計算

3)等着計算完成了,進行下一次迭代

可以看到阻塞通信中實現Jacobi迭代的程序中,在同一計算節點下,通信和計算是分別進行的,效率不如非阻塞通信。

總結起來:

“單機程序 → 阻塞通信MPI程序” 實現單機計算到多機計算,用並行代替串行提高效率。

“阻塞MPI程序 → 非阻塞MPI程序” 不僅將多台機器之間的並行,而且還能將每台機器的通信與計算過程並行,實現更高效的並行。 

 

(4)非阻塞通信實現Jacobi迭代的代碼

書上的源代碼是Fortan的,數據存儲是列優先的,矩陣按列分塊;下面的代碼是我翻譯的C的代碼,數據存儲是行優先的,矩陣按行分塊。

  1 #include "mpi.h"
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 
  5 #define N 8
  6 #define SIZE N/4
  7 #define T 2
  8 
  9 void print_matrix(int myid, float myRows[][N]);
 10 
 11 int main(int argc, char *argv[])
 12 {
 13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
 14     int myid;
 15     MPI_Status status[4];
 16     MPI_Request request[4];
 17 
 18     MPI_Init(&argc, &argv);
 19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
 20 
 21     // 初始化
 22     int i,j;
 23     for(i=0; i<SIZE+2; i++)
 24     {
 25         for(j=0; j<N; j++)
 26         {
 27             matrix1[i][j] = matrix2[i][j] = 0;
 28         }
 29     }
 30     if(0==myid) // 按行划分 上面第一分塊矩陣 上邊界
 31     {
 32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
 33     }
 34     if (3==myid) { // 按行划分 最下面一分塊矩陣 下邊界
 35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
 36     }
 37     for(i=1; i<SIZE+1; i++) // 每個矩陣的兩側邊界
 38     {
 39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
 40     }
 41     // 引入虛擬進程 並計算每個進程上下相鄰進程
 42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
 43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
 44     // jacobi迭代過程
 45     int t,row,col;
 46     for(t=0; t<T; t++)
 47     {
 48         // 1 計算邊界數據
 49         if(0==myid) // 最上的矩陣塊
 50         {
 51             for (col=1; col<N-1; col++)
 52             {
 53                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 54             }
 55         }
 56         else if (3==myid) { // 最下的矩陣塊
 57             for (col=1; col<N-1; col++)
 58             {
 59                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 60             }
 61         }
 62         else {
 63             for(col=1; col<N-1; col++) // 中間的矩陣塊
 64             {
 65                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 66                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 67             }
 68         }
 69         // 2 利用非阻塞函數傳遞邊界數據 為下一次計算做准備
 70         int tag1 = 1, tag2 = 2;
 71         MPI_Isend(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
 72         MPI_Isend(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
 73         MPI_Irecv(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
 74         MPI_Irecv(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
 75         // 3 計算中間數據
 76         int begin_row = 0==myid ? 2 : 1;
 77         int end_row = 3==myid ? (SIZE-1) : SIZE; 
 78         for (row=begin_row; row<end_row; row++)
 79         {
 80             for (col=1; col<N-1; col++)
 81             {
 82                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
 83             }
 84         }
 85         // 4 更新矩陣 並等待各個進程間數據傳遞完畢
 86         for (row=begin_row; row<=end_row; row++)
 87         {
 88             for (col=1; col<N-1; col++)
 89             {
 90                 matrix1[row][col] = matrix2[row][col];
 91             }
 92         }
 93         MPI_Waitall(4, &request[0], &status[0]);
 94     }
 95     MPI_Barrier(MPI_COMM_WORLD);
 96     print_matrix(myid, matrix1);
 97     MPI_Finalize();
 98 }
 99 
100 
101 void print_matrix(int myid, float myRows[][N])
102 {
103     int i,j;
104     int buf[1];
105     MPI_Status status;
106     buf[0] = 1;
107     if ( myid>0 ) {
108         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
109     }
110     printf("Result in process %d:\n", myid);
111     for ( i = 0; i<SIZE+2; i++)
112     {
113         for ( j = 0; j<N; j++)
114             printf("%1.3f\t", myRows[i][j]);
115         printf("\n");
116     }
117     if ( myid<3 ) {
118         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
119     }
120     MPI_Barrier(MPI_COMM_WORLD);
121 }

程序的執行結果如下:

上述程序設計的邏輯如下:

1)各個分塊矩陣的邊界數據是可以需要通信交換的

2)先計算邊界數據,盡量把需要通信交換而且又相對獨立的數據先計算出來

3)用非阻塞通信傳遞分塊矩陣的邊界數據;同時每個節點內計算內部的數據;計算與通信並行

4)等到每個計算節點的2個發送、2個接收,總共4個非阻塞調用都完成了,進行下一輪迭代

 

(5)重復非阻塞通信

上面實現Jacobi迭代的代碼中,以進程1和進程2為例:

1)迭代一輪二者之間就需要互相通信一次

2)每次互相通信,隨着MPI_Wait的執行,request通信對象釋放,兩個進程通信完全被切斷了

3)兩個進程之間每次通信,有一些通信連接操作都是重復的,最好不用每次通信都重新執行這些連接操作,以此提高效率

4)因此,比上面實現jacobi迭代更優化一些的做法是:每次不完全掐斷兩個進程的非阻塞通信,保持那些基礎的通用的操作,每次迭代只需要更新需要傳輸的數據,再激活兩個進程之間的非阻塞通信

依照上面的思路,MPI給出了重復非阻塞的通信調用實現。用重復非阻塞的通信再實現一次Jacobi迭代,代碼如下:

  1 #include "mpi.h"
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 
  5 #define N 8
  6 #define SIZE N/4
  7 #define T 2
  8 
  9 void print_matrix(int myid, float myRows[][N]);
 10 
 11 int main(int argc, char *argv[])
 12 {
 13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
 14     int myid;
 15     MPI_Status status[4];
 16     MPI_Request request[4];
 17 
 18     MPI_Init(&argc, &argv);
 19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
 20 
 21     // 初始化
 22     int i,j;
 23     for(i=0; i<SIZE+2; i++)
 24     {
 25         for(j=0; j<N; j++)
 26         {
 27             matrix1[i][j] = matrix2[i][j] = 0;
 28         }
 29     }
 30     if(0==myid) // 按行划分 上面第一分塊矩陣 上邊界
 31     {
 32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
 33     }
 34     if (3==myid) { // 按行划分 最下面一分塊矩陣 下邊界
 35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
 36     }
 37     for(i=1; i<SIZE+1; i++) // 每個矩陣的兩側邊界
 38     {
 39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
 40     }
 41     // 引入虛擬進程 並計算每個進程上下相鄰進程
 42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
 43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
 44     // 初始化重復非阻塞通信
 45     int tag1 = 1, tag2 = 2;
 46     MPI_Send_init(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
 47     MPI_Send_init(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
 48     MPI_Recv_init(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
 49     MPI_Recv_init(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
 50     // jacobi迭代過程
 51     int t,row,col;
 52     for(t=0; t<T; t++)
 53     {
 54         // 1 計算邊界數據
 55         if(0==myid) // 最上的矩陣塊
 56         {
 57             for (col=1; col<N-1; col++)
 58             {
 59                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 60             }
 61         }
 62         else if (3==myid) { // 最下的矩陣塊
 63             for (col=1; col<N-1; col++)
 64             {
 65                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 66             }
 67         }
 68         else {
 69             for(col=1; col<N-1; col++) // 中間的矩陣塊
 70             {
 71                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 72                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 73             }
 74         }
 75         // 2 啟動重復非阻塞通信
 76         MPI_Startall(4, &request[0]);
 77         // 3 計算中間數據
 78         int begin_row = 0==myid ? 2 : 1;
 79         int end_row = 3==myid ? (SIZE-1) : SIZE; 
 80         for (row=begin_row; row<end_row; row++)
 81         {
 82             for (col=1; col<N-1; col++)
 83             {
 84                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
 85             }
 86         }
 87         // 4 更新矩陣 並等待各個進程間數據傳遞完畢
 88         for (row=begin_row; row<=end_row; row++)
 89         {
 90             for (col=1; col<N-1; col++)
 91             {
 92                 matrix1[row][col] = matrix2[row][col];
 93             }
 94         }
 95         MPI_Waitall(4, &request[0], &status[0]);
 96     }
 97     int n;
 98     for(n = 0; n < 4; n++) MPI_Request_free(&request[n]); // 釋放非阻塞通信對象
 99     MPI_Barrier(MPI_COMM_WORLD);
100     print_matrix(myid, matrix1);
101     MPI_Finalize();
102 }
103 
104 
105 void print_matrix(int myid, float myRows[][N])
106 {
107     int i,j;
108     int buf[1];
109     MPI_Status status;
110     buf[0] = 1;
111     if ( myid>0 ) {
112         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
113     }
114     printf("Result in process %d:\n", myid);
115     for ( i = 0; i<SIZE+2; i++)
116     {
117         for ( j = 0; j<N; j++)
118             printf("%1.3f\t", myRows[i][j]);
119         printf("\n");
120     }
121     if ( myid<3 ) {
122         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
123     }
124     MPI_Barrier(MPI_COMM_WORLD);
125 }

1)上述的代碼不難理解,可以查閱相關函數手冊;最核心的思想就是,如果兩個進程有多次迭代通信,就可以用這種重復非阻塞的通信函數。

2)另外,對於重復非阻塞通信的調用,在調用MPI_Wait系列函數時,不會釋放與通信關聯的request函數(即上面說的保持一些共性的通信設定操作,不完全掐斷),因此,需要在line98中,程序員手動釋放非則色通信操作對象


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM