multiprocessing.pool.apply_async can run tasks in parallel, but it queues every submitted task up front. For a modest number of tasks this is fine, but with very many, say 1 million or 10 million, and tasks that do not finish quickly, the backlog consumes a great deal of memory and can even exhaust it. How, then, can we cap the memory footprint? Searching online turned up one solution: monitor the length of pool._cache, and once it exceeds a threshold, make the most recently submitted task wait; when that task finishes, submit the next batch, thereby keeping memory usage down.
from multiprocessing import Pool
import time

def downloadGif(arg):
    print(arg[0])
    time.sleep(1)

def downloading_over(arg):
    pass

def foo(num):
    for i in range(num, 1000001):
        pic_info = []
        pic_info.append(str(i) + 'gif')
        txt_info = []
        txt_info.append(str(i) + 'txt')
        yield pic_info, txt_info

if __name__ == '__main__':
    pool = Pool(processes=5)  # set the max number of worker processes
    count = 1
    for download in foo(2):
        pool.apply_async(func=downloadGif, args=(download[0],), callback=downloading_over)
        last = pool.apply_async(func=downloadGif, args=(download[1],), callback=downloading_over)
        count = count + 1
        print(count)
        if len(pool._cache) > 1e3:
            print("waiting for cache to clear...")
            last.wait()
    # threshold 1e3: ~500 entries, ~10 MB of memory
    # threshold 1e4: ~5000 entries, ~20 MB
    # threshold 1e5: ~50000 entries, ~200 MB
    # threshold 1e6: ~500000 entries, ~2000 MB
    pool.close()
    pool.join()
The core code:
if len(pool._cache) > 1e3:
    print("waiting for cache to clear...")
    last.wait()
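Note that pool._cache is a private attribute and could change between Python versions. An alternative that achieves the same throttling without touching pool internals is to guard submissions with a threading.BoundedSemaphore that the task callback releases. This is a minimal sketch of that idea, not the original author's code; the MAX_PENDING value and the work function are illustrative.

```python
from multiprocessing import Pool
from threading import BoundedSemaphore

MAX_PENDING = 100  # hypothetical cap on tasks queued at once

def work(i):
    return i * i

if __name__ == '__main__':
    sem = BoundedSemaphore(MAX_PENDING)
    results = []

    def on_done(result):
        sem.release()  # free one submission slot when a task finishes

    with Pool(processes=5) as pool:
        for i in range(1000):
            sem.acquire()  # blocks while MAX_PENDING tasks are still pending
            # in production, also pass error_callback so failures release the slot
            results.append(pool.apply_async(work, (i,), callback=on_done))
        pool.close()
        pool.join()

    print(sum(r.get() for r in results))
```

Because at most MAX_PENDING results are ever pending, memory stays bounded regardless of how many tasks are generated.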
last is an AsyncResult instance, the value returned by pool.apply_async():
https://docs.python.org/3/library/multiprocessing.html
class multiprocessing.pool.AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().

- get([timeout]) - Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
- wait([timeout]) - Wait until the result is available or until timeout seconds pass.
- ready() - Return whether the call has completed.
- successful() - Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
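The methods above can be exercised with a small standalone example; the square and fail functions here are illustrative, not from the original post:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def fail():
    raise RuntimeError("boom")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        res = pool.apply_async(square, (7,))
        res.wait()                 # block until the result is available
        print(res.ready())         # True: the call has completed
        print(res.successful())    # True: it finished without an exception
        print(res.get(timeout=1))  # 49

        bad = pool.apply_async(fail)
        bad.wait()
        print(bad.successful())    # False: the call raised
        try:
            bad.get()              # get() re-raises the worker's exception
        except RuntimeError as e:
            print(e)               # boom
```

Calling successful() before the result is ready would raise ValueError, which is why wait() precedes it here.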
This article is based on the answer at the link below: