python 多進程及並行計算: multiprocessing總結 & joblib.Parallel函數


一、背景

  • 由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。
  • Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。
  • multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

二、 multiprocessing包介紹

multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。

但在使用這些共享API的時候,我們要注意以下幾點:

  • 在UNIX平台上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。
  • multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
  • 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由於每個進程有自己獨立的內存空間,以上方法並不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,並因為同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果進程還沒有start(),則PID為None。

window系統下,需要注意的是要想啟動一個子進程,必須加上那句if name == "main",進程相關的要寫在這句下面。

簡單創建多進程
點擊查看代碼1:直接在for循環中將Process類一個初始化,把需要並行化的函數作為類的輸入,然后start該對象即可
from multiprocessing import Process
import threading
import time


def foo(i):
    print 'say hi', i

if __name__ == '__main__':
    for i in range(10):  #創建10個進程
        p = Process(target=foo, args=(i,)) 
        p.start()
點擊查看代碼1運行結果
say hi 4
say hi 3
say hi 5
say hi 2
say hi 1
say hi 6
say hi 0
say hi 7
say hi 8
say hi 9

Process finished with exit code 0
#可以看出多個進程隨機順序執行
點擊查看代碼2:將Procee生成一個自定義的派生類,在派生類中自定義run函數
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self, arg):
        super(MyProcess, self).__init__()
        self.arg = arg

    def run(self):
        print 'say hi', self.arg
        time.sleep(1)
    
if __name__ == '__main__':

    for i in range(10):
        p = MyProcess(i)
        p.start()

三、 jobLib.Parallel函數

Joblib:將Python代碼轉換為並行計算模式,可以大大簡化我們寫並行計算代碼的步驟.過操作該包內的函數來實現目標代碼的並行計算,從而提高代碼運行效率。

3.1 例子

3.1.1 不並行操作

首先 ,定義一個簡單的函數single(a),該函數順序執行休眠1s然后打印a的值的操作:

from joblib import Parallel, delayed
import time
def single(a):
    """ 定義一個簡單的函數  """
    time.sleep(1)  # 休眠1s
    print(a)       # 打印出a

然后使用for循環運行10次single()函數,並記錄運行的時間,由結果可知,這種情況下代碼大概會運行10s。


start = time.time()  # 記錄開始的時間
for i in range(10):  # 執行10次single()函數
    single(i)
Time = time.time() - start  # 計算執行的時間
print(str(Time)+'s')
 
#  運行結果如下  #
0
1
2
3
4
5
6
7
8
9
10.0172278881073s

不並行操作的時候,一個函數操作是1s,則運行多少次就得花多少倍的時間。

3.1.2 使用Parallel包來並行操作
  • Parallel函數會創建一個進程池,以便在多進程中執行每一個列表項。
  • 函數中,我們設置參數n_jobs來設置開啟進程數。
  • 函數delayed是一個創建元組(function, args, kwargs)的簡單技巧,比如下面代碼中的意思是創建10個實參分別為0~9的single()函數的workers。
start = time.time()  # 記錄開始的時間
Parallel(n_jobs=3)(delayed(single)(i) for i in range(10))   # 並行化處理
Time = time.time() - start  # 計算執行的時間
print(str(Time)+'s')

#  運行結果如下  #
0
1
2
3
4
5
6
7
8
9
4.833665370941162s

可見並行化處理后,運行時間相比順序執行大大減小。由於進程切換等操作的時間開銷,最終的執行時間並不是理想的3.33s,而是大於一個3.33s的時間。
當n_jobs的值為1時,即相當於for循環的順序執行,結果仍然會是10s。因此我們可以改變不同的n_jobs值來查看最終的運行結果。

3.2 Parallel函數介紹

3.2.1 Parallel函數的定義方式:
class joblib.parallel(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', 
                   batch_size='auto',temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None)

Parallel參數眾多,但常用的基本只有n_jobs和backend參數。

3.2.2 n_jobs: int, default: None —— 設置並行執行任務的最大數量。

當backend="multiprocessing"時指python工作進程的數量,或者backend="threading"時指線程池大小。當n_jobs=-1時,使用所有的CPU執行並行計算。當n_jobs=1時,就不會使用並行代碼,即等同於順序執行,可以在debug情況下使用。另外,當n_jobs<-1時,將會使用(n_cpus + 1 + n_jobs)個CPU,例如n_jobs=-2時,將會使用n_cpus-1個CPU核,其中n_cpus為CPU核的數量。當n_jobs=None的情況等同於n_jobs=1

The maximum number of concurrently running jobs, such as the number of Python worker processes when backend ="multiprocessing" or the size of the thread-pool when backend="threading". If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used. None is a marker for 'unset' that will be interpreted as n_jobs=1 (sequential execution) unless the call is performed under a parallel_backend context manager that sets another value for n_jobs.

3.2.3 backend: str, default: 'loky' —— 指定並行化后端的實現方法

backend='loky': 在與Python進程交換輸入和輸出數據時,可導致一些通信和內存開銷。

backend='multiprocessing': 基於multiprocessing.Pool的后端,魯棒性不如loky。

backend='threading': threading是一個開銷非常低的backend。但是如果被調用的函數大量依賴於Python對象,它就會受到Python全局解釋器(GIL)鎖的影響。當執行瓶頸是顯式釋放GIL的已編譯擴展時,“threading”非常有用(例如,封裝在“with nogil”塊中的Cython循環,或者對庫(如NumPy)的大量調用)。

  • "loky" used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes.

  • "multiprocessing" previous process-based backend based on multiprocessing.Pool. Less robust than loky`.

  • "threading" is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. "threading" is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a "with nogil" block or an expensive call to a library such as NumPy).

  • finally, you can register backends by calling register_parallel_backend. This will allow you to implement a backend of your liking.

It is not recommended to hard-code the backend name in a call to Parallel in a library. Instead it is recommended to set soft hints (prefer) or hard constraints (require) so as to make it possible for library users to change the backend from the outside using the parallel_backend context manager.

ref:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM