Setting up MPI and mpi4py was covered in the previous article; this one walks through some basic usage.
1. Hello world with mpi4py
from mpi4py import MPI
print("hello world")
Run it with five processes:
mpiexec -n 5 python3 x.py
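A slightly more informative variant (my own sketch, not from the original) prints each process's rank and the communicator size, which makes it obvious that five separate processes really ran:

from mpi4py import MPI

comm = MPI.COMM_WORLD            # default communicator containing all processes
rank = comm.Get_rank()           # this process's id within the communicator
size = comm.Get_size()           # total number of processes started by mpiexec
print("hello world from process %d of %d" % (rank, size))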

2. Point-to-point communication
In mpi4py, a point-to-point send copies the outgoing data into a buffer when the message is small, so the call returns without blocking; when the message is large, send blocks. This leads to the following behavior.
When sending a small amount of data:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# point to point communication
data_send = [comm_rank] * 5
comm.send(data_send, dest=(comm_rank+1) % comm_size)
data_recv = comm.recv(source=(comm_rank-1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print(data_recv)

When the amount of data is large, for example sending:
# point to point communication
data_send = [comm_rank] * 1000000
this causes a deadlock among the processes: every process is now blocked inside send, each waiting for another process to receive its data.
The fixed code below serializes the exchange: process 0 sends to process 1, process 1 receives and then sends to process 2, and so on:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 1000000
if comm_rank == 0:
    comm.send(data_send, dest=(comm_rank+1) % comm_size)
if comm_rank > 0:
    data_recv = comm.recv(source=(comm_rank-1) % comm_size)
    comm.send(data_send, dest=(comm_rank+1) % comm_size)
if comm_rank == 0:
    data_recv = comm.recv(source=(comm_rank-1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print(data_recv)
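Another way to sidestep the deadlock (my own sketch, not part of the original article) is comm.sendrecv, which pairs the send and the receive in a single call so the MPI runtime can schedule them safely regardless of message size:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 1000000
# send to the next rank and receive from the previous rank in one combined call
data_recv = comm.sendrecv(data_send,
                          dest=(comm_rank + 1) % comm_size,
                          source=(comm_rank - 1) % comm_size)
print("my rank is %d, and I received data from rank %d" % (comm_rank, (comm_rank - 1) % comm_size))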
3. Collective communication
3.1 Broadcast: bcast
One process sends its data to all processes.
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
dat = comm.bcast(data if comm_rank == 0 else None, root=0)
print('rank %d, got:' % (comm_rank))
print(dat)

The sending process also "receives" the data, but of course its copy does not travel over the network; it already exists in the sender's own memory.
3.2 Scatter: scatter
The root process splits a sequence into pieces and sends one piece to each process.
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
local_data = comm.scatter(data, root=0)
print('rank %d, got:' % comm_rank)
print(local_data)
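For example, when launched with mpiexec -n 4, the root scatters range(4), so rank 0 prints 0, rank 1 prints 1, and so on: each process receives exactly one element of the sequence.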

3.3 Gather: gather
Collect the data from all processes back to the root.
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
combine_data = comm.gather(local_data, root=0)
if comm_rank == 0:
    print("root recv {0}".format(combine_data))

3.4 Reduce: reduce
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)
if comm_rank == 0:
    print('sum is:%d' % all_sum)
For operators such as SUM, MAX and MIN, each process first applies the operation to its own data, and the partial results are then combined once more on the root process to give the final value; a small MAX example follows the list of operators below.
op=MPI.SUM
op=MPI.MAX
op=MPI.MIN
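As an illustration of the other operators (my own sketch, not from the original), replacing MPI.SUM with MPI.MAX makes the root report the largest local value instead of the total:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# each process contributes its doubled rank; the root keeps only the maximum
local_value = comm_rank * 2
all_max = comm.reduce(local_value, root=0, op=MPI.MAX)
if comm_rank == 0:
    print('max is:%d' % all_max)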

3.5 Processing the lines of one file in parallel
#!/usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

# Global variables for MPI
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if comm_rank == 0:
        sys.stderr.write("processor root starts reading data...\n")
        all_lines = sys.stdin.readlines()
    all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root=0)
    num_lines = len(all_lines)
    local_lines_offset = np.linspace(0, num_lines, comm_size + 1).astype('int')
    local_lines = all_lines[local_lines_offset[comm_rank]:local_lines_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data \n" % (comm_rank, comm_size, len(local_lines), num_lines))
    for line in local_lines:
        output = line.strip() + ' : process every line here'
        print(output)
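Assuming the script above is saved as process_lines.py and the input file is input.txt (both names are mine), it could be launched like this; only rank 0 reads stdin and the lines are then broadcast to the other ranks:

mpiexec -n 4 python3 process_lines.py < input.txt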

3.6 Processing multiple files in parallel
#!/usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

# Global variables for MPI
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python *.py directory_with_files\n")
        sys.exit(1)
    path = sys.argv[1]
    if comm_rank == 0:
        file_list = os.listdir(path)
        sys.stderr.write("......%d files......\n" % len(file_list))
    file_list = comm.bcast(file_list if comm_rank == 0 else None, root=0)
    num_files = len(file_list)
    local_files_offset = np.linspace(0, num_files, comm_size + 1).astype('int')
    local_files = file_list[local_files_offset[comm_rank]:local_files_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data \n" % (comm_rank, comm_size, len(local_files), num_files))
    sys.stderr.write("processor %d has %s files \n" % (comm_rank, local_files))
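The script stops at reporting which files each rank was given. A minimal continuation (my own sketch, assuming the directory contains plain-text files; it reuses path, local_files and comm_rank from the script above and stays inside the same main block) could count the lines of each assigned file:

    # continuation of the script above: process every file assigned to this rank
    for file_name in local_files:
        full_path = os.path.join(path, file_name)
        with open(full_path) as f:
            line_count = sum(1 for _ in f)
        sys.stderr.write("processor %d: %s has %d lines\n" % (comm_rank, file_name, line_count))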

3.7 Combining with numpy to process the rows (or columns) of a matrix in parallel
import os, sys, time
import numpy as np
import mpi4py.MPI as MPI

# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

# test MPI
if __name__ == "__main__":
    # create a matrix
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print("************ data start ******************")
        print(all_data)
        print("************ data end ******************")

    # broadcast the data to all processors
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)

    # divide the data to each processor
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')

    # get the local data which will be processed in this processor
    local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
    print("****** %d/%d processor gets local data ****" % (comm_rank, comm_size))
    print(local_data)

    # reduce to get sum of elements
    local_sum = local_data.sum()
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)

    # process in local
    local_result = local_data ** 2

    # gather the result from all processors and broadcast it
    result = comm.allgather(local_result)
    result = np.vstack(result)

    if comm_rank == 0:
        print("*** sum: ", all_sum)
        print("************ result ******************")
        print(result)
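Note that allgather is used here instead of gather so that every rank ends up with the full stacked result; with gather only the root would receive it and a separate bcast would be needed.

For large numpy arrays, mpi4py also offers buffer-based calls with uppercase names (comm.Reduce, comm.Bcast, and so on) that avoid pickling. A minimal sketch (my own, not from the original article) of the sum step using comm.Reduce:

import numpy as np
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

# buffer-based reduce: send and receive buffers are numpy arrays of the same dtype
local_sum = np.array([comm_rank * 1.0])           # stand-in for local_data.sum()
total = np.zeros(1) if comm_rank == 0 else None   # receive buffer only on the root
comm.Reduce(local_sum, total, op=MPI.SUM, root=0)
if comm_rank == 0:
    print("total:", total[0])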

Reference:
《Python多核編程mpi4py實踐》
