python3.6_多進程_multiprocessing.pool_concurrent.futures_ProcessPoolExecutor_對比
轉載注明來源: 本文鏈接 來自osnosn的博客,寫於 2020-06-27.
多進程的多種寫法,在大量任務的情況下,效率的對比。
(後面有兩個例程參考)
import time
from multiprocessing.pool import Pool
from concurrent.futures import as_completed, ProcessPoolExecutor
# Benchmark workload: 59,999 inputs, each computing K+1 fractional powers.
NUMBERS = range(1, 60000)
K = 50


def f(x):
    """CPU-bound worker: return ['xx', sum of x**(1/k**1.5) for k = 1..K+1].

    The 'xx' marker just gives every benchmark a small list payload to
    ship back from the worker process.
    """
    total = sum(x ** (1.0 / k ** 1.5) for k in range(1, K + 2))
    return ['xx', total]
if __name__ == '__main__':
    WORKERS = 4  # worker-process count used by every pool/executor below

    if 1:
        # Baseline: plain single-process loop.
        print('-------------------\n no multiProcessing:')
        start = time.time()
        results = [f(nn) for nn in NUMBERS]
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    if 1:
        # Pool.map ships the iterable to workers in chunks, so the IPC
        # overhead is amortized and this is among the fastest variants.
        print('-------------------\n multiprocessing.pool.Pool:')
        start = time.time()
        pool = Pool(WORKERS)
        results = list(pool.map(f, NUMBERS))
        pool.close()
        pool.join()  # orderly shutdown (original used close()+terminate())
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    # apply_async submits one task per call: one IPC round trip per item.
    # That per-task overhead is why these two sections are the slowest
    # Pool variants; maxtasksperchild adds worker-recycling cost on top.
    for suffix, pool_kwargs in (('', {}),
                                (',maxtasksperchild=1000 ', {'maxtasksperchild': 1000})):
        print('-------------------\n multiprocessing.pool.Pool, apply_async{}:'.format(suffix))
        start = time.time()
        pool = Pool(WORKERS, **pool_kwargs)
        handles = [pool.apply_async(f, (nn,)) for nn in NUMBERS]
        pool.close()
        print('middle COST: {}'.format(time.time() - start))
        pool.join()
        results = [h.get() for h in handles]
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    # executor.map with an explicit chunksize computed as a fraction of the
    # input length.  Uses the public WORKERS value instead of the private
    # executor._max_workers attribute the original read.
    for divisor in (4, 10, 100, 300):
        print('-------------------\n ProcessPoolExecutor with chunksize,1/{}:'.format(divisor))
        start = time.time()
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            chunksize, _ = divmod(len(NUMBERS), WORKERS * divisor)
            print('chunksize', chunksize)
            results = list(executor.map(f, NUMBERS, chunksize=chunksize))
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    if 1:
        # Fixed chunksize for comparison with the computed ones above.
        print('-------------------\n ProcessPoolExecutor with chunksize,500:')
        start = time.time()
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            chunksize = 500
            print('chunksize', chunksize)
            results = list(executor.map(f, NUMBERS, chunksize=chunksize))
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    if 1:
        # submit() builds one Future per task (slow to enqueue), and
        # as_completed() yields in completion order, so results[0] is not
        # necessarily f(1) here — matching the original's observed output.
        print('-------------------\n ProcessPoolExecutor submit:')
        start = time.time()
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            futures = [executor.submit(f, nn) for nn in NUMBERS]
            print('middle COST: {}'.format(time.time() - start))
            results = [fut.result() for fut in as_completed(futures)]
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))

    if 1:
        # Default chunksize=1: one IPC round trip per item — very slow.
        print('-------------------\n ProcessPoolExecutor without chunksize:')
        start = time.time()
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            results = list(executor.map(f, NUMBERS))
        print(len(results), results[0])
        print('COST: {}'.format(time.time() - start))
        print('')
結果:
-------------------
no multiProcessing:
59999 ['xx', 51.0]
COST: 1.2773692607879639
-------------------
multiprocessing.pool.Pool:
59999 ['xx', 51.0]
COST: 0.4585001468658447
-------------------
multiprocessing.pool.Pool, apply_async:
middle COST: 1.492830514907837
59999 ['xx', 51.0]
COST: 4.116384267807007
-------------------
multiprocessing.pool.Pool, apply_async,maxtasksperchild=1000 :
middle COST: 2.0289459228515625
59999 ['xx', 51.0]
COST: 5.032078266143799
-------------------
ProcessPoolExecutor with chunksize,1/4:
chunksize 3749
59999 ['xx', 51.0]
COST: 0.4767882823944092
-------------------
ProcessPoolExecutor with chunksize,1/10:
chunksize 1499
59999 ['xx', 51.0]
COST: 0.5644888877868652
-------------------
ProcessPoolExecutor with chunksize,1/100:
chunksize 149
59999 ['xx', 51.0]
COST: 0.4668114185333252
-------------------
ProcessPoolExecutor with chunksize,1/300:
chunksize 49
59999 ['xx', 51.0]
COST: 0.673607587814331
-------------------
ProcessPoolExecutor with chunksize,500:
chunksize 500
59999 ['xx', 51.0]
COST: 0.45476365089416504
-------------------
ProcessPoolExecutor submit:
middle COST: 11.38172698020935
59999 ['xx', 16145.294670113708]
COST: 21.179430723190308
-------------------
ProcessPoolExecutor without chunksize:
59999 ['xx', 51.0]
COST: 20.61406421661377
區別還挺大, 測試機器 CPU 有 6 核,python3.6。
逐個提交任務的寫法(apply_async、submit、不設 chunksize 的 map)慢,是因為每個任務都要一次進程間通信;
所以感覺用 pool.map() 或 executor.map() 並設置合適的 chunksize 比較好。
例子01
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
多進程例程,
隊列一次全部准備好,一次啟動子進程。
"""
import os
from concurrent.futures import ProcessPoolExecutor as Pool
def child_do(ii, fnn):
    """The real worker (runs in a child process); may take several arguments.

    It may open HDF5 files and print().  The return value can be a
    DataFrame, or — as here — several results packed into one list.
    """
    print(ii, 'child')
    value = ii + 2 + fnn
    return [ii, value, value, value]  # four results returned at once
def child_one(param):
    """Adapter for pool.map(): unpack the single packed argument into child_do."""
    return child_do(*param)
def do_main():
    """Prepare the whole argument queue first, then run it through the pool once."""
    # do something...
    # Worker count: intended to follow os.cpu_count(), but deliberately
    # pinned to 4 (the override below makes the two lines above moot).
    cpucount = os.cpu_count()
    if cpucount < 3:
        cpucount = 3
    cpucount = 4
    pool = Pool(cpucount)  # open the pool; pool.map() below starts the workers

    # Build the argument queue.  map() passes a single object per task, so
    # each entry is a tuple that child_one() unpacks for child_do().
    # Entries may hold DataFrames (passed by reference) — but beware of
    # several processes mutating the same DataFrame.
    pool_args = []
    ii = 0  # task counter
    for fnn in range(0, 1115):
        ii += 1
        pool_args.append((ii, fnn))

    # Arguments are shipped from the parent process; sending them one at a
    # time is very slow, so send a chunk per transfer.  Aim for ~30
    # transfers per worker, clamped to the range [50, 200].
    # (Uses cpucount — the pool's known worker count — instead of the
    # private pool._max_workers attribute the original read.)
    chunksize, _ = divmod(len(pool_args), cpucount * 30)
    chunksize = max(50, min(chunksize, 200))

    # Start the workers and wait for the results.
    for p_result in pool.map(child_one, pool_args, chunksize=chunksize):
        jj, res2, res3, res4 = p_result  # child_do returns four values
        print(jj, res2, res3, res4)
        # aggregate the results here
    print('啟動進程池', ii)
    pool.shutdown()  # close the pool
    # process the aggregated results here
# Script entry point.
if __name__ == '__main__':
    do_main()
例子02
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
多進程例程
隊列准備多次,多次啟動子進程。
"""
import os
from concurrent.futures import ProcessPoolExecutor as Pool
import time
def child_do(ii, fnn):
    """Real worker body, executed in a child process.

    Accepts multiple arguments; can open HDF5 files and print.  Returns a
    list of four results (any of which could be a DataFrame in real code).
    """
    print(ii, 'child')
    r = ii + 2 + fnn
    return [ii] + [r] * 3
def child_one(param):
    """pool.map() delivers one packed object; expand it into child_do's arguments."""
    result = child_do(*param)
    return result
def do_result(pool, pool_args):
    """Run one queued batch through the pool and print each worker result.

    Runs only in the parent process (never concurrently with itself).
    Returns res2 of the last processed item as the (toy) aggregate, or
    None when pool_args is empty — the original raised NameError on an
    empty batch because res2 was never bound.
    """
    # Arguments are shipped from the parent process; per-item transfer is
    # very slow, so send them a chunk at a time.
    chunksize = 20  # in real use, 50 or 100 is reasonable
    last_res2 = None
    for p_result in pool.map(child_one, pool_args, chunksize=chunksize):
        jj, res2, res3, res4 = p_result  # child_do returns four values
        print(jj, res2, res3, res4)
        # aggregate the results here
        last_res2 = res2
    return last_res2  # return the aggregate
def do_main():
    """Queue arguments in batches, launching the pool once per batch."""
    # some preparation work first
    # Worker count: nominally os.cpu_count(), but deliberately pinned to 4
    # (the override makes the two lines above moot).
    cpucount = os.cpu_count()
    if cpucount < 3:
        cpucount = 3
    cpucount = 4
    pool = Pool(cpucount)  # open the pool; do_result() drives pool.map()

    pool_args = []  # argument queue handed to the children
    ii = 0  # task counter
    for fnn in range(0, 1115):
        ii += 1
        # One tuple per task: children take a single argument, so multiple
        # values are packed (a list or dict would also work).  DataFrames
        # may be included (passed by reference) — beware several processes
        # mutating the same DataFrame.
        pool_args.append((ii, fnn))
        if (ii % 200) == 0:  # in real use, flush every ~1000 items
            mysum = do_result(pool, pool_args)
            pool_args = []  # reset the queue for the next batch
            print('啟動進程池', ii)
            time.sleep(0.5)  # test-only pause so the output is easy to read
    # Loop done — run the pool once more to drain whatever is left queued.
    mysum = do_result(pool, pool_args)
    print('啟動進程池', ii)
    pool.shutdown()  # close the pool
    # process the aggregated results here
# Script entry point.
if __name__ == '__main__':
    do_main()