parallel python作為輕量級的python分布式框架,為用python做簡單的分布式計算提供了很大的方便,而且使用也簡單。
主要分為單機模式和集群模式:
單機模式
單機模式就是本機上進行多進程,這與multiprocess的多進程類似,甚至表現不是那么好(主要是體現在速度上面,比multiprocess還是要慢一些),其代碼如下:
import pp
import time
def time_delay(n):
time.sleep(n)
def main_func(n):
t_time = time.time() # 使用了time模塊,需要在submit里面添加次模塊。如果不添加,也可以直接在函數里面直接導入模塊。
time_delay(n) # 調用其他函數,需要在submit里面添加此函數
a = 0
for k in range(n+1):
a += k
print(f'job: {n} takes {time.time() - t_time}s')
return a
if __name__ == "__main__":
ppservers = () # 遠程服務端ip和端口號,為空就是本機
job_server = pp.Server(ncpus=5, ppservers=ppservers) # ncpus:本機上運行進程數量
# 生成[(n1, job1), (n2, job2),...] jobs列表
jobs = [job_server.submit(main_func, # 主函數
(n,), # 函數的參數
depfuncs=(time_delay,), # 函數內會調用到的其他函數,需要傳入函數
modules=("time", ) # 需要調用的模塊,通過字符串類型傳入。注意:從此處引入模塊到 主函數中,外部導入的模塊不能使用import...as,也不能用from ... import,只能用import。如果是在函數內部導入模塊無所謂。
)
for n in range(20)]
# 遍歷jobs,執行函數
result_list = [] # 接收返回結果
for job in jobs:
rt = job() # 執行函數,獲取返回值。如果函數無返回值,無需用變量接收,直接執行函數即可。
result_list.append(rt)
job_server.destroy() # 銷毀進程
print(result_list)
集群模式
在集群內多進程,與單機的區別是,單機只能在本機上進行多進程。而集群內多進程可以在集群內分配進程,可以有效避免某些機器超負荷運作,而有些機器在閑置的情況。
- 與單機模式的區別是:集群模式需要先啟動其他服務器作為客戶端(想要調用哪些服務器,就要在里面啟動),
- 啟動方法:
- 找到ppserver.py文件。安裝包的python2或python3下面有
- 通過此命令,啟動文件:python ppserver.py - p 3505 - w 10 - i 192.168.0.231 - s "123456"
- 上面命令中:-p是自定義端口號,-w是服務端進程數量(可以與本機不同,不同節點上的都可以不同), -i是啟動ppserver.py服務器的ip,-s是自定義密碼
- 查詢pp進程:ps -ef | grep python | grep pp | grep user_name # user_name是登錄服務器的賬號名
- 最后在本機中,運行以下代碼,即可在本機和其他機器上同時進行多進程任務
import pp
import time
def time_delay(n):
time.sleep(n)
def main_func(n):
t_time = time.time() # 使用了time模塊,需要在submit里面添加次模塊。如果不添加,也可以直接在函數里面直接導入模塊。
time_delay(n) # 調用其他函數,需要在submit里面添加此函數
a = 0
for k in range(n+1):
a += k
print(f'job: {n} takes {time.time() - t_time}s')
return a
if __name__ == "__main__":
ppservers = ('192.168.0.231:3505',) # 注意:這里需要填寫遠端服務器的ip和端口號
job_server = pp.Server(ncpus=5, ppservers=ppservers, secret='123456') # ncpus:本機進程數量,需要輸入遠端服務器啟動ppserver時候的密碼
# 生成[(n1, job1), (n2, job2),...] jobs列表
jobs = [job_server.submit(main_func, # 主函數
(n,), # 函數的參數
depfuncs=(time_delay,), # 函數內會調用到的其他函數,需要傳入函數
modules=("time", ) # 需要調用的模塊,通過字符串類型傳入。注意:從此處引入模塊到 主函數中,外部導入的模塊不能使用import...as,也不能用from ... import,只能用import。如果是在函數內部導入模塊無所謂。
)
for n in range(20)]
# 遍歷jobs,執行函數
result_list = [] # 接收返回結果
for job in jobs:
rt = job() # 執行函數,獲取返回值。如果函數無返回值,無需用變量接收,直接執行函數即可。
result_list.append(rt)
job_server.destroy() # 銷毀進程
print(result_list)
#### 結果輸出如下圖所示: