Python利用multiprocessing模塊進程池實現多進程(傳遞多個參數函數,多函數嵌套使用)


  什么是進程和線程?

  進程是指計算機中已運行的程序,是系統進行資源分配和調度的基本單位;線程是CPU調度和分派的基本單位,一般來說,進程是線程的容器,一個進程可以包含多個線程。最近因為一個計算時間比較長的程序,接觸了Python的多進程計算,Python實現多進程多線程計算還是比較容易的,我用的是Python的multiprocessing模塊。

 

  Python的multiprocessing模塊實現多進程

  multiprocessing模塊實現多進程,可以用Process類,也可以用進程池pool(),在這里主要說說我用進程池的一些問題和體會。

  multiprocessing安裝用pip工具就可以

pip install multiprocessing

  在進行計算之前,如果不知道自己的計算機的CPU核數量,可以用multiprocessing下的命令輸出

multiprocessing.cpu_count()

  一個簡單的實現:

import multiprocessing

def func(x):
    y = x * 2
    print(multiprocessing.current_process().name,y)


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)# 創建多個進程
    for i in range(100):
        pool.apply_async(func,(i, ))
    pool.close()
    pool.join()

  代碼中的multiprocessing.current_process().name獲得計算時的CPU核心名,輸出如下,可以看到4個核心都在計算時用到

  apply_async是進程池的常用方法之一,類似的方法還有:

  apply(self, func, args=(), kwds={}),將func函數提交給進程池處理;

  apply_async(self, func, args=(), kwds={}, callback=None,error_callback=None),是apply的一種異步,不會被阻塞;

  map(self, func, iterable, chunksize=None),類似於做一個映射;

  進程計算結束后,必須用 close() 關閉進程池,join() 是等待進程池中的所有進程執行完畢,必須在close()之后調用。

 

  向函數中傳遞多個參數

  以上程序只向func里傳遞了一個函數,那么能不能實現多個函數的傳遞,因為apply等都不支持傳遞多個函數,所以需要對傳遞的多個函數進行一定的處理,這里用到了 zip() 函數,相當於把多個變量壓縮:

import multiprocessing
import numpy as np

def func(x,y,z):
    return x+y,z

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # 創建4個進程
    x=np.zeros((100,1))
    y=np.zeros((100,1))
    z=np.zeros((100,1))
    result = []
    for i in range(100):
        x[i,0] = i
        y[i,0] = i
        z[i,0] = i
    xx=np.squeeze(x)
    yy=np.squeeze(y)
    zz=np.squeeze(z)
    c=list(zip(xx,yy,zz))
    cc =np.array(c)
    result=pool.starmap(func,c)
    pool.close() # 關閉進程池
    results=np.array(result)
    print(results)

  因為用apply、map等都不能實現,所以這里用了pool的starmap函數,是與map類似,但是能將傳遞的參數“解壓縮”,就滿足了我們傳遞多個參數的需要。

  

  在函數中使用進程池

  另外,還在寫程序的時候發現一個坑,進程池的定義和關閉必須在主程序下面,在函數中想利用進程池進行多線程的計算,需要在主程序下創建進程池,在函數中就可以直接使用,如果不放在主程序下會報錯,而close也需要放在主程序下,不然函數運行一次進程池就關閉了。

import multiprocessing
import numpy as np

def func(x,y,z):
    return x+y,z

def text():
    x = np.zeros((100, 1))
    y = np.zeros((100, 1))
    z = np.zeros((100, 1))
    result = []
    for i in range(100):
        x[i, 0] = i
        y[i, 0] = i
        z[i, 0] = i
    xx = np.squeeze(x)
    yy = np.squeeze(y)
    zz = np.squeeze(z)
    c = list(zip(xx, yy, zz))
    result = pool.starmap(func, c)
    return result

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # 創建4個進程
    result = text()
    pool.close() # 關閉進程池
    pool.join()
    results=np.array(result)
    print(results)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM