Parallel Computing–Cannon算法 (MPI 實現)


原理不解釋,直接上代碼

代碼中被注釋的源程序可用於打印中間結果,檢查運算是否正確。

#include "mpi.h"
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>


void scatter_matrix(int* fstream,int n1,int n2,int*Q,int root,int tag){
	/*每個矩陣塊的大小*/
	int rows=(n1+root-1)/root;
	int cols=(n2+root-1)/root;
	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));
	
	int i,j;
	memset(Q,0,rows*cols*sizeof(int));
	for(i=0;i<root;i++){
		for(j=0;j<root;j++){
			int p=0,q=0;
			int imin=i*rows*n2;
			int jmin=j*cols;
			memset(tmp_matrix,0,sizeof(tmp_matrix));

			/*在划分矩陣時,由於地空間不連續,需要另開辟一個數組連續的保存起來,以便於調用MPI_Send*/
			for(p=0;p<rows;p++,imin+=n2){
				for(q=0;q<cols;q++){
					tmp_matrix[p*cols+q]=fstream[imin+jmin+q];
				}
			}
			if(i==0&&j==0){
				/*進程0 不需要使用MPI_Send將數據發送給自己,直接使用memcpy將結果拷貝即可*/
				memcpy(Q,tmp_matrix,rows*cols*sizeof(int));
			}else{
				/*將分塊發送給位於i行,j列的進程*/
				MPI_Send(tmp_matrix,rows*cols,MPI_INT,i*root+j,tag,MPI_COMM_WORLD);
			}	
		}
	}
}
/*
 *@row:矩陣所在的行
 *@col:矩陣所在的列
 *@sp:sp=root=sqrt(nprocs)
 *@return 根據行列號計算進程實際編號
*/
int get_index(int row,int col,int sp){
	int tmp=((row+sp)%sp)*sp+(col+sp)%sp;
	return tmp;
}
/*計算矩陣乘法,將結果存入C中*/
void matrix_multi(int* A,int *B,int *C,int n1,int n2,int n3,int myid){
	int i=0,j=0,k=0;
	int* tmp_C=(int*)malloc(n1*n3*sizeof(int));
	memset(tmp_C,0,sizeof(int)*n1*n3);
	
	for(i=0;i<n1;i++){
		for(j=0;j<n3;j++){
			for(k=0;k<n2;k++){
				tmp_C[i*n3+j]+=A[i*n2+k]*B[k*n3+j];
			}
			C[i*n3+j]+=tmp_C[i*n3+j];
		}
		
	}
	
}

/*用於矩陣下標定位對齊*/
void shuffle(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,int root,int myid){
	int i,j;
	MPI_Status status;
	int cur_col=0;
	int cur_row=0;
	/*通過進程編號計算獲得當前進程所在的行號和列號*/
	cur_row=myid/root;
	cur_col=myid-cur_row*root;
	/*對於矩陣A,第i行的矩陣需要向左平移i次*/
	for(i=0;i<cur_row;i++){
		/*接收來自右邊的數據,並將當前矩陣發送給左邊的進程*/
		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
		memcpy(A,buf_A,buf_A_size*sizeof(int));/*buf_A用於通信時緩存矩陣*/
		memset(buf_A,0,buf_A_size*sizeof(int));	
	}
	/*對於矩陣B,第j列的矩陣需要向上平移j次*/	
	for(j=0;j<cur_col;j++){
		/*接收來自下邊的數據,並將當前矩陣發送給上邊的進程*/
		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
		memcpy(B,buf_B,buf_B_size*sizeof(int));/*buf_B用於通信時緩存矩陣*/
		memset(buf_B,0,buf_B_size*sizeof(int));
	}
	/*printf("I have shuffled!\n");*/
}
void cannon(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,
	int *C,int buf_C_size,int row_a,int col_a,int col_b,int root,int myid){
	MPI_Status status;
	double elapsed_time,multiply_time=0,passdata_time=0;
	int i,j;
	memset(C,0,sizeof(int)*buf_C_size);
	int cur_col=0;
	int cur_row=0;
	/*通過進程編號計算獲得當前進程所在的行號和列號*/
	cur_row=myid/root;
	cur_col=myid-cur_row*root;
	

	for(i=0;i<root;i++){/*一共需要循環root次,root=sqrt(nprocs)*/
		elapsed_time=MPI_Wtime();
		matrix_multi(A,B,C,row_a,col_a,col_b,myid);/*計算矩陣乘法*/
		elapsed_time=MPI_Wtime()-elapsed_time;
		multiply_time+=elapsed_time;
		/*elapsed_time=MPI_Wtime();		*/
		/*接收來自右邊(row,col+1)的數據,並將當前矩陣發送給左邊(row,col-1)的進程*/
		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
		/*接收來自下邊(row+1,col)的數據,並將當前矩陣發送給上邊(row-1,col)的進程*/
		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
		/*elapsed_time=MPI_Wtime()-elapsed_time;
		passdata_time+=elapsed_time;*/
		memcpy(B,buf_B,buf_B_size*sizeof(int));/*將buf_B中的數據拷貝至B中*/
		memcpy(A,buf_A,buf_A_size*sizeof(int));/*將buf_A中的數據拷貝至A中*/
		
	}
	/*將計算結果發送給數組C*/
	MPI_Send(C,row_a*col_b,MPI_INT,0,104,MPI_COMM_WORLD);
	
	printf("proc:%d, passdata time:%lf    multiply time:%lf\n",myid,passdata_time,multiply_time);
}
 
void gather_matrix(int *fstream,int n1,int n3,int*C,int root,FILE*fhc){
	MPI_Status status;
	int rows=(n1+root-1)/root;
	int cols=(n3+root-1)/root;
	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));
	int i,j;

	for(i=0;i<root;i++){
		for(j=0;j<root;j++){
			int p,q;
			int imin=i*rows*n3;
			int jmin=j*cols;
			memset(tmp_matrix,0,sizeof(tmp_matrix));
			/*接收來自各個進程的數據*/
			MPI_Recv(tmp_matrix,rows*cols,MPI_INT,i*root+j,104,MPI_COMM_WORLD,&status);	
			/*printf("I am passed proc:%d \n",i*root+j);*/
			/*將接收的矩陣tmp拼接到矩陣C中去,需要按照合理順序拼接,否則結果會出錯*/
			for(p=0;p<rows;p++,imin+=n3){
				for(q=0;q<cols;q++){
					fstream[imin+jmin+q]=tmp_matrix[p*cols+q];
					/*printf("%d ",((int*)fstream)[imin+jmin+q]);*/
				}
				/*printf("\n");*/
			}
		}
	}

	/*將結果打印到文件中*/
	for(i=0;i<n1;i++){
		for(j=0;j<n3;j++){
			fprintf(fhc,"%d ",fstream[i*n3+j]);
		}
		fprintf(fhc,"\n");
	}
}

int main(int argc,char**argv){
	int myid,numprocs;
	int i,j;
	MPI_Status status;
	int root=0;
	int dim[3];
	double elapsed_time=0;
	int max_rows_a,max_cols_a,max_rows_b,max_cols_b;
	int buf_A_size,buf_B_size,buf_C_size;
	FILE* fhc;
	/*suppose A:n1*n2 ,B:n2*n3;n1,n2,n3 are read from input file*/
	int n1,n2,n3;
	/*buffer for matrix A,B,C will be shifted ,so they each have two buffer*/
	int *A,*B,*C,*buf_A,*buf_B;

	/*on proc0,buffers to cache matrix files of A,B and C*/
	int *fstream_a=NULL,*fstream_b=NULL,*fstream_c=NULL;
	MPI_Init(&argc,&argv);/*初始化*/
	MPI_Comm_rank(MPI_COMM_WORLD,&myid);/*獲取當前進程ID*/
	MPI_Comm_size(MPI_COMM_WORLD,&numprocs);/*獲取全部進程數量*/

	root=sqrt(numprocs);
	if(numprocs!=root*root){
		/*如果進程總數不是平方數,則結束程序*/
		printf("process number must be a squre!\n");
		exit(-1);
	}

	/*on proc0,preprocess the command line,read in file
	 for A,B and put their sizes in dim[]*/
	if(myid==0){
		FILE *file_a,*file_b,*file_c;
		int n1,n2,n3;
		int i,j;
		file_a=fopen(argv[1],"r");/*打開文件a,文件名從運行時給的參數中獲得*/
		file_b=fopen(argv[2],"r");/*打開文件b,文件名從運行時給的參數中獲得*/
		fscanf(file_a,"%d %d",&n1,&n2);/*從文件a中讀取矩陣A的行數,列數*/
		fscanf(file_b,"%d %d",&n2,&n3);/*從文件b中讀取矩陣B的行數,列數*/
	
		dim[0]=n1,dim[1]=n2,dim[2]=n3;
		fstream_a=(int*)malloc(n1*n2*sizeof(int));/*分配一塊內存,用於將矩陣A讀入內存*/
		fstream_b=(int*)malloc(n2*n3*sizeof(int));/*分配一塊內存,用於將矩陣B讀入內存*/
		/*printf("Yeah! I got n1=%d,n2=%d,n3=%d\n",n1,n2,n3);*/
		/*讀入矩陣A,保存在fstream_a中*/
		for(i=0;i<n1;i++)
			for(j=0;j<n2;j++)
			fscanf(file_a,"%d",&((int*)fstream_a)[i*n2+j]);
		/*讀入矩陣B,保存在fstream_b中*/	
		for(i=0;i<n2;i++)
			for(j=0;j<n3;j++)
				fscanf(file_b,"%d",&((int*)fstream_b)[i*n3+j]);
	}
	/*將矩陣的行數,列數通過Bcast廣播給所有進程*/
	MPI_Bcast(dim,3,MPI_INT,0,MPI_COMM_WORLD);
	n1=dim[0],n2=dim[1],n3=dim[2];

	/*begin new version*/

	max_rows_a=(n1+root-1)/root;/*子矩陣塊A的行數*/
	max_cols_a=(n2+root-1)/root;/*子矩陣塊A的列數*/
	max_rows_b=max_cols_a;      /*子矩陣塊B的行數*/
	max_cols_b=(n3+root-1)/root;/*子矩陣塊B的列數*/
	buf_A_size=max_rows_a*max_cols_a;/*子矩陣塊A的大小*/
	buf_B_size=max_rows_b*max_cols_b;/*子矩陣塊B的大小*/
	buf_C_size=max_rows_a*max_cols_b;/*子矩陣塊C的大小*/


	/*給A,,buf_A,buf_B,B,C分配內存空間,其中buf_A,buf_B用於通訊中的緩存*/
	A=(int*)malloc(sizeof(int)*buf_A_size);
	buf_A=(int*)malloc(sizeof(int)*buf_A_size);
	B=(int*)malloc(sizeof(int)*buf_B_size);
	buf_B=(int*)malloc(sizeof(int)*buf_B_size);
	C=(int*)malloc(sizeof(int)*buf_C_size);
	if(A==NULL||buf_A==NULL||B==NULL||buf_B==NULL||C==NULL)
	{
		/*如果內存申請失敗,就退出*/
		printf("Memory allocation failed!\n");
		exit(-1);
	}

	/*proc 0 scatter A,B to other procs in a 2D block distribution fashion*/
	if(myid==0){
		/*printf("max_rows_a:%d\n",max_rows_a);
		printf("max_cols_a:%d\n",max_cols_a);
		printf("max_rows_b:%d\n",max_rows_b);
		printf("max_cols_b:%d\n",max_cols_b);*/
		/*進程0 將矩陣A,B划分成小塊,分發給其他進程*/
		scatter_matrix((int*)fstream_a,n1,n2,A,root,100);
		/*printf("I am debuging!\n");*/
		scatter_matrix((int*)fstream_b,n2,n3,B,root,101);
		/*printf("I am finding fault!\n");*/
	}else{
		/*其他進程接收來自進程0 發送的矩陣A,B*/
		MPI_Recv(A,max_rows_a*max_cols_a,MPI_INT,0,100,MPI_COMM_WORLD,&status);
		MPI_Recv(B,max_rows_b*max_cols_b,MPI_INT,0,101,MPI_COMM_WORLD,&status);
	}

	MPI_Barrier(MPI_COMM_WORLD);/*等待全部進程完成數據接收工作。*/

	/*printf("I am proc %d\n",myid);
	for(i=0;i<max_rows_a;i++){
		printf("%d:      ",myid);
		for(j=0;j<max_cols_a;j++){
			printf("%d ",A[i*max_cols_a+j]);
		}
		printf("\n");
	}
	printf("I am proc %d\n",myid);
	for(i=0;i<max_rows_b;i++){
		printf("%d:      ",myid);
		for(j=0;j<max_cols_b;j++){
			printf("%d ",B[i*max_cols_b+j]);
		}
		printf("\n");
	}*/

	
	/*compute C=A*B by Cannon algorithm*/
	/*矩陣塊必須定位對齊,先做預處理*/	
	shuffle(A,buf_A,buf_A_size,B,buf_B,buf_B_size,root,myid);
	elapsed_time=MPI_Wtime();
	/*包含cannon全部內容*/
	cannon(A,buf_A,buf_A_size,B,buf_B,buf_B_size,
		C,buf_C_size,max_rows_a,max_cols_a,max_cols_b,root,myid);
	MPI_Barrier(MPI_COMM_WORLD);
	elapsed_time=MPI_Wtime()-elapsed_time;/*統計cannon算法實際耗時*/
	

	MPI_Barrier(MPI_COMM_WORLD);/*等待所有進程完成cannon算法,將結果發送給進程0*/


	int fsize_c=sizeof(int)*n1*n3;
	if(myid==0){
		/*進程0創建文件寫句柄,准備將計算結果寫入文件中*/
		if(!(fhc=fopen(argv[3],"w"))){
			printf("Cant't open file %s\n",argv[3]);
			MPI_Finalize();
		}
		fstream_c=(int*)malloc(fsize_c);
		/*進程0 接收來自各個進程的結果矩陣,拼接成一個完整的結果,寫入文件,持久化數據結果*/
		gather_matrix(fstream_c,n1,n3,C,root,fhc);
	}
	
	MPI_Barrier(MPI_COMM_WORLD);    /*make sure proc 0 read all it needs*/

	if(myid==0){
		int i,j;
		printf("Cannon algorithm :multiply a %d* %d with a %d*%d, use %lf(s)\n",
			n1,n2,n2,n3,elapsed_time);
		/*printf("I have finished!\n");*/
		fclose(fhc);/*關閉文件讀寫句柄*/
		/*釋放申請的內存空間*/
		free(fstream_a);
		free(fstream_b);
		free(fstream_c);
	}

	/*釋放申請的內存空間*/
	free(A);free(buf_A);
	free(B);free(buf_B);
	free(C);
	MPI_Finalize();
	return 0;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM