[源碼解析] 深度學習分布式訓練框架 horovod (3) --- Horovodrun背后做了什么
0x00 摘要
Horovod 是Uber於2017年發布的一個易於使用的高性能的分布式訓練框架,在業界得到了廣泛應用。
本系列將通過源碼分析來帶領大家了解 Horovod。本文是系列第三篇,從 python 開始進入 Horovod 世界,看看 Horovodrun 做了什么。
前兩篇鏈接如下:
[源碼解析] 深度學習分布式訓練框架 Horovod (1) --- 基礎知識
[源碼解析] 深度學習分布式訓練框架 horovod (2) --- 從使用者角度切入
0x01 背景知識
首先介紹一些相關背景知識。
1.1 分布式體系
在設計並行計算機時,最直接的方式就是多個計算單元共享一個內存。共享內存的編程在數據交換和訪問上有較大的優勢,程序編寫起來更加簡單。但在擴展性上有較大的瓶頸。
另一種方式為 分布式內存。即每個計算單元有單獨的內存,計算單元之間的數據訪問通過互聯網絡去傳輸。這一架構在可移植性和擴展上會強很多,但消息的傳遞會成為程序設計中的難點。
將這兩點結合,即是分布式共享內存並行計算機的架構,也是當今最常用的體系結構。
1.2 並行任務通信
並行任務通信一般分為P2P(Point-to-point communication)和 Collective communication。
- P2P通信這種模式只有一個sender和一個receiver,即點到點通信。
- Collective communication含多個sender多個receive。
Collective communication包含一些常見的原語
- broadcast
- reduce,allreduce
- scatter,scatter reduce
- gather,allgather
- ring-base collectives
- ring-allreduce
傳統Collective communication假設通信節點組成的topology是一顆fat tree,這樣通信效率最高。但實際的通信topology可能比較復雜,並不是一個fat tree。因此一般用ring-based Collective communication。
1.3 MPI
MPI(Message Passing Interface) 是一種可以支持點對點和廣播的通信協議,具體實現的庫有很多,使用比較流行的包括 Open Mpi, Intel MPI 等等。
MPI 是一種消息傳遞編程模型。消息傳遞指用戶必須通過顯式地發送和接收消息來實現處理器間的數據交換。在這種並行編程中,每個控制流均有自己獨立的地址空間,不同的控制流之間不能直接訪問彼此的地址空間,必須通過顯式的消息傳遞來實現。這種編程方式是大規模並行處理機(MPP)和機群(Cluster)采用的主要編程方式。由於消息傳遞程序設計要求用戶很好地分解問題,組織不同控制流間的數據交換,並行計算粒度大,特別適合於大規模可擴展並行算法。
MPI 是基於進程的並行環境。進程擁有獨立的虛擬地址空間和處理器調度,並且執行相互獨立。MPI 設計為支持通過網絡連接的機群系統,且通過消息傳遞來實現通信,消息傳遞是 MPI 的最基本特色。
1.4 Open-MPI
OpenMPI 是一種高性能消息傳遞庫,最初是作為融合的技術和資源從其他幾個項目(FT-MPI, LA-MPI, LAM/MPI, 以及 PACX-MPI),它是 MPI-2 標准的一個開源實現,由一些科研機構和企業一起開發和維護。因此,OpenMPI 能夠從高性能社區中獲得專業技術、工業技術和資源支持,來創建最好的 MPI 庫。OpenMPI 提供給系統和軟件供應商、程序開發者和研究人員很多便利。易於使用,並運行本身在各種各樣的操作系統,網絡互連,以及一批/調度系統。
1.5 MPI 使用問題
因為MPI是分布式內存編程,在后面的開發中涉及節點間信息的傳遞。往往數據和程序是在多個節點上,所以需要保證執行命令時各節點之間信息的交換。
具體使用之中,就有兩個問題:
- 這個多台機器Open-MPI是如何發現並建立連接的呢?
- 多機多卡在訓練過程中,傳輸環如何建立,這個也是決定了訓練效率,那么Open-MPI如何去做呢?
關於第一個問題:
設置SSH免密登錄可以免去操作中密碼的輸入。各節點生成私鑰和公鑰后需要認證,此時可以保證本機免密登錄。將各個子節點的公鑰文件發送給主節點,然后分別加入到主節點的認證文件中,此時可以保證主節點對各個子節點的免密登錄。最后將認證文件傳回到每個子節點,從而保證各個子節點對其他節點之間的免密登錄。
在 Open-MPI 啟動的時候,可以指定--hostfile或者--host去指定運行要運行任務的 IP 或 Hostname,這樣 Open-MPI 就會試圖通過 ssh 免秘鑰的方式試圖去鏈接對方機器,並執行一系列命令,主要是為了同步環境變量、當前路徑以及下發啟動命令。
當然用戶也可以通過其他方式給遠程機器下發命令,這個可以通過環境變量 OMPI_MCA_plm_rsh_agent指定。
關於第二個問題:
當所有的機器建立好連接了,准備開始計算了,為了能夠最高效的去通信,Open-MPI中集成了組件——hwloc。該組件主要是為了單機硬件資源拓撲構建,進而構建最短路徑通信。
0x02 入口點
很多機器學習框架都會采用如下套路:shell腳本(可選),python端 和 C++端。
- Shell腳本是啟動運行的入口,負責解析參數,確認並且調用訓練程序;
- Python是用戶的接口,引入了C++庫,封裝了API,負責運行時和底層C++交互;
- C++實現底層訓練邏輯;
所以我們先看看 hordovodrun 腳本。
2.1 如何運行
官方給出的 Hovorod 運行范例之一如下:
horovodrun -np 2 -H localhost:4 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist.py
這里 -np 指的是進程的數量,localhost:4表示localhost節點上4個GPU。
注意,如果虛擬機只有一個核。想要強行地達到並行的效果,可以使用 -np參數,它會自動幫你把一個核心切成多份處理器,每一個分布式處理就是一個slot。
因此,我們可以從 horovodrun 這個命令入手看看。
2.2 horovodrun
入口文件可以從 setup.py 看到,其就被映射成 horovod.runner.launch:run_commandline。
entry_points={
'console_scripts': [
'horovodrun = horovod.runner.launch:run_commandline'
]
}
所以我們看看 run_commandline
2.3 run_commandline
該命令位於:horovod-master/horovod/runner/launch.py,我們摘錄重要部分。
def run_commandline():
args = parse_args()
_run(args)
於是進入到 _run 函數。可以看到,Horovod 會依據是否是彈性訓練來選擇不同的路徑。我們在此系列中,會首先分析 非彈性訓練 _run_static。
def _run(args):
# if hosts are not specified, either parse from hostfile, or default as
# localhost
if not args.hosts and not args.host_discovery_script:
if args.hostfile:
args.hosts = hosts.parse_host_files(args.hostfile)
else:
# Set hosts to localhost if not specified
args.hosts = 'localhost:{np}'.format(np=args.np)
# Convert nics into set
args.nics = set(args.nics.split(',')) if args.nics else None
if _is_elastic(args):
return _run_elastic(args)
else:
return _run_static(args) # 我們先看這里
2.4 非彈性訓練 _run_static
在 _run_static 之中做了如下操作:
- 首先解析各種參數,得到 settings;
- 會調用
driver_service.get_common_interfaces獲取網卡以及其他host的信息,依據這些信息會進行slot分配,這部分很復雜,具體我們會有專文講解(下一篇)。 - 這里有一個問題:為什么要得到 host, slot, rank 之間的關系信息?由於工程上的考慮,底層 C++ 世界中對於 rank 的角色做了區分:rank 0 是 master,rank n 是 worker,所以這些信息需要決定並且傳遞給 C++世界;
- 會根據是否在參數中傳遞運行函數來決定采取何種路徑,一般默認沒有運行參數,所以會執行_launch_job 來啟動訓練 job;
具體代碼如下:
def _run_static(args):
settings = hvd_settings.Settings(verbose=2 if args.verbose else 0,
ssh_port=args.ssh_port,
ssh_identity_file=args.ssh_identity_file,
extra_mpi_args=args.mpi_args,
tcp_flag=args.tcp_flag,
binding_args=args.binding_args,
key=secret.make_secret_key(),
start_timeout=tmout,
num_proc=args.np,
hosts=args.hosts,
output_filename=args.output_filename,
run_func_mode=args.run_func is not None,
nics=args.nics,...)
# 首先解析各種參數,得到 settings
fn_cache = None
if not args.disable_cache:
params = ''
if args.np:
params += str(args.np) + ' '
if args.hosts:
params += str(args.hosts) + ' '
if args.ssh_port:
params += str(args.ssh_port)
if args.ssh_identity_file:
params += args.ssh_identity_file
parameters_hash = hashlib.md5(params.encode('utf-8')).hexdigest()
fn_cache = cache.Cache(CACHE_FOLDER, CACHE_STALENESS_THRESHOLD_MINUTES,
parameters_hash)
# 獲取網卡以及其他host的信息,依據這些信息會進行slot分配
all_host_names, _ = hosts.parse_hosts_and_slots(args.hosts)
remote_host_names = network.filter_local_addresses(all_host_names)
nics = driver_service.get_common_interfaces(settings, all_host_names,
remote_host_names, fn_cache)
if args.run_func:
# get the driver IPv4 address
driver_ip = network.get_driver_ip(nics)
run_func_server = KVStoreServer(verbose=settings.verbose) # 啟動內部KV服務器
run_func_server_port = run_func_server.start_server()
put_data_into_kvstore(driver_ip, run_func_server_port,
'runfunc', 'func', args.run_func) # 把'func', args.run_func存儲成KV
command = [sys.executable, '-m', 'horovod.runner.run_task', str(driver_ip), str(run_func_server_port)]
try:
_launch_job(args, settings, nics, command)
results = [None] * args.np
for i in range(args.np):
results[i] = read_data_from_kvstore(driver_ip, run_func_server_port,'runfunc_result', str(i))
return results
finally:
run_func_server.shutdown_server()
else:
command = args.command
_launch_job(args, settings, nics, command) # 我們重點講解這里
return None
目前邏輯如下:
+-----------+
|horovodrun |
+-----+-----+
|
|
v
+--------+--------+
| run_commandline |
+----+------+-----+
| |
+---------+ +--------+
| |
| |
v v
+-----+--------+ +----+--------+
| _run_elastic | | _run_static |
| | | |
+--------------+ +-------------+
至此,我們已經分析完成 horovod 的入口,下面會分析具體如何啟動 Job。
0x03 運行訓練 Job
3.1 _launch_job
_launch_job 會根據配置或者安裝情況來進行具體調用。我們看到有三種可能:gloo, mpi, js。
jsrun的資料很難找,所以我們重點看看 gloo, mpi 這兩種。
def _launch_job(args, settings, nics, command):
env = os.environ.copy()
config_parser.set_env_from_args(env, args)
def gloo_run_fn():
driver_ip = network.get_driver_ip(nics)
gloo_run(settings, nics, env, driver_ip, command)
def mpi_run_fn():
mpi_run(settings, nics, env, command)
def js_run_fn():
js_run(settings, nics, env, command)
run_controller(args.use_gloo, gloo_run_fn,
args.use_mpi, mpi_run_fn,
args.use_jsrun, js_run_fn,
args.verbose)
3.2 run_controller
run_controller 依然是一個中介函數,具體導入 gloo 或者 mpi。
def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
if use_gloo:
gloo_run()
elif use_mpi:
mpi_run()
elif use_jsrun:
js_run()
else:
if mpi_built(verbose=verbose):
if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
js_run()
else:
mpi_run()
elif gloo_built(verbose=verbose):
gloo_run()
目前邏輯如下:
+-----------+
|horovodrun |
+-----+-----+
|
|
v
+--------+--------+
| run_commandline |
+----+------+-----+
| |
+---------+ +--------+
| |
| |
v v
+-----+--------+ +----+--------+
| _run_elastic | | _run_static |
| | | |
+--------------+ +------+------+
|
|
v
+------+------+
| _launch_job |
| |
+------+------+
|
|
v
+---------+--------+
| run_controller |
| |
+----+----+-----+--+
| | |
+-------------+ | +--------+
| | |
| | |
v v v
+------+---+ +------+----+ +---+-----+
| gloo_run | | mpi_run | | js_run |
| | | | | |
+----------+ +-----------+ +---------+
於是我們下面就分為兩個分支介紹:gloo & mpi。
0x04 Gloo 實現
4.1 Gloo 簡介
Gloo 是 facebook出品的一個類似MPI的集合通信庫(https://github.com/facebookincubator/gloo)。
集合通信庫的主要特征是:大體上會遵照 MPI 提供的接口規定,實現了包括點對點通信(SEND,RECV等),集合通信( REDUCE,BROADCAST,ALLREDUCE等)等相關接口,然后根據自己硬件或者是系統的需要,在底層實現上進行了相應的改動,保證接口的穩定和性能。
Gloo 為CPU和GPU提供了集合通信程序的優化實現。 它特別適用於GPU,因為它可以執行通信而無需使用GPUDirect 將數據傳輸到CPU的內存。 它還能夠使用 NCCL 執行快速的節點內通信,並實現其自己的節點間例程算。你不需要考慮內存數據的拷貝,只需要實現邏輯就可以。
Gloo 支持集體通信(collective Communication),並對其進行了優化。由於 GPU 之間可以直接進行數據交換,而無需經過 CPU 和內存,因此,在 GPU 上使用 gloo后端速度更快。
Horovod 為什么會選擇 Gloo?個人認為除了其功能的全面性和性能之外,基於它可以二次開發是一個亮點,比如下面我們所說的 Rendezvous 功能就被 Horovod 用來實現彈性訓練(我們后文有專門講解)。
Gloo 和 MPI 都起到了同樣類似作用:
-
一方面Horovod內集成了基於 Gloo 的AllReduce,類似於NCCL,都是用作梯度規約;
-
另一方面,Gloo 可以用來啟動多個進程(Hovorod里用Rank表示),實現並行計算;
具體如下:
+-----------------------+ +-----------------------+ +------------------------+
| gloo_run slot 1 | | gloo_run slot 2 | | gloo_run slot 3 |
| | | | | |
| +-------------------+ | | +------------------+ | | +------------------+ |
| | python train.py | | | | python train.py | | | | python train.py | |
+----+ +<------+ +<------+ +<------+
| | | | | | | | | | | | | |
| | +-------------------+ | | +------------------+ | | +------------------+ | |
| | | | | | | |
| +-----------------------+ +-----------------------+ +------------------------+ |
| |
| |
| |
v-------------------------------------------------------------------------------------->
Ring Allreduce on Gloo
4.2 Rendezvous 功能
4.2.1 Rendezvous 概念
在 Gloo 的文檔中,如此說:
The rendezvous process needs to happen exactly once per Gloo context.
It makes participating Gloo processes exchange details for setting up their communication channels. For example, when the TCP transport is used, processes exchange IP address and port number details of listening sockets.
Rendezvous can be executed by accessing a key/value store that is accessible by all participating processes. Every process is responsible for setting a number of keys and will wait until their peers have set their keys. The values stored against these keys hold
the information that is passed to the transport layer.
大致意思是:
Gloo 在每一個 Gloo context 之中有一個 rendezvous process,Gloo 利用它來交換通訊需要的細節。
Rendezvous 具體實現是可以依靠訪問一個 KVstore 來完成。具體細節就是通過 KVstore 來進行交互。
以 Horovod 為例:
- Horovod 在進行容錯 AllReduce 訓練時,除了啟動 worker 進程外,還會啟動一個 driver 進程。這個 driver 進程用於幫助 worker 調用 gloo 構造 AllReduce 通信環。
- driver 進程中會創建一個帶有 KVStore 的 RendezvousServer,driver 會將參與通信的 worker 的 ip 等信息存入 KVstore 中。
- 然后 worker 就可以調用 gloo 來訪問 RendezvousServer 構造通信環了。
4.2.2 RendezvousServer
具體代碼如下,可以看到是啟動了RendezvousHTTPServer(就是繼承拓展了 HTTPServer):
class RendezvousServer:
def __init__(self, verbose=0):
self._httpd = None
self._listen_thread = None
self._verbose = verbose
# Rendezvous function finds a available port, create http socket,
# and start listening loop to handle request
# self.httpd.init needs to be called after server start
def start(self, handler_cls=RendezvousHandler): # 下面馬上介紹
self._httpd, port = find_port(
lambda addr: RendezvousHTTPServer(
addr, handler_cls, self._verbose))
# start the listening loop
self._listen_thread = in_thread(target=self._httpd.serve_forever)
return port
def init(self, host_alloc_plan):
self._httpd.init(host_alloc_plan)
def stop(self):
self._httpd.shutdown()
self._listen_thread.join()
4.2.3 KVStore
KVStore 是由 KVStoreHandler 來體現,RendezvousHandler 繼承了 KVStoreHandler,進而被 RendezvousServer 作為 handler 使用。
KVStoreHandler 精簡版代碼如下:
class KVStoreHandler(SimpleHTTPRequestHandler):
# Override PUT handler
def do_PUT(self):
paths = self.path.split('/')
_, scope, key = paths
# Get body length
content_length = int(self.headers['Content-Length'])
value = self.rfile.read(content_length)
self._put_value(scope, key, value)
self.send_status_code(OK)
def _put_value(self, scope, key, value):
with self.server.cache_lock:
scope_dict = self.server.cache.setdefault(scope, {})
scope_dict[key] = value
4.2.4 底層使用
Rendezvous 具體如何使用?簡要的說:
- Python世界構建了一個 RendezvousServer,其地址配置在環境變量(或者其他方式)中。
- 在 C++ 世界中,比如 horovod/common/gloo/gloo_context.h,horovod/common/gloo/gloo_context.cc 之中有使用。即得到 Python 配置的 RendezvousServer 的地址端口等,然后構建 gloo 所需的 context。
#define HOROVOD_HOSTNAME "HOROVOD_HOSTNAME"
#define HOROVOD_RANK "HOROVOD_RANK"
#define HOROVOD_SIZE "HOROVOD_SIZE"
#define HOROVOD_LOCAL_RANK "HOROVOD_LOCAL_RANK"
#define HOROVOD_LOCAL_SIZE "HOROVOD_LOCAL_SIZE"
#define HOROVOD_CROSS_RANK "HOROVOD_CROSS_RANK"
#define HOROVOD_CROSS_SIZE "HOROVOD_CROSS_SIZE"
#define HOROVOD_ELASTIC "HOROVOD_ELASTIC"
ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
rendezvous_addr_env, rendezvous_port,
rank, size, dev, timeout);
local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
rendezvous_addr_env, rendezvous_port,
local_rank, local_size, dev, timeout);
cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
rendezvous_addr_env, rendezvous_port,
cross_rank, cross_size, dev, timeout);
邏輯如下,C++世界會從python世界的獲取到RendezvousServer的 IP,port:
+---------------------> System Env +------------------+
| addr, port, ... addr, port, ... |
| + |
| | |
| | |
| | |
| | |
| | |
| Python | C++ |
| | |
| | |
| | |
| | v
+---------+---------------+ | +------------+--------+
| RendezvousServer | | |GlooContext |
| | | | |
| | | | |
| | | | |
| RendezvousHandler | | | Rendezvous |
| | | | |
+-------------------------+ | +---------------------+
|
+
4.3 Horovd 的 gloo 入口
gloo_run 是 horovod 之中,gloo 模塊的 相關入口。
注釋說的很清楚:每一個 thread 將使用 ssh 命令在遠程host之上啟動訓練job。
def gloo_run(settings, nics, env, server_ip, command):
# Each thread will use ssh command to launch the job on each remote host. If an
# error occurs in one thread, entire process will be terminated. Otherwise,
# threads will keep running and ssh session.
exec_command = _exec_command_fn(settings)
launch_gloo(command, exec_command, settings, nics, env, server_ip)
就是用 launch_gloo 來運行 exec_command。
此時 command 參數類似 "['python', 'train.py']"。
4.4 構建可執行環境
gloo_run 的第一部分是 exec_command = _exec_command_fn(settings),就是基於各種配置來生成可以執行命令環境。如果是遠程,就得生成相關遠程可運行命令環境(包括切換目錄,遠程執行等等)。
4.4.1 _exec_command_fn
具體又可以分為兩部分:
- 利用 get_remote_command 來生成相關遠程可運行環境,比如在訓練腳本前面加上
'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'; - 調整輸入輸出,利用 safe_shell_exec.execute 來實現安全執行能力;
具體如下:
def _exec_command_fn(settings):
"""
executes the jobs defined by run command on hosts.
:param hosts_alloc: list of dict indicating the allocating info.
For example,
[{'Hostname':'worker-0', 'Rank': 0, 'Local_rank': 0, 'Cross_rank':0,
'Size':2, 'Local_size':1, 'Cross_size':2},
{'Hostname':'worker-1', 'Rank': 1, 'Local_rank': 0, 'Cross_rank':1,
'Size':2, 'Local_size':1, 'Cross_size':2}
]
:type hosts_alloc: list(dict)
:param remote_host_names: names that are resolved to one of the addresses
of remote hosts interfaces.
:param _run_command: command to execute
"""
def _exec_command(command, slot_info, events):
index = slot_info.rank
host_name = slot_info.hostname
host_address = network.resolve_host_address(host_name)
local_addresses = network.get_local_host_addresses()
# 需要構建遠程命令
if host_address not in local_addresses:
local_command = quote('cd {pwd} > /dev/null 2>&1 ; {command}'
.format(pwd=os.getcwd(), command=command))
command = get_remote_command(local_command,
host=host_name,
port=settings.ssh_port,
identity_file=settings.ssh_identity_file)
# Redirect output if requested
# 調整輸入輸出,利用 safe_shell_exec.execute 來實現安全執行能力
stdout = stderr = None
stdout_file = stderr_file = None
if settings.output_filename:
padded_rank = _pad_rank(index, settings.num_proc)
output_dir_rank = os.path.join(settings.output_filename, 'rank.{rank}'.format(rank=padded_rank))
if not os.path.exists(output_dir_rank):
os.mkdir(output_dir_rank)
stdout_file = open(os.path.join(output_dir_rank, 'stdout'), 'w')
stderr_file = open(os.path.join(output_dir_rank, 'stderr'), 'w')
stdout = MultiFile([sys.stdout, stdout_file])
stderr = MultiFile([sys.stderr, stderr_file])
# 實現安全執行能力
exit_code = safe_shell_exec.execute(command,
index=index,
stdout=stdout,
stderr=stderr,
events=events,...)
return exit_code, time.time()
return _exec_command
4.4.2 get_remote_command
本函數是針對遠程 host,獲取如何在其上運行的方式。這個函數是比較新加入的,具體和 kubeflow mpi operator 也相關,以后有機會再分析。
SSH_COMMAND_PREFIX = 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'
def get_ssh_command(local_command, host, port=None, identity_file=None, timeout_s=None):
port_arg = f'-p {port}' if port is not None else ''
identity_file_arg = f'-i {identity_file}' if identity_file is not None else ''
timeout_arg = f'-o ConnectTimeout={timeout_s}' if timeout_s is not None else ''
return f'{SSH_COMMAND_PREFIX} {host} {port_arg} {identity_file_arg} {timeout_arg} {local_command}'
def get_remote_command(local_command, host, port=None, identity_file=None, timeout_s=None):
return f'{env_util.KUBEFLOW_MPI_EXEC} {host} {local_command}' if env_util.is_kubeflow_mpi() \
else get_ssh_command(local_command, host, port, identity_file, timeout_s)
大致邏輯如下:
command : python train.py
+
|
|
v
+---------+-------------+
| |
| get_remote_command |
| |
+---------+-------------+
|
|
v
ssh -o ... python train.py
+
|
|
|
v
+---------+--------------+
|safe_shell_exec.execute |
| |
+------------------------+
4.5 使用 gloo 執行命令
獲取到了可執行環境 exec_command 與 執行命令 command 之后,就可以使用 gloo 來執行命令了。
每個 command 都是被 exec_command 來執行。
launch_gloo 來獲取命令,各種配置信息,網卡信息(nics,比如 {'lo'}),host信息等,然后開始運行,就是開始運行我們的訓練代碼了,具體是:
- 建立 RendezvousServer,這個會被底層 Gloo C++ 環境使用到;
- host_alloc_plan = get_host_assignments 來根據host進行分配slot,就是horovod的哪個rank應該在哪個host上的哪個slot之上運行;
- get_run_command 獲取到可執行命令;
- slot_info_to_command_fn 來得到在slot之上可執行的 slot command;
- 依據 slot_info_to_command_fn 構建 args_list,這個 list 之中,每一個arg就是一個 slot command;
- 多線程執行,在每一個 exec_command 之上執行每一個 arg(slot command);
代碼如下:
def launch_gloo(command, exec_command, settings, nics, env, server_ip):
"""
Launches the given command multiple times using gloo.
Each command is launched via exec_command.
:param command: command to launch
:param exec_command: means to execute a single command
:param settings: settings for the distribution
:param nics: common interfaces
:param env: environment to use
:param server_ip: ip to use for rendezvous server
"""
# Make the output directory if it does not exist
if settings.output_filename:
_mkdir_p(settings.output_filename)
# start global rendezvous server and get port that it is listening on
# 建立 RendezvousServer,這個會被底層 Gloo C++ 環境使用到
rendezvous = RendezvousServer(settings.verbose)
# allocate processes into slots
# 來根據host進行分配slot,就是horovod的哪個rank應該在哪個host上的哪個slot之上運行
hosts = parse_hosts(settings.hosts)
host_alloc_plan = get_host_assignments(hosts, settings.num_proc)
# start global rendezvous server and get port that it is listening on
global_rendezv_port = rendezvous.start()
rendezvous.init(host_alloc_plan)
# 獲取到可執行命令
run_command = get_run_command(command, server_ip, nics, global_rendezv_port)
# 得到在slot之上可執行的 slot command
slot_info_to_command = _slot_info_to_command_fn(run_command, env)
event = register_shutdown_event()
# 依據 slot_info_to_command_fn 構建 args_list,這個 list 之中,每一個arg就是一個 slot command
args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
for slot_info in host_alloc_plan]
# If an error occurs in one thread, entire process will be terminated.
# Otherwise, threads will keep running.
# 多線程執行,在每一個 exec_command 之上執行每一個 arg(slot command)
res = threads.execute_function_multithreaded(exec_command,
args_list,
block_until_all_done=True)
for name, value in sorted(res.items(), key=lambda item: item[1][1]):
exit_code, timestamp = value
4.5.1 slot分配方案
上面提到了 Horovod 在 slot 之上執行任務,我們需要看看 slot 是如何分配的。
4.5.1.1 從輸入參數解析
由下面代碼可知,slot 是通過 parse_hosts 自動解析出來。
def parse_hosts(hosts_string):
"""Parse a string of comma-separated hostname:slots mappings into a list of HostItem objects.
:param hosts_string: list of addresses and number of processes on each host.
For example:
- 'worker-0:2,worker-1:2'
- '10.11.11.11:4,10.11.11.12:4'
:return: a list of HostInfo objects describing host to slot mappings
:rtype: list[HostInfo]
"""
return [HostInfo.from_string(host_string) for host_string in hosts_string.split(',')]
具體 HostInfo.from_string 信息如下:
class HostInfo:
def __init__(self, hostname, slots):
self.hostname = hostname
self.slots = slots
@staticmethod
def from_string(host_string):
hostname, slots = host_string.strip().split(':')
return HostInfo(hostname, int(slots))
4.5.1.2 分配方案
get_host_assignments 會依據 host 和 process capacities (slots) 來給 Horovod 之中的進程分配,即給出一個 horovod rank 和 slot 的對應關系。設置了幾個 np,就有幾個 slot。
給出的分配方案類似如下,這樣就知道了哪個rank對應於哪個host上的哪個slot:
[
SlotInfo(hostname='h1', rank=0, local_rank=0, cross_rank=0, size=2, local_size=2, coress_size=1),
SlotInfo(hostname='h2', rank=1, local_rank=0, cross_rank=0, size=2, local_size=2, coress_size=1),
]
代碼如下:
def get_host_assignments(hosts, min_np, max_np=None):
"""Assign hosts with process capacities (slots) to ranks in the Horovod process.
This function will try to allocate as many as possible processes on the same host to leverage local network.
:param hosts: list of HostInfo objects describing host and slot capacity
:type hosts: list[HostInfo]
:param min_np: minimum number of processes to be allocated
:param max_np: (optional) maximum number of processes to be allocated
:return: a list of the allocation of process on hosts in a `SlotInfo` object.
:rtype: list[SlotInfo]
"""
host_ranks = []
cross_ranks = collections.defaultdict(dict)
rank = 0
# 依據 hosts 信息構建 rank, local rank, cross rank(hierarchical allreduce所需要)
for host_info in hosts:
ranks = []
for local_rank in range(host_info.slots):
if rank == max_np:
break
ranks.append(rank)
rank += 1
cross_ranks_at_local = cross_ranks[local_rank]
cross_ranks_at_local[host_info.hostname] = len(cross_ranks_at_local)
host_ranks.append((host_info, ranks))
world_size = rank
# 給出一個 horovod rank 和 slot 的對應關系。返回一個alloc_list,每個SlotInfo包括各種rank信息
alloc_list = []
for host_info, ranks in host_ranks:
local_size = len(ranks)
for local_rank, rank in enumerate(ranks):
cross_ranks_at_local = cross_ranks[local_rank]
cross_rank = cross_ranks_at_local[host_info.hostname]
cross_size = len(cross_ranks_at_local)
alloc_list.append(
SlotInfo(
hostname=host_info.hostname,
rank=rank,
local_rank=local_rank,
cross_rank=cross_rank,
size=world_size,
local_size=local_size,
cross_size=cross_size))
return alloc_list
4.5.2 得到運行命令
get_run_command 是從環境變量中得到 Gloo 的變量,然后加到 command 之上。此步完成之后,得到類似如下命令:
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
可以把這個格式縮寫為:{horovod_gloo_env} command。
代碼為:
def create_run_env_vars(server_ip, nics, port, elastic=False):
# 從環境變量中得到 Gloo 的變量
run_envs = {
'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
'HOROVOD_CONTROLLER': "gloo",
'HOROVOD_CPU_OPERATIONS': "gloo",
'HOROVOD_GLOO_IFACE': list(nics)[0], # TODO: add multiple ifaces in future
'NCCL_SOCKET_IFNAME': ','.join(nics),
}
if elastic:
run_envs["HOROVOD_ELASTIC"] = "1"
return run_envs
def get_run_command(command, server_ip, nics, port, elastic=False):
env_vars = create_run_env_vars(server_ip, nics, port, elastic)
env_string = " ".join(
[f"{k}={str(v)}" for k, v in env_vars.items()])
run_command = (
'{env_string} '
'{command}' # expect a lot of environment variables
.format(env_string=env_string,
command=' '.join(quote(par) for par in command)))
return run_command
4.5.3 得到slot運行命令
得到運行命令之后,這里會結合 horovod env 和 env,以及slot 分配情況 進一步修改為適合 gloo 運行的方式。就是可以在具體每一個slot上運行的命令。
可以把這個格式縮寫為:{horovod_gloo_env} {horovod_rendez_env} {env} run_command。
此步完成之后,得到類似如下:
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
具體代碼如下:
def _slot_info_to_command_fn(run_command, env):
# TODO: Workaround for over-buffered outputs. Investigate how mpirun avoids this problem.
env = copy.copy(env) # copy env so we do not leak env modifications
env['PYTHONUNBUFFERED'] = '1'
def slot_info_to_command(slot_info):
"""
Given a slot_info, creates a command used by gloo to launch a single job.
:param slot_info: host and slot to execute the run command on
:return:
"""
env_vars = create_slot_env_vars(slot_info)
horovod_rendez_env = " ".join(
[f"{k}={str(v)}" for k, v in env_vars.items()])
return '{horovod_env} {env} {run_command}' .format(
horovod_env=horovod_rendez_env,
env=' '.join(['%s=%s' % (key, quote(value)) for key, value in env.items()
if env_util.is_exportable(key)]),
run_command=run_command)
return slot_info_to_command
4.5.4 多線程調用命令
這就是啟動了多線程進行調用。gloo_run 的注釋說的很清楚:在調用 execute_function_multithreaded 時,每一個thread將使用 ssh 命令在遠程host之上啟動訓練job。
回憶下之前我們在“構建可執行環境” 中提到:利用 get_remote_command 來生成相關遠程可運行環境,比如在訓練腳本前面加上 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'。大家就理解了如何在遠端執行。
在本地運行,則命令大致為:
cd /code directory > /dev/null 2 >&1
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
在遠端運行,命令就需要加上 ssh 信息,大致為:
ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no 1.1.1.1
cd /code directory > /dev/null 2 >&1
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
execute_function_multithreaded 具體代碼如下,其中:
fn就是前面提到的程序運行環境(能力)exec_command。fn(*arg[:-1])就是在exec_command之中運行slot_info_to_command。
def execute_function_multithreaded(fn,
args_list,
block_until_all_done=True,
max_concurrent_executions=1000):
"""
Executes fn in multiple threads each with one set of the args in the
args_list.
:param fn: function to be executed
:type fn:
:param args_list:
:type args_list: list(list)
:param block_until_all_done: if is True, function will block until all the
threads are done and will return the results of each thread's execution.
:type block_until_all_done: bool
:param max_concurrent_executions:
:type max_concurrent_executions: int
:return:
If block_until_all_done is False, returns None. If block_until_all_done is
True, function returns the dict of results.
{
index: execution result of fn with args_list[index]
}
:rtype: dict
"""
result_queue = queue.Queue()
worker_queue = queue.Queue()
for i, arg in enumerate(args_list):
arg.append(i)
worker_queue.put(arg)
def fn_execute():
while True:
try:
arg = worker_queue.get(block=False)
except queue.Empty:
return
exec_index = arg[-1]
# fn 就是前面提到的程序運行環境(能力)exec_command
# fn(*arg[:-1])是在 exec_command 之中運行 slot_info_to_command
res = fn(*arg[:-1])
result_queue.put((exec_index, res))
threads = []
number_of_threads = min(max_concurrent_executions, len(args_list))
# 在多線程中執行 fn_execute
for _ in range(number_of_threads):
thread = in_thread(target=fn_execute, daemon=not block_until_all_done)
threads.append(thread)
# Returns the results only if block_until_all_done is set.
# 如果有設置,則 block 等待
results = None
if block_until_all_done:
# Because join() cannot be interrupted by signal, a single join()
# needs to be separated into join()s with timeout in a while loop.
have_alive_child = True
while have_alive_child:
have_alive_child = False
for t in threads:
t.join(0.1)
if t.is_alive():
have_alive_child = True
results = {}
while not result_queue.empty():
item = result_queue.get()
results[item[0]] = item[1]
return results
python train.py 就會進入到我們的訓練代碼。
大致邏輯如下圖,可以看到,結合了各種信息之后,構建了一個可以執行的結果,然后多host執行:
- 圖左面,是從 參數中獲取 host 等信息,然后解析出 slot 信息;
- 圖右邊,是從 python train.py 這個待運行的命令,基於各種配置來生成可以執行命令環境。如果是遠程,就得生成 相關遠程可運行命令環境(包括切換目錄,遠程執行等等);
- 圖中間,是從 python train.py 這個待運行的命令,經過添加 env 信息,gloo 信息。然后結合 左面的 slot 信息 和 右面 的可以執行命令環境 之后,得到了可以在多線程上運行,從而在 多slot 運行的命令。
args : '10.11.11.11:4,10.11.11.12:4' python train.py command : python train.py
+ + +
| | |
| | |
v v v
+----------+--------+ +----------+----------+ +---------+-------------+
| parse_hosts | | get_run_command | | |
+----------+--------+ | | | get_remote_command |
| +----------+----------+ | |
| | +---------+-------------+
v | |
+------------+-----------+ v |
| get_host_assignments | v
| | gloo python train.py
+------------+-----------+ + ssh -o ... python train.py
| | +
| | |
v | |
| |
SlotInfo(hostname='h2', rank=1) v v
+ +-----------+---------------+ +---------+--------------+
| | _slot_info_to_command_fn | |safe_shell_exec.execute |
+-----------------------> | | | |
+-----------+---------------+ +---------+--------------+
| |
| |
v |
|
HOROVOD_CONTROLLER=gloo python train.py |
+ |
| |
| |
v |
+-------------+-------------------+ |
| | |
| execute_function_multithreaded | <---------------+
| |
+---------------------------------+
手機如下:

4.6 C++舉例
我們給出一個底層代碼,大家就進一步了解 Gloo 可以起到什么作用。
這個就是 Horovod 之中,rank 0 最終給其他 rank 發送構建好的 Tensor。
void GlooController::SendFinalTensors(ResponseList& response_list) {
// Notify all nodes which tensors we'd like to reduce at this step.
std::string encoded_response;
ResponseList::SerializeToString(response_list, encoded_response);
// Boardcast the response length
int encoded_response_length = (int)encoded_response.length() + 1;
{
gloo::BroadcastOptions opts(gloo_context_.ctx);
opts.setOutput(&encoded_response_length, 1);
opts.setRoot(RANK_ZERO);
gloo::broadcast(opts); // 廣播給其他rank
}
// Boardcast the response
{
gloo::BroadcastOptions opts(gloo_context_.ctx);
opts.setOutput((uint8_t*)(encoded_response.c_str()),
encoded_response_length);
opts.setRoot(RANK_ZERO);
gloo::broadcast(opts); // 廣播給其他rank
}
}
0x05 Mpi 實現
5.1 openmpi 庫
horovod 這里主要依賴 openmpi。
- MPI:英文全稱是Message Passing Interface,MPI是一個跨語言的通訊協議,用於編寫並行計算機。支持點對點和廣播。MPI是一個信息傳遞應用程序接口,包括協議和和語義說明,他們指明其如何在各種實現中發揮其特性。MPI的目標是高性能,大規模性,和可移植性。
- openMPI:英文全稱是open Message Passing Interface。openMPI是MPI的一種實現,一種庫項目。
MPI在Hovorod的角色比較特殊:
-
一方面Horovod內集成了基於MPI的AllReduce,類似於NCCL,都是用作梯度規約;
-
另一方面,MPI可以用來在所有機器上啟動多個進程(Hovorod里用Rank表示),實現並行計算;
5.2 mpi_run 函數
此部分代碼位於:horovod/runner/mpi_run.py。
首先摘錄其關鍵代碼如下,可以看出來其核心是運行 mpirun 命令。
# 我是下面大段代碼中的關鍵代碼!
mpirun_command = (
'mpirun {basic_args} '
'-np {num_proc}{ppn_arg}{hosts_arg} '
'{binding_args} '
'{mpi_args} '
'{mpi_ssh_args} '
'{tcp_intf_arg} '
'{nccl_socket_intf_arg} '
'{output_filename_arg} '
'{env} {extra_mpi_args} {command}'
.format(basic_args=basic_args,
num_proc=settings.num_proc,
ppn_arg=ppn_arg,
hosts_arg=hosts_arg,
binding_args=binding_args,
mpi_args=' '.join(mpi_impl_flags),
tcp_intf_arg=tcp_intf_arg,
nccl_socket_intf_arg=nccl_socket_intf_arg,
mpi_ssh_args=mpi_ssh_args,
output_filename_arg=' '.join(output),
env=env_list,
extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else '',
command=' '.join(quote(par) for par in command))
)
# Execute the mpirun command.
if settings.run_func_mode:
exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
else:
os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
就是依據各種配置以及參數來構建 mpirun 命令的所有參數,比如 ssh 的參數,mpi 參數,nccl 參數等等。
最后得到的 mpirun 命令舉例如下:
mpirun --allow-run-as-root --np 2 -bind-to none -map-by slot \
-x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
-mca pml ob1 -mca btl ^openib \
python train.py
具體代碼如下,具體是:
# 上面代碼是我之中的片段
def mpi_run(settings, nics, env, command, stdout=None, stderr=None):
"""
Runs mpi_run.
Args:
settings: Settings for running MPI.
Note: settings.num_proc and settings.hosts must not be None.
nics: Interfaces to include by MPI.
env: Environment dictionary to use for running command.
command: Command and arguments to run as a list of string.
stdout: Stdout of the mpi process.
Only used when settings.run_func_mode is True.
stderr: Stderr of the mpi process.
Only used when settings.run_func_mode is True.
"""
# 得到各種配置
mpi_impl_flags, impl_binding_args, mpi = _get_mpi_implementation_flags(settings.tcp_flag, env=env)
impi = _IMPI_IMPL == mpi
# 處理ssh參數
ssh_args = []
if settings.ssh_port:
ssh_args += [f'-p {settings.ssh_port}']
if settings.ssh_identity_file:
ssh_args += [f'-i {settings.ssh_identity_file}']
mpi_ssh_args = ''
if ssh_args:
joined_ssh_args = ' '.join(ssh_args)
mpi_ssh_args = f'-bootstrap=ssh -bootstrap-exec-args \"{joined_ssh_args}\"' if impi else f'-mca plm_rsh_args \"{joined_ssh_args}\"'
# 處理網絡配置,網卡信息等
tcp_intf_arg = '-mca btl_tcp_if_include {nics}'.format(
nics=','.join(nics)) if nics and not impi else ''
nccl_socket_intf_arg = '-{opt} NCCL_SOCKET_IFNAME={nics}'.format(
opt='genv' if impi else 'x',
nics=','.join(nics)) if nics else ''
# 處理host信息
# On large cluster runs (e.g. Summit), we need extra settings to work around OpenMPI issues
host_names, host_to_slots = hosts.parse_hosts_and_slots(settings.hosts)
if not impi and host_names and len(host_names) >= _LARGE_CLUSTER_THRESHOLD:
mpi_impl_flags.append('-mca plm_rsh_no_tree_spawn true')
mpi_impl_flags.append('-mca plm_rsh_num_concurrent {}'.format(len(host_names)))
# if user does not specify any hosts, mpirun by default uses local host.
# There is no need to specify localhost.
hosts_arg = '-{opt} {hosts}'.format(opt='hosts' if impi else 'H',
hosts=','.join(host_names) if host_names and impi else settings.hosts)
# 處理ppn配置
ppn_arg = ' '
if host_to_slots and impi:
ppn = host_to_slots[host_names[0]]
for h_name in host_names[1:]:
ppn_arg = ' -ppn {} '.format(ppn)
# 處理超時配置
if settings.prefix_output_with_timestamp and not impi:
mpi_impl_flags.append('--timestamp-output')
binding_args = settings.binding_args if settings.binding_args and not impi else ' '.join(impl_binding_args)
# 配置需要root身份運行
basic_args = '-l' if impi else '--allow-run-as-root --tag-output'
output = []
if settings.output_filename:
output.append('-outfile-pattern' if impi else '--output-filename')
output.append(settings.output_filename)
# 構建環境信息列表
env_list = '' if impi else ' '.join(
'-x %s' % key for key in sorted(env.keys()) if env_util.is_exportable(key))
# 構建最終的 MPI 命令
# Pass all the env variables to the mpirun command.
mpirun_command = (
'mpirun {basic_args} '
'-np {num_proc}{ppn_arg}{hosts_arg} '
'{binding_args} '
'{mpi_args} '
'{mpi_ssh_args} '
'{tcp_intf_arg} '
'{nccl_socket_intf_arg} '
'{output_filename_arg} '
'{env} {extra_mpi_args} {command}' # expect a lot of environment variables
.format(basic_args=basic_args,
num_proc=settings.num_proc,
ppn_arg=ppn_arg,
hosts_arg=hosts_arg,
binding_args=binding_args,
mpi_args=' '.join(mpi_impl_flags),
tcp_intf_arg=tcp_intf_arg,
nccl_socket_intf_arg=nccl_socket_intf_arg,
mpi_ssh_args=mpi_ssh_args,
output_filename_arg=' '.join(output),
env=env_list,
extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else '',
command=' '.join(quote(par) for par in command))
)
# we need the driver's PATH and PYTHONPATH in env to run mpirun,
# env for mpirun is different to env encoded in mpirun_command
for var in ['PATH', 'PYTHONPATH']:
if var not in env and var in os.environ:
# copy env so we do not leak env modifications
env = copy.copy(env)
# copy var over from os.environ
env[var] = os.environ[var]
# Execute the mpirun command.
if settings.run_func_mode:
exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
else:
os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
5.3 mpirun命令
因為 mpi_run 使用的是 mpirun 命令來運行,所以我們介紹一下。
mpirun是MPI程序的啟動腳本,它簡化了並行進程的啟動過程,盡可能屏蔽了底層的實現細節,從而為用戶提供了一個通用的MPI並行機制。
在用mpirun命令執行並行程序時,參數-np指明了需要並行運行的進程個數。mpirun首先在本地結點上啟動一個進程,然后根據/usr/local/share/machines.LINUX文件中所列出的主機,為每個主機啟動一個進程。若進程數比可用的並行節點數多,則多余的進程將重新按照上述規則進行。按這個機制分配好進程后,一般會給每個節點分一個固定的標號,類似於身份證了,后續在消息傳遞中會用到。
這里需要說明的是,實際運行的
orterun(Open MPI SPMD / MPMD啟動器; mpirun / mpiexec只是它的符號鏈接)
命令舉例如下:
mpirun -np 4 \
-bind-to none -map-by slot \
-x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
-mca pml ob1 -mca btl ^openib \
python train.py
0x06 總結
對比 gloo 和 mpi 的實現,我們還是能看出來區別。
6.1 gloo
gloo 只是一個庫,需要 horovod 來完成命令分發功能。
gloo 需要 horovod 自己實現本地運行和遠端運行方式,即 get_remote_command 函數 實現 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'。
gloo 需要實現 RendezvousServer,底層會利用 RendezvousServer 進行通訊。
6.2 mpi
mpi 則功能強大很多,只要把命令配置成被 mpirun 包裝,openmpi 就可以自行完成命令分發執行。說到底,horovod 是一個 mpirun 程序,即使運行了 tensor flow,也是一個mpi程序,可以互相交互。
0xEE 個人信息
★★★★★★關於生活和技術的思考★★★★★★
微信公眾賬號:羅西的思考
如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。

