python多進程multiprocessing Pool相關問題


python多進程想必大部分人都用到過,可以充分利用多核CPU讓代碼效率更高效。

我們看看multiprocessing.pool.Pool.map的官方用法

map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

 

一、多參數傳入如何變成一個參數

map的用法,函數func只允許一個可迭代的參數傳遞進去。

如果我們需要傳遞多個參數時怎么辦呢,

一種方法是把多個參數放入到一個list或者元祖里當做一個參數傳入func中

還有一種是使用偏函數,偏函數(Partial function)是通過將一個函數的部分參數預先綁定為某些值,從而得到一個新的具有較少可變參數的函數。在Python中,可以通過functools中的partial高階函數來實現偏函數功能。偏函數partial的源碼如下:

def partial(func, *args, **keywords):
    """New function with partial application of the given arguments
    and keywords.
    """
    if hasattr(func, 'func'):
        args = func.args + args
        tmpkw = func.keywords.copy()
        tmpkw.update(keywords)
        keywords = tmpkw
        del tmpkw
        func = func.func

    def newfunc(*fargs, **fkeywords):
        newkeywords = keywords.copy()
        newkeywords.update(fkeywords)
        return func(*(args + fargs), **newkeywords)
    newfunc.func = func
    newfunc.args = args
    newfunc.keywords = keywords
    return newfunc

使用方法也很簡單,比如我們有一個func函數,里面要傳入texts,lock, data三個參數,但是我們想要用多進程把data分別傳入進去計算,那么我們就可以先用partial函數,將texts和lock先固定到函數里組成一個新的函數,然后新函數傳入data一個參數就可以了

from functools import partial
def func(texts, lock, data):
    ......


pt = partial(func, tests, lock)  

# 新函數pt只需要傳入一個參數data

這我們就可以對pt函數套用pool.map函數並且只傳入一個參數data里。

 

二、多進程內存復制

python對於多進程中使用的是copy on write機制,python 使用multiprocessing來創建多進程時,無論數據是否不會被更改,子進程都會復制父進程的狀態(內存空間數據等)。所以如果主進程耗的資源較多時,不小心就會造成不必要的大量的內存復制,從而可能導致內存爆滿的情況。

進程的啟動有spawn、fork、forkserver三種方式

spawn:調用該方法,父進程會啟動一個新的python進程,子進程只會繼承運行進程對象run()方法所需的那些資源。特別地,子進程不會繼承父進程中不必要的文件描述符和句柄。與使用forkforkserver相比,使用此方法啟動進程相當慢。

           Available on Unix and Windows. The default on Windows.

fork:父進程使用os.fork()來fork Python解釋器。子進程在開始時實際上與父進程相同,父進程的所有資源都由子進程繼承。請注意,安全創建多線程進程尚存在一定的問題。

          Available on Unix only. The default on Unix.

forkserver:當程序啟動並選擇forkserverstart方法時,將啟動服務器進程。從那時起,每當需要一個新進程時,父進程就會連接到服務器並請求它fork一個新進程。 fork服務器進程是單線程的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。

         Available on Unix platforms which support passing file descriptors over Unix pipes.

      要選擇以上某一種start方法,請在主模塊的if __name__ == '__ main__'子句中使用mp.set_start_method()。並且mp.set_start_method()在一個程序中僅僅能使用一次。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

設置maxtasksperchild=1,因此,每個任務完成后都不會重新生成進程,

對pool.map的調用中指定chunksize = 1.這樣iterable中的多個項目將不會從工作進程的感知捆綁到一個“任務”中:

import multiprocessing
import time
import os

def f(x):
    print("PID: %d" % os.getpid())
    time.sleep(x)
    complex_obj = 5 #more complex axtually
    return complex_obj

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    pool = multiprocessing.Pool(4, maxtasksperchild=1)
    pool.map(f, [5]*30, chunksize=1)
    pool.close()

 

三、多進程間通訊

還有一種情況是,多進程間要相互之間通訊,比如我每一個進程的結果都要存入到texts這個list里。當然要把這個texts當做參數傳入到函數里面,但是一般的list並不能共享給所有的進程,我們需要用multiprocessing.Manager().list()建立的list才可以用於進程間通訊,防止沖突,還要對texts加上鎖,防止操作沖突。注意multiprocessing.Lock() 創建的鎖不能傳遞,需要使用multiprocessing.Manager().Lock()來創建。multiprocessing.Manager()可創建字典,也可創建list,lock,它創建的變量可用於多進程間傳遞才不會出錯。比如以下代碼:

texts = multiprocessing.Manager().list()
lock = multiprocessing.Manager().Lock()
pool = multiprocessing.Pool(processes=4)
data = list(range(20))
pt = partial(func, texts, lock)
pool.map(pt, data)
pool.close()
pool.join()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM