Copyright notice: This is an original article by the blogger and may not be reproduced without the blogger's permission.
1. Overview
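MPI (Message Passing Interface) is a standardized, portable message-passing system designed to run on a wide variety of parallel computers. In the message-passing model, the processes that execute in parallel each have their own independent stack and code segment and run as separate, unrelated programs; all information exchange between processes happens through explicit calls to communication functions.
mpi4py is an unofficial Python library built on top of MPI that allows Python data to be passed between processes.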
2. MPI Execution Model
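A parallel program consists of a set of independent, identical processes:
- all processes contain the same code;
- processes can run on different nodes or on different computers;
- with Python, n Python interpreters are launched, one per process, e.g. for 32 processes:

```shell
mpirun -np 32 python parallel_script.py
```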
The parallel execution model is shown below.
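2.1 Basic MPI Concepts
rank: the ID assigned to each process;
- a process can query its own rank;
- based on its rank, a process can perform different tasks;
Communicator: a group of processes;
- the basic object in mpi4py, through which the communication methods are called;
- MPI_COMM_WORLD contains all processes (MPI.COMM_WORLD in mpi4py);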
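Since every process runs the same script, the usual way to give processes different tasks is to branch on rank. A minimal sketch, using a hypothetical helper `block_range` (not part of mpi4py), that computes which contiguous slice of `n_items` work items a rank owns. In a real program `rank` and `size` would come from `comm.Get_rank()` and `comm.Get_size()`; here they are simulated with a loop so the code runs without `mpirun`.

```python
def block_range(n_items, rank, size):
    """Half-open range [start, stop) of work items owned by `rank`.

    Hypothetical helper: items are split into `size` contiguous blocks whose
    lengths differ by at most one; the first `n_items % size` ranks get one
    extra item.
    """
    base, extra = divmod(n_items, size)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop


# Simulate 4 processes sharing 10 items
# (in an MPI program: rank = comm.Get_rank(), size = comm.Get_size()).
for r in range(4):
    start, stop = block_range(10, r, 4)
    print("rank", r, "handles items", list(range(start, stop)))
```

Giving the first `n_items % size` ranks one extra item keeps the load difference between any two processes to at most one item.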
2.2 Data Model
All variables and data structures are local to each process;
Processes exchange data by sending and receiving messages;
2.3 Using mpi4py
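```python
from mpi4py import MPI

comm = MPI.COMM_WORLD   # Communicator object containing all processes
size = comm.Get_size()  # total number of processes
rank = comm.Get_rank()  # ID of this process, 0 .. size-1
print("rank = %d, size = %d" % (rank, size))
```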
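2.4 Installing mpi4py
3. Communication Modes
There are two main modes of communication: point-to-point and collective. Point-to-point communication is one-to-one; collective communication involves all processes of a communicator, either one-to-many (e.g. broadcast, scatter) or many-to-one (e.g. gather);
3.1 Point-to-Point Communication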
Example 1
Sending a built-in Python dict from one process to another;
```python
# Point-to-point communication of a Python dict
# Run with at least 2 processes, e.g.: mpirun -np 2 python script.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {"a": 7, "b": 3.14}
    comm.send(data, dest=1, tag=11)
    print("send data = ", data)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print("recv data = ", data)
```
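Any picklable Python object, including all built-in types, can be communicated with send and recv; the destination rank, the source rank and the tag must match between sender and receiver;
send(data, dest, tag)
- data: the Python object to send;
- dest: the destination rank;
- tag: the message tag (an integer ID for the message);
recv(source, tag)
- source: the source rank;
- tag: the tag of the message to receive;
- returns the received object;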
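Example 2
Sending a built-in Python dict point-to-point with non-blocking communication: isend/irecv return immediately with a request object, and wait() blocks until the operation completes (on the receiving side it returns the data);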
```python
# Point-to-point communication of Python objects with non-blocking communication
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {"a": 7, "b": 3.14}
    req = comm.isend(data, dest=1, tag=11)
    req.wait()  # block until the send has completed
    print("send data = ", data)
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    data = req.wait()  # wait() returns the received object
    print("recv data = ", data)
```
Example 3
Sending NumPy arrays with the buffer-based (uppercase) methods;
```python
# Point-to-point communication of NumPy arrays
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# automatic MPI datatype discovery from the NumPy dtype
# ('i' = C int; np.int was removed in NumPy 1.24)
if rank == 0:
    data = np.arange(100, dtype='i')
    comm.Send(data, dest=1, tag=13)
    print("send data = ", data)
elif rank == 1:
    data = np.empty(100, dtype='i')  # receive buffer must be preallocated
    comm.Recv(data, source=0, tag=13)
    print("recv data = ", data)
```
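With the lowercase methods, an arbitrary Python object is converted (pickled) into a byte stream when a message is sent;
when the message is received, the byte stream is converted back into a Python object;
Send(data, dest, tag), Recv(data, source, tag): for contiguous buffers such as NumPy arrays; fast, because the memory is transferred directly without serialization, but the receiver must preallocate a buffer of the right size and type;
send(data, dest, tag), recv(source, tag): for general Python objects; slower because of the pickling overhead;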
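mpi4py implements the lowercase methods with Python's `pickle` module. The sketch below uses only `pickle` and NumPy, without MPI, to mimic what happens on each side: the sender turns the object into bytes and the receiver rebuilds an equal but separate object, while the uppercase methods copy the array's raw memory into a buffer the receiver has already allocated.

```python
import pickle
import numpy as np

# What send()/recv() do conceptually: object -> bytes -> new object.
data = {"a": 7, "b": 3.14}
payload = pickle.dumps(data)        # byte stream that would travel over MPI
received = pickle.loads(payload)    # reconstructed on the receiving side
print(type(payload).__name__, len(payload), "bytes")
print(received == data, received is data)  # equal, but a distinct object

# What Send()/Recv() do conceptually: copy raw memory into a preallocated buffer.
arr = np.arange(100, dtype='i')
buf = np.empty(100, dtype='i')          # receiver must know size and dtype in advance
memoryview(buf)[:] = memoryview(arr)    # raw byte copy, no pickling involved
print(np.array_equal(buf, arr))
```

This is also why the uppercase methods need the receive buffer's shape and dtype up front: no type information travels with the raw bytes.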
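3.2 Collective Communication
Collective communication covers distributing and collecting: distributing sends data to all processes in one operation, and collecting gathers the results from all processes in one operation;
Example 1
The root process creates the data dict and then broadcasts it to all processes, so that every process ends up with a copy of this data dict;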
```python
# Broadcasting a Python dict
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {"key1": [7, 2.72, 2 + 3j], "key2": ("abc", "xyz")}
else:
    data = None  # non-root ranks still take part in the call
data = comm.bcast(data, root=0)
print("rank = ", rank, " data = ", data)
```
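Example 2
The root process creates a list and scatters it to all processes. This amounts to partitioning the list: each process receives an equal share, here one number of the list. The partitioning follows the list index: the element at index i is sent to process i, so comm.scatter expects the root's sequence to have exactly size elements. For a matrix, the rows would be divided evenly and each process would handle the same number of rows;
In MPI every process executes all of the code, so every process executes the scatter call. However, only the root process acts as both sender and receiver (the root also receives its own share of the data); all other processes are receivers only;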
```python
# Scattering Python objects
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = [(i + 1) ** 2 for i in range(size)]  # one item per process
else:
    data = None
data = comm.scatter(data, root=0)  # item i goes to rank i
assert data == (rank + 1) ** 2
print("rank = ", rank, " data = ", data)
```
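For the matrix case, the root first has to cut the matrix into `size` row blocks, since `comm.scatter` sends item i of the sequence to rank i. A minimal sketch with plain NumPy (no MPI), assuming a 4-process run; `np.array_split` also copes with a row count that does not divide evenly, in which case block sizes differ by at most one row.

```python
import numpy as np

size = 4                               # assumed number of processes
matrix = np.arange(24).reshape(6, 4)   # 6 rows do not divide evenly by 4

# One row block per rank; block sizes differ by at most one row.
chunks = np.array_split(matrix, size, axis=0)
for rank, block in enumerate(chunks):
    print("rank", rank, "gets", block.shape[0], "rows")

# In an MPI program the root would pass this list on:
#     local_rows = comm.scatter(chunks, root=0)
```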
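Example 3
gather collects the data from all processes and merges it into a list on the root process, ordered by rank; on all other processes gather returns None;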
```python
# Gathering Python objects
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank + 1) ** 2
data = comm.gather(data, root=0)
if rank == 0:
    for i in range(size):
        assert data[i] == (i + 1) ** 2  # entry i came from rank i
    print("data = ", data)
else:
    assert data is None
```
Example 4
Broadcasting a NumPy array (every process must provide a buffer with the same shape and dtype);
```python
# Broadcasting a NumPy array
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(100, dtype='i')
else:
    data = np.empty(100, dtype='i')  # preallocated receive buffer
comm.Bcast(data, root=0)
for i in range(100):
    assert data[i] == i
print("rank = ", rank, " data = ", data)
```
Example 5
Scattering NumPy arrays (row i of the root's send buffer goes to rank i);
```python
# Scattering NumPy arrays
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 100], dtype='i')
    sendbuf.T[:, :] = range(size)  # row i is filled with the value i
recvbuf = np.empty(100, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
assert np.allclose(recvbuf, rank)
print("rank = ", rank, " recvbuf = ", recvbuf)
```
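The line `sendbuf.T[:, :] = range(size)` is compact but not obvious. The sketch below uses plain NumPy (no MPI), with an assumed `size = 3` and 5 columns instead of 100, to show the resulting layout: assigning `range(size)` to the transposed view broadcasts `[0, 1, ..., size-1]` across each of its rows, so row `i` of `sendbuf` ends up filled with `i`. Because `Scatter` splits the root buffer into `size` equal contiguous chunks, and each row of a C-ordered array is contiguous, rank `i` receives row `i`.

```python
import numpy as np

size = 3                                  # assumed number of processes
sendbuf = np.empty([size, 5], dtype='i')  # 5 columns instead of 100 for short output
sendbuf.T[:, :] = range(size)             # each row of the transpose becomes [0, 1, 2]
print(sendbuf)
# Scatter would hand row i (one contiguous block of 5 ints) to rank i.
```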
Example 6
Gathering NumPy arrays;
```python
# Gathering NumPy arrays
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = np.zeros(100, dtype='i') + rank
recvbuf = None
if rank == 0:
    recvbuf = np.empty([size, 100], dtype='i')  # only root needs a receive buffer
comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
    for i in range(size):
        assert np.allclose(recvbuf[i, :], i)  # row i came from rank i
    print("recvbuf = ", recvbuf)
```