如何進行並行編程:從並行矩陣運算開始


矩陣計算

矩陣計算問題有很多種類型,例如:

求解線性代數方程組 Ax = b

線性最小二乘問題 given b in R^m, for x in R^n,minimize ||Ax - b||^2

矩陣特征值問題 Ax = λx

矩陣奇異值分解 A = U∑V^T

 很多矩陣計算問題都有並行的計算方法,例如矩陣乘法,我們現在來學習並且用代碼實現他們,從而更深地理解並行計算的思想。

 

並行矩陣乘法

    並行計算,就是多個進程並行協作,完成特定的任務。現在我們假定一個並行系統,包含了p個處理機,每個處理機一個進程,我們可以分別用字符“0”,“1”,...,“p-1”來引用它們,或者為了清晰,我們用 Pi 來引用它們,i 表示一個進程的進程號,進程之間可以相互傳遞消息,所謂消息,指的是一個數據結構。設用x引用一個消息,那么函數 send(x, i)表示當前進程(處理機)把消息x發送給序號等於i的進程,recv(x, i)表示當前進程從序號為i的進程接收消息x。在並行編程中,我們用程序代碼定義好一個過程,每個進程都將運行這段程序代碼定義的過程,也就是說,代碼必須是通用的。並行編程的是一個定義各個局部之間相互關系的過程,全局的發展方向不僅僅依賴於局部與局部之間的這些關系,而且對初始狀態也是極其依賴的,類似於數學和物理學中的混沌現象,我們需要選擇合適的初始狀態和局部之間的相互作用關系,使得整體得到希望的發展方向。

    接下來我們來說明用這p個處理機來進行並行算法。給出兩個矩陣A和B,要C = A×B,我們可以用矩陣乘法的標准定義,還可以用分塊矩陣來先進行變換。

算法1:行列划分算法

    我們對A進行行分塊,對B進行列分塊:

    這兩個矩陣的子矩陣分別兩兩相乘得到 Ai×Bj = Ci,j ,我們可以得到p×p個矩陣,這些矩陣拼接起來,就得到了結果C,亦即,C = [Ci,j]p×p。我們可以用舉世聞名的數學歸納法來證明如此分塊的正確性,不過這並不是本文重點,不再贅述。

由此我們得到如下偽代碼:

PROCEDURE 1
INPUT: A , B . A and B are both in processor 0.
OUTPUT: C = A × B in processor 0.
BEGIN:
#數據散發階段,進程0把矩陣A和B分發給各個進程
if myid == 0:   for i in range(1,p):     send(Ai, i), send(Bi, i)   barrier() else:   recv(Amyid, 0), recv(Bmyid, 0)   barrier()
#並行計算各個分塊矩陣的階段 left
= (myid - 1)%p , right = (myid + 1)%p for i in range(0, p):   k = (i + myid)%p   C[myid][k] = Amyid * Bk   if i != p-1 :     send(Bk,left), recv(Bk+1,right)
#聚集階段,每個節點把自己的計算結果發送給進程0
if myid == 0:   for i in range(1, p):     j = (i - 1)%p     start_new_thread(recv, (C[j],i)) else:   j = (myid - 1)%p   send(C[j],0) return C END.   

 

   在初始狀態時,矩陣A和B都在節點0,我們通過數據散發操作使得通信器達到這樣的狀態:我們用數字 i 來引用通信器在每次循環通信的狀態,用數字 k 引用在此狀態下進程在自己的內存中保留着的分塊矩陣 Ak 和 Bk。 Ai 和 Bi相乘可以得到C[i][i],計算和循環通信完成后,B的分塊矩陣在節點中完成一次輪換,通信器進入下一個狀態( 用數字 i+1 來引用這個狀態),繼續計算得到C[i][i+1],直到計算出每個C的分塊,最后每個進程把自己計算出來的分塊發送給節點0,完成拼接得到C。

    考慮這個算法的計算時間,如果是串行執行乘法,需要的時間是 O(mnk),p個處理機同時計算的計算時間當然就是O(mnk/p),注意計算總量仍舊是不變的。

    再考慮通信時間。串行算法沒有通信時間,對於並行算法,忽略每次通信的啟動時間,數據散發階段的通信的時間復雜度O( (m-m0)k+(n-n0)k ),再計算階段,共要完成p-1次循環傳輸,每次總共傳輸nk個數據,時間復雜度O( (p-1)nk ),聚集階段的時間復雜度O( (m-m0)n )。再沒有采用多線程同時進行計算和傳輸的情況下,算法的運行時間復雜度就是計算時間與通信時間的和 O( mnk/p + (m-m0)k+(n-n0)k + (p-1)nk + (m-m0)n )。

    這個算法是否真的更快,就有待實踐檢驗了。當然上述算法還有很多可以優化的地方,這個算法只是一個思路。比如由於矩陣B已經存在於0引用的那個節點,所以不需要把節點0納入循環;每個節點計算矩陣的過程中,可以每次計算完一列,就可以采用非阻塞通信或者多線的方式,把這一列發送出去,從而同時進行計算和傳輸。

 

MPI來實現行列划分算法

MPI是一組接口標准,調用這些接口可以實現並行編程中的各種動作,如初始化或者結束並行編程環境,進程間傳遞消息等。

通信器提供進程間通信的基本環境,MPI 程序中所有通信都必須在特定的通信器中完成。 MPI 環境在初始化時(也就是會設置一些全局變量)會自動創建兩個通信器,一個稱為 MPI_COMM_WORLD,它包含程序中的所有進程,另一個稱為 MPI_COMM_SELF,它是每個進程獨自構成的、僅包含自己的通信器。MPI 系統提供了一個特殊進程號 MPI_PROC_NULL,它代表空進程 (不存在的進程),與 MPI_PROC_NULL 進行通信相當於一個空操 作,對程序的運行沒有任何影響。具體的API可以查閱書籍或者看 http://www.cnblogs.com/xinchrome/p/4859119.html

下面上代碼:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define M 1000
#define N 1000
#define K 1000

/* 職責:讀取數據到矩陣中 */
void read_to_mat(int mat[][],int m,int n){
  int i = 0,j = 0;
    char filename[80];
    FILE *fp = NULL;
  fscanf(stdin,"%s",filename);
  fp = fopen(filename,"r");
  for(i = 0;i < m;i++){
    for(j = 0;j < n;j++){
      if(EOF == fscanf(fp,"%d",&mat[i][j])){
                printf("[ERROR] - filename: %s",filename);
        exit(1);
            }
    }
  }
    fclose(fp);
  return;
}

void write_to_file(int mat[][],int m,int n){
    char filename[] = "result.txt"
    FILE *fp = fopen(filename,"w");

    for(int i = 0;i < m;i++){
        if(i > 0)
            fprintf(fp,"\n");
        for(int j = 0;j < n;j++){
            fprintf(fp,"%d",mat[i][j]);
            if(j != n-1)
                fprintf(fp,"\t");
        }
    }
    fclose(fp);
    return;
}

void scatter(int mat[][],int src,int size,int displs[],int sendcnts[]){
    int myid = 0;
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    if(myid == src){
        for(int dest = 0;dest < size;dest++){
            if(dest != src){
                for(int j = displs[dest];j < displs[dest] + sendcnts[dest];j++){
                    MPI_Bsend(mat[j],K,MPI_INT,dest,0,MPI_COMM_WORLD);
                }
            }
        }
    }
    else{
        for(int j = displs[myid];j < displs[myid] + sendcnts[myid];j++){
            MPI_Recv(mat[j],K,MPI_INT,src,0,MPI_COMM_WORLD,MPI_STATURS_IGNORE);
        }
    }
    return;
}

typedef struct{
    int displs[];
    int sendcnts[];
    int mat[][];
    int src;
} RECV_ARG;

void receive(RECV_ARG *arg){
    for(int j = arg->displs[arg->src];j < arg->displs[arg->src] + arg->sendcnts[arg->src];j++){
        MPI_Recv(arg->mat[j],K,MPI_INT,arg->src,0,MPI_COMM_WORLD,MPI_STATURS_IGNORE);
    }
    return;
}

void gather(int mat[][],int dest,int size,int displs[],int sendcnts[]){
    int myid = 0;
    pthread_t tid = 0;
    RECV_ARG * arg = (RECV_ARG *)malloc(sizeof(RECV_ARG));
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    if(myid == dest){
        for(int src = 0;src < size;src++){
            if(src != dest){
                arg->displs = displs;
                arg->sendcnts = sendcnts;
                arg->mat = mat;
                arg->src = src;
                pthread_create(&tid,NULL,receive,(RECV_ARG *)&arg);
            }
        }
    }
    else{
        for(int j = displs[myid];j < displs[myid] + sendcnts[myid];j++){
            MPI_Send(mat[j],K,MPI_INT,src,0,MPI_COMM_WORLD);
        }
    }
    free(arg);
    return;
}

int main(int argc, char *argv[]){
  int myid = 0, numprocs = 0,
        A[M][K],B[N][K],C[M][N],/* 變量B存儲了真實矩陣的轉置 */
        sendcnts_row[],sendcnts_col[],displs_row[],displs_col[],
        *buf = NULL;
        pack_size = 0,buf_size = 0,
        left = 0, right = 0;

    /* 起始階段,把兩個矩陣分塊 */
  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&myid);
  MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    if(numprocs < 2){
        printf("[ERROR] - numprocs = %d", numprocs);
        exit(2);
    }
    sendcnts_row = (int *)malloc(sizeof(int)*numprocs);
    sendcnts_col = (int *)malloc(sizeof(int)*numprocs);    
    for(int i = 0;i < numprocs;i++){
        if(i == 0){
            displs_row[i] = 0;
            displs_col[i] = 0;
            sendcnts_row[i] = (M/numprocs) + M%numprocs;
            sendcnts_col[i] = (N/numprocs) + N%numprocs;
        } else {
            displs_row[i] = i * (M/numprocs) + M%numprocs;
            displs_col[i] = i * (N/numprocs) + N%numprocs;
            sendcnts_row[i] = M/numprocs;
            sendcnts_col[i] = N/numprocs;
        }
    }
    MPI_Pack_size(M*K+K*N,MPI_INT,&pack_size);
    buf_size = (M + N) * MPI_BSEND_OVERHEAD + pack_size;
    buf = (int*)malloc(sizeof(int)*buf_size);

  /* 數據散發階段,形成通信器的初始狀態 */
    MPI_Buffer_attach(buf,buf_size);
    if(myid == 0){
        read_to_mat(A,M,K);    
        read_to_mat(B,N,K);/* 把真實矩陣映射為轉置矩陣 */
    }
    scatter(A,0,numprocs,displs_row,sendcnts_row);
    scatter(B,0,numprocs,displs_col,ndcnts_col);
    MPI_Buffer_detach(&buf,&buf_size);

    /* 計算並循環通信階段,i的值引用了通信器的每個狀態 */
    MPI_Buffer_attach(buf,buf_size);
    left = (myid - 1) % numprocs;
    right = (myid + 1) % numprocs;
    for(int i = 0;i < numprocs;i++){
        int p = (i + myid) % numprocs;
        int left_p = (p - 1) % numprocs;
        for(int n = displs_col[p];n < displs_col[p] + sendcnts_col[p];n++){
            for(int m = displs_row[myid];m < displs_row[myid] + sendcnts_row[myid];m++){
                int sum = 0;
                for(int k = 0;k < K;k++){
                    sum += A[m][k] + B[n][k];
                }
                C[m][n] = sum;
            }
            if(i != numprocs - 1)
                MPI_Bsend(B[n],K,MPI_INT,right,0,MPI_COMM_WORLD);/* 一邊計算一邊發送 */
        }
        if(i != numprocs - 1){
            for(int col = displs_col[left_p];col < displs_col[left_p] + sendcnts_col[left_p];col++){
                MPI_Recv(B[col],K,MPI_INT,left,0,MPI_COMM_WORLD,MPI_STATURS_IGNORE);
            }
        }
    }
    MPI_Buffer_detach(&buf,&buf_size);

    /* 聚集階段 */
    MPI_Buffer_attach(buf,buf_size);
    gather(C,0,numprocs,displs_row,sendcnts_row);
    MPI_Buffer_detach(&buf,&buf_size);

    if(myid ==0)
        write_to_file(C,M,N);

    free(buf);
  MPI_Finalize();
  return 0;
}

/* 實際的代碼和偽代碼有一些區別,
* 實際代碼用B存儲實際矩陣的轉置,
* 實際代碼采用一邊計算一邊傳輸的方式,
* 循環中消息也是從小序號傳遞到大序號,
* 從而使得每個狀態中,狀態的號碼等於最大的分塊所在的進程號 */

 

 

其他的並行矩陣乘法

 

列行划分算法

這種算法在每個處理機上得到了相同規模的矩陣,最后將它們全部疊加起來得到C。另外如果處理機數目非常多,在最后聚集的時候可以采用樹形聚集的方式,實現並行聚集求和,這樣可以加快減少求和的時間,當然,這種方式也加大了復制傳輸數據的開銷。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM