parallel python多進程集群模式


parallel python作為輕量級的python分布式框架,為用python做簡單的分布式計算提供了很大的方便,而且使用也簡單。
主要分為單機模式和集群模式:

單機模式

單機模式就是本機上進行多進程,這與multiprocess的多進程類似,甚至表現不是那么好(主要是體現在速度上面,比multiprocess還是要慢一些),其代碼如下:

import pp
import time

def time_delay(n):
	time.sleep(n)

def main_func(n):
	t_time = time.time()  # 使用了time模塊,需要在submit里面添加次模塊。如果不添加,也可以直接在函數里面直接導入模塊。
	time_delay(n)  # 調用其他函數,需要在submit里面添加此函數
	a = 0
	for k in range(n+1):
		a += k
	print(f'job: {n} takes {time.time() - t_time}s')
	return a


if __name__ == "__main__":
	ppservers = ()  # 遠程服務端ip和端口號,為空就是本機
	job_server = pp.Server(ncpus=5, ppservers=ppservers)  # ncpus:本機上運行進程數量
	# 生成[(n1, job1), (n2, job2),...] jobs列表
	jobs = [job_server.submit(main_func,  # 主函數
	                           (n,),  # 函數的參數
	                           depfuncs=(time_delay,),  # 函數內會調用到的其他函數,需要傳入函數
	                           modules=("time", )  # 需要調用的模塊,通過字符串類型傳入。注意:從此處引入模塊到 主函數中,外部導入的模塊不能使用import...as,也不能用from ... import,只能用import。如果是在函數內部導入模塊無所謂。
	                           )
	        for n in range(20)]

	# 遍歷jobs,執行函數
	result_list = []  # 接收返回結果
	for job in jobs:
		rt = job()  # 執行函數,獲取返回值。如果函數無返回值,無需用變量接收,直接執行函數即可。
		result_list.append(rt)

	job_server.destroy()  # 銷毀進程
	print(result_list)

集群模式

在集群內多進程,與單機的區別是,單機只能在本機上進行多進程。而集群內多進程可以在集群內分配進程,可以有效避免某些機器超負荷運作,而有些機器在閑置的情況。

  • 與單機模式的區別是:集群模式需要先啟動其他服務器作為客戶端(想要調用哪些服務器,就要在里面啟動),
  • 啟動方法:
    - 找到ppserver.py文件。安裝包的python2或python3下面有
    - 通過此命令,啟動文件:python ppserver.py - p 3505 - w 10 - i 192.168.0.231 - s "123456"
    - 上面命令中:-p是自定義端口號,-w是服務端進程數量(可以與本機不同,不同節點上的都可以不同), -i是啟動ppserver.py服務器的ip,-s是自定義密碼
    - 查詢pp進程:ps -ef | grep python | grep pp | grep user_name # user_name是登錄服務器的賬號名
  • 最后在本機中,運行以下代碼,即可在本機和其他機器上同時進行多進程任務


import pp
import time

def time_delay(n):
	time.sleep(n)

def main_func(n):
	t_time = time.time()  # 使用了time模塊,需要在submit里面添加次模塊。如果不添加,也可以直接在函數里面直接導入模塊。
	time_delay(n)  # 調用其他函數,需要在submit里面添加此函數
	a = 0
	for k in range(n+1):
		a += k
	print(f'job: {n} takes {time.time() - t_time}s')
	return a


if __name__ == "__main__":
	ppservers = ('192.168.0.231:3505',)  # 注意:這里需要填寫遠端服務器的ip和端口號
	job_server = pp.Server(ncpus=5, ppservers=ppservers, secret='123456')  # ncpus:本機進程數量,需要輸入遠端服務器啟動ppserver時候的密碼
	# 生成[(n1, job1), (n2, job2),...] jobs列表
	jobs = [job_server.submit(main_func,  # 主函數
	                           (n,),  # 函數的參數
	                           depfuncs=(time_delay,),  # 函數內會調用到的其他函數,需要傳入函數
	                           modules=("time", )  # 需要調用的模塊,通過字符串類型傳入。注意:從此處引入模塊到 主函數中,外部導入的模塊不能使用import...as,也不能用from ... import,只能用import。如果是在函數內部導入模塊無所謂。
	                           )
	        for n in range(20)]

	# 遍歷jobs,執行函數
	result_list = []  # 接收返回結果
	for job in jobs:
		rt = job()  # 執行函數,獲取返回值。如果函數無返回值,無需用變量接收,直接執行函數即可。
		result_list.append(rt)

	job_server.destroy()  # 銷毀進程
	print(result_list)


#### 結果輸出如下圖所示:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM