MPI學習筆記
研究課題需要用到並行化,這里記錄學習筆記
MPI准備
概述
MPI(Message Passin Interface 消息傳遞接口)是一種消息傳遞編程模型,是一個庫。
MPI是一種標准或規范的代表,並不特指某一個對它具體實現。
目的:服務於進程間通信
前置知識補充
消息傳輸:從一個處理器的內存拷貝到另一個處理器內存的方式。
在分布式存儲系統中,數據通常以消息包的形式通過網絡從一個處理器發送到另一個處理器。
消息包 = 消息頭控制信息 + 消息體數據信息
舉例
序號:進程的標識,唯一
進程組:一個MPI程序的全部進程集合的一個有序子集,進程組中每個進程都被賦予了再該組中唯一的序號(rank),用於在該組中標識該進程。
tong'xin'yu
環境部署
三台虛擬機均為CentOS8,一台作為控制節點,另外兩台作為計算節點。
1.修改IP及主機名
1.三台主機已經修改在同一個網關
2.設置了靜態ip
MPI0 192.168.10.110 控制節點
MPI1 192.168.10.111 計算節點
MPI2 192.168.10.112 計算節點
3.做好了主機名和ip地址的映射關系
2.關閉防火牆
為了mpi運行成功,盡可以能降低通信延遲和系統開銷,關閉防火牆達到最高的效率
關閉firewalld
[ranan@c105 ~]$ sudo systemctl stop firewalld //關閉防火牆
[ranan@c105 ~]$ sudo systemctl disable firewalld //設置防火牆不自啟
關閉selinux
1.暫時關閉
setenforce 0
2.永久性關閉selinux
編輯selinux的配置文件/etc/sysconfig/selinux
,把SELINUX設置成disabled,然后重啟生效
[ranan@c105 ~]$ sudo vim /etc/sysconfig/selinux
[ranan@c105 ~]$ cat /etc/sysconfig/selinux
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these three values:
# targeted - Targeted processes are protected,
# minimum - Modification of targeted policy. Only selected processes are protected.
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
3.實現免密碼SSH登錄
4.配置MPI運行環境
三台主機都需要配置
上傳包到/opt/software
[root@c105 software]# rz
rz waiting to receive.
zmodem trl+C ȡ
正在傳輸 mpich-3.1.3.tar.gz...
100% 11218 KB 11218 KB/ 00:00:01 0
[root@c105 software]# ll
總用量 11220
-rw-r--r--. 1 root root 11487313 11月 17 16:48 mpich-3.1.3.tar.gz
解壓包到/opt/module/
[root@c105 software]#tar -xvzf mpich-3.1.3.tar.gz -C /opt/module/
執行配置操作,作用是對即將安裝的軟件進行配置,檢查當前的環境是否滿足要安裝軟件的依賴關系。參數:–prefix=PREFIX 表示把所有文件裝在目錄PREFIX下而不是默認目錄下。本系統安裝目錄為/home/mpi。配置成功后,最后一行提示顯示“Configuration completed”。
[ranan@c105 mpich-3.1.3]$sudo ./configure --prefix=/home/mpi
報錯信息:Incompatible Fortran and C Object File Types!
解決辦法:sudo yum install gcc gcc-gfortran
[ranan@c105 mpich-3.1.3]$ make
[ranan@c105 mpich-3.1.3]$ make install
注意點:
step1 ./configure
step2 make
step3 make install
配置環境變量/home/mpi
[ranan@c105 ~]$ vim ~/.bashrc
export PATH=/home/mpi/bin:$PATH //與原來使用:進行拼接
export INCLUDE=/home/mpi/include:$INCLUDE
export LD_LIBRARY_PATH=/home/mpi/lib:$LD_LIBRARY_PATH
[ranan@c105 ~]$ source .bashrc //更新環境配置
測試是否配置成功
which mpicc
5.測試
復制測試例子到/home/mpi目錄下,修改/home/mpi文件權限
[root@c106 mpich-3.1.3]# cp -r examples /home/mpi
[root@c106 mpich-3.1.3]# chown -R ranan:ranan /home/mpi
單節點測試
現在是c106節點機
[ranan@c106 ~]$ cd /home/mpi/examples/
[ranan@c106 examples]$ m
Process 0 of 6 is on c106
Process 1 of 6 is on c106
Process 2 of 6 is on c106
Process 4 of 6 is on c106
Process 5 of 6 is on c106
Process 3 of 6 is on c106
pi is approximately 3.1415926544231239, Error is 0.0000000008333307
wall clock time = 0.019008
多節點測試
測試6個進程再不同權重的節點機上運行
[ranan@c106 examples]$ vim nodes
[ranan@c106 examples]$ cat nodes
c105:3
c106:2
c107:1
[ranan@c106 examples]$ mpirun -np 6 -f nodes ./cpi
Process 3 of 6 is on c106
Process 0 of 6 is on c105
Process 5 of 6 is on c107
Process 4 of 6 is on c106
Process 1 of 6 is on c105
Process 2 of 6 is on c105
pi is approximately 3.1415926544231243, Error is 0.0000000008333312
wall clock time = 0.005398
程序的執行
編譯語句
c
gcc編譯器
mpicc -O2(優化選項) -o(生成可持續文件) heeloworld(編譯成的執行文件名) helloworld.c(被編譯的源文件)
c++
gcc編譯器
mpicxx -O2(優化選項) -o heeloworld helloworld.c
運行語句
mpi普通程序運行執行
mpirun(mpiexec) -np 產生的進程數 可執行文件
集群mpi上運行
集群作業調度系統特定參數 + 可執行文件
mpirun -np 產生的進程數 -f 集群配置文件 ./可執行文件
集群配置文件的格式
ip地址:進程個數
ip地址:進程個數
...
MPI編程
C語言中的頭文件 #include "mpi.h"
4個基本函數
MPI_Init(int *argc,char ***argv)
完成MPI程序初始化工作,通過獲取main函數的參數,讓每一個MPI程序都能獲取到main函數
MPI_Comm_rank(MPI_Comm comm,int *rank)
用於獲取調用進程在給定的通信域中的進程標識號。默認一個最大通信域word。
通信域中的序號是有序的
假設一個通信域中有p個進程,編號為0到p-1,利用進程的序號來決定負責計算數據集的哪一個部分。
MPI_Comm_size(MPI_Comm comm,int *size)
返回給定的通信域中所包含的進程總數
MPI_Finalize(void)
MPI程序的最后一個調用,清除全部MPI環境
MPI點對點通信函數
MPI的通信機制是在一對進程之間傳遞數據,稱為點對點通信
MPI提供的點對點通信數據傳輸有兩種機制
- 阻塞:等消息從本地發出之后,才進行執行后續的語句
- 非阻塞:不需要等待,實現通信與計算的重疊
非阻塞MPI_Send/MPI_Recv
MPI_Send用於發送方
MPI_Send(void*buf,int count,MPI_Datatype datatype,int dest,int tag,MPI_Comm comm)
- buf 發送的數據緩存區的起始地址
- count 需要發送數據的個數
- datatype 需要發送的數據類型
- dest 目的進程的標識號
- tag 消息標志為tag
- comm 進程所在的域
MPI_Recv用於接收方
MPI_Recv(void*buf,int count,MPI_Datatype datatype,int source,int tag,MPI_Comm comm,MPI_Status *status)
從comm通信域中標識號為source的進程,接受消息標記為tag,消息數據類型為datatype,個數為count的消息並存儲在buf緩沖區中,並將該過程的狀態信息寫入status中
在C語言中,status是一個結構體,包含了MPI_SOURCE(數據來源進程標識號),MPI_TAG(消息標記),MPI_ERROR
在Fortran語言中,status是一個整型的數組
阻塞MPI_Isend
Isend
int MPI_Isend(void*buf,int count,MPI_Datatype datatype,int dest,int tag,MPI_Comm comm,MPI_Request *request)
Irecv
int MPI_Irecv(void*buf,int count,MPI_Datatype datatype,int dest,int tag,MPI_Comm comm,MPI_Request *request)
MPI集合通信函數
1-n/n-1
集合通信還包括一個同步操作Barrier,所有進程都到達后才繼續執行
MPI_Bcast(void *buffer,int count,MPI_Datatype datatype,int root,MPI_Comm comm)
從指定的一個根進程中把相同的數據廣播發送給組中的所有其他進程
MPI_Scatter(void send_data,int send_count,MPI_Datatype send_datatype,void recv_data,int recv count,MPI_Datatype recv_datatype,int root,MPI_Comm communicator)
把指定的根進程中的數據分散發送給組中的所有進程(包括自己)
MPI_Gather(void *sendbuf,int sent_count,MPI_Datatype send_datatype,void *recv_data,int recv_count,MPI_Datatype recv_datatype,int root,MPI_Comm communicator)
在組中指定一個進程收集組中進程發來的消息,這個函數操作與MPI_Scatter函數操作相反
所有進程調用該函數,把指定位置的數據發送給根進程的指定位置
MPI_Reduce(void *send_data,void *recv_data,int count,MPI_Datatype datatype,MPI_Op op, int root,MPI_Comm communicator)
在組內所有的進程中,執行一個規約操作(算術等),並把結果存放在指定的一個進程中
舉例
n-n
常用函數
計時函數
double MPI_Wtime(void)
功能:返回某一時刻到調用時刻經歷的時間(s)
案例
double start_time,end_time,total_time; //初始化
start_time = MPI_Wtime();
//需要計時的部分
end_time = MPI_Wtime();
total_time = end_time - start_time
Printf("It looks %f seconds\n",total_time);
獲得本進程的機器名函數
int MPI_Get_processor_name(char name,intresultlen)
name為返回的機器名字符串,resultlen為返回的機器名長度
//MPI_MAX_PROCESSOR_NAME代表MPI中允許機器名字的最大長度
int resultlen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
MPI_Get_processor_name(processor_name,&resultlen);//resultlen存放長度
cout"當前運行的機器" <<processor_name<<endl;
測試案例
send
home/mpi/example/send_test.cpp
#include <stdio.h>
#include <iostream>
#include "mpi"
using namespace std;
/*從0號進程發送信息,其他進程接受信息*/
int main(int argc,char ** argv)
{
int rank;//記錄進程標識號
int size;//記錄通訊域中的進程個數
int senddata,recvdata;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
MPI_Status status;
recvdata=0;
if(rank==0){ //發送數據的進程
for(int i=1;i<size;i++){
senddata=i+1000;
MPI_Send(&senddata,1,MPI_INT,i,i,MPI_COMM_WORLD);
}
}
if(rank!=0){ //接受數據的進程
MPI_Recv(&recvdata,1,MPI_INT,0,rank,MPI_COMM_WORLD,&status);
cout << "進程 " << rank <<"從 "<< status.MPI_SOURCE <<"接收信息,tag="<<status.MPI_TAG <<" and data=" <<recvdata<<endl;
}
MPI_Finalize();
return 0;
}
編譯運行
[ranan@c106 examples]$ mpicxx -o testmpi send_test.cpp
[ranan@c106 examples]$ xsync testmpi //把執行程序分發給c105 c107
[ranan@c106 examples]$ mpirun -np 6 -f nodes ./testmpi
MPI_Scatter 與 MPI_Gather
#include "stdio.h"
#include "mpi.h"
#include "stdlib.h"
#include <iostream>
#include <vector>
using namespace std;
const int N=2; //每個進程接受兩個數據
int main(int argc,char ** argv){
int size,rank; //size記錄總進程數,rank記錄當前進程的標識號
int *send; //每個進程的發送緩存區
int *recv; //每個進程的接受緩存區
int *result; //接受其他進程發送過來的數據
int send_data[N]; //其他進程發送的數據;
int i=0;
int j=0;
int resultlen; //記錄機器名的長度
char processor_name[MPI_MAX_PROCESSOR_NAME]; //運行的機器名
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank); //獲得當前進程號
MPI_Comm_size(MPI_COMM_WORLD,&size); //獲得總進程數,假設4個
MPI_Get_processor_name(processor_name,&resultlen);
recv = (int*) malloc(N*sizeof(int));
for(;i<N;i++){
recv[i] = 0; //原始的數據
}
//開始准備發送的數據
if(rank==0){
send = (int*) malloc(size*N*sizeof(int));
result = (int*) malloc(size*N*sizeof(int));
for(;j<N*size;j++){
send[j]=j+6; //6,7,8,9,10,11,12,13 ,大於等於10返回1,小於10返回0
}
}
//開始進行任務分發
MPI_Scatter(send,N,MPI_INT,recv,N,MPI_INT,0,MPI_COMM_WORLD); //0號進程分發
//輸出分發結果
cout << "-----------------------"<<endl;
cout << "進程號"<<rank<<"當前運行的機器" <<processor_name<<endl;
for(i=0;i<N;i++){
cout << "傳輸后的recv["<<i<<"] = " <<recv[i]<<endl;
//數據處理
send_data[i] = (recv[i]>=10)?1:0;
cout << "當前值:" <<recv[i]<<",對應的send_data = "<< send_data[i] <<endl;
}
cout << "-----------------------"<<endl;
MPI_Gather(send_data,N,MPI_INT,result,N,MPI_INT,0,MPI_COMM_WORLD); //0號進程接受
//MPI_Barrier調用函數時進程將處於等待狀態,直到通信子中所有進程 都調用了該函數后才繼續執行。
MPI_Barrier(MPI_COMM_WORLD); //等待所有進程傳送數據結束。
//輸出匯聚的結果
if(rank==0){
for(j=0;j<N*size;j++){
cout << "result["<<j<<"]="<<result[j]<<endl;
}
}
MPI_Finalize();
return 0;
}