python多進程想必大部分人都用到過,可以充分利用多核CPU讓代碼效率更高效。
我們看看multiprocessing.pool.Pool.map的官方用法
map(func, iterable[, chunksize]) A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
一、多參數傳入如何變成一個參數
map的用法,函數func只允許一個可迭代的參數傳遞進去。
如果我們需要傳遞多個參數時怎么辦呢,
一種方法是把多個參數放入到一個list或者元祖里當做一個參數傳入func中
還有一種是使用偏函數,偏函數(Partial function)是通過將一個函數的部分參數預先綁定為某些值,從而得到一個新的具有較少可變參數的函數。在Python中,可以通過functools中的partial高階函數來實現偏函數功能。偏函數partial的源碼如下:
def partial(func, *args, **keywords): """New function with partial application of the given arguments and keywords. """ if hasattr(func, 'func'): args = func.args + args tmpkw = func.keywords.copy() tmpkw.update(keywords) keywords = tmpkw del tmpkw func = func.func def newfunc(*fargs, **fkeywords): newkeywords = keywords.copy() newkeywords.update(fkeywords) return func(*(args + fargs), **newkeywords) newfunc.func = func newfunc.args = args newfunc.keywords = keywords return newfunc
使用方法也很簡單,比如我們有一個func函數,里面要傳入texts,lock, data三個參數,但是我們想要用多進程把data分別傳入進去計算,那么我們就可以先用partial函數,將texts和lock先固定到函數里組成一個新的函數,然后新函數傳入data一個參數就可以了
from functools import partial def func(texts, lock, data): ...... pt = partial(func, tests, lock) # 新函數pt只需要傳入一個參數data
這我們就可以對pt函數套用pool.map函數並且只傳入一個參數data里。
二、多進程內存復制
python對於多進程中使用的是copy on write機制,python 使用multiprocessing來創建多進程時,無論數據是否不會被更改,子進程都會復制父進程的狀態(內存空間數據等)。所以如果主進程耗的資源較多時,不小心就會造成不必要的大量的內存復制,從而可能導致內存爆滿的情況。
進程的啟動有spawn、fork、forkserver三種方式
spawn:調用該方法,父進程會啟動一個新的python進程,子進程只會繼承運行進程對象run()
方法所需的那些資源。特別地,子進程不會繼承父進程中不必要的文件描述符和句柄。與使用fork
或forkserver
相比,使用此方法啟動進程相當慢。
Available on Unix and Windows. The default on Windows.
fork:父進程使用os.fork()
來fork Python解釋器。子進程在開始時實際上與父進程相同,父進程的所有資源都由子進程繼承。請注意,安全創建多線程進程尚存在一定的問題。
Available on Unix only. The default on Unix.
forkserver:當程序啟動並選擇forkserver
start方法時,將啟動服務器進程。從那時起,每當需要一個新進程時,父進程就會連接到服務器並請求它fork一個新進程。 fork服務器進程是單線程的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。
Available on Unix platforms which support passing file descriptors over Unix pipes.
要選擇以上某一種start方法,請在主模塊的if __name__ == '__ main__'
子句中使用mp.set_start_method()
。並且mp.set_start_method()
在一個程序中僅僅能使用一次。
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
設置maxtasksperchild=1,因此,每個任務完成后都不會重新生成進程,
對pool.map的調用中指定chunksize = 1.這樣iterable中的多個項目將不會從工作進程的感知捆綁到一個“任務”中:
import multiprocessing import time import os def f(x): print("PID: %d" % os.getpid()) time.sleep(x) complex_obj = 5 #more complex axtually return complex_obj if __name__ == '__main__': multiprocessing.set_start_method('spawn') pool = multiprocessing.Pool(4, maxtasksperchild=1) pool.map(f, [5]*30, chunksize=1) pool.close()
三、多進程間通訊
還有一種情況是,多進程間要相互之間通訊,比如我每一個進程的結果都要存入到texts這個list里。當然要把這個texts當做參數傳入到函數里面,但是一般的list並不能共享給所有的進程,我們需要用multiprocessing.Manager().list()建立的list才可以用於進程間通訊,防止沖突,還要對texts加上鎖,防止操作沖突。注意multiprocessing.Lock() 創建的鎖不能傳遞,需要使用multiprocessing.Manager().Lock()來創建。multiprocessing.Manager()可創建字典,也可創建list,lock,它創建的變量可用於多進程間傳遞才不會出錯。比如以下代碼:
texts = multiprocessing.Manager().list() lock = multiprocessing.Manager().Lock() pool = multiprocessing.Pool(processes=4) data = list(range(20)) pt = partial(func, texts, lock) pool.map(pt, data) pool.close() pool.join()