什么是進程和線程?
進程是指計算機中已運行的程序,是系統進行資源分配和調度的基本單位;線程是CPU調度和分派的基本單位,一般來說,進程是線程的容器,一個進程可以包含多個線程。最近因為一個計算時間比較長的程序,接觸了Python的多進程計算,Python實現多進程多線程計算還是比較容易的,我用的是Python的multiprocessing模塊。
Python的multiprocessing模塊實現多進程
multiprocessing模塊實現多進程,可以用Process類,也可以用進程池pool(),在這里主要說說我用進程池的一些問題和體會。
multiprocessing安裝用pip工具就可以
pip install multiprocessing
在進行計算之前,如果不知道自己的計算機的CPU核數量,可以用multiprocessing下的命令輸出
multiprocessing.cpu_count()
一個簡單的實現:
import multiprocessing def func(x): y = x * 2 print(multiprocessing.current_process().name,y) if __name__ == "__main__": pool = multiprocessing.Pool(processes=4)# 創建多個進程 for i in range(100): pool.apply_async(func,(i, )) pool.close() pool.join()
代碼中的multiprocessing.current_process().name獲得計算時的CPU核心名,輸出如下,可以看到4個核心都在計算時用到
apply_async是進程池的常用方法之一,類似的方法還有:
apply(self, func, args=(), kwds={}),將func函數提交給進程池處理;
apply_async(self, func, args=(), kwds={}, callback=None,error_callback=None),是apply的一種異步,不會被阻塞;
map(self, func, iterable, chunksize=None),類似於做一個映射;
進程計算結束后,必須用 close() 關閉進程池,join() 是等待進程池中的所有進程執行完畢,必須在close()之后調用。
向函數中傳遞多個參數
以上程序只向func里傳遞了一個函數,那么能不能實現多個函數的傳遞,因為apply等都不支持傳遞多個函數,所以需要對傳遞的多個函數進行一定的處理,這里用到了 zip() 函數,相當於把多個變量壓縮:
import multiprocessing import numpy as np def func(x,y,z): return x+y,z if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 x=np.zeros((100,1)) y=np.zeros((100,1)) z=np.zeros((100,1)) result = [] for i in range(100): x[i,0] = i y[i,0] = i z[i,0] = i xx=np.squeeze(x) yy=np.squeeze(y) zz=np.squeeze(z) c=list(zip(xx,yy,zz)) cc =np.array(c) result=pool.starmap(func,c) pool.close() # 關閉進程池 results=np.array(result) print(results)
因為用apply、map等都不能實現,所以這里用了pool的starmap函數,是與map類似,但是能將傳遞的參數“解壓縮”,就滿足了我們傳遞多個參數的需要。
在函數中使用進程池
另外,還在寫程序的時候發現一個坑,進程池的定義和關閉必須在主程序下面,在函數中想利用進程池進行多線程的計算,需要在主程序下創建進程池,在函數中就可以直接使用,如果不放在主程序下會報錯,而close也需要放在主程序下,不然函數運行一次進程池就關閉了。
import multiprocessing import numpy as np def func(x,y,z): return x+y,z def text(): x = np.zeros((100, 1)) y = np.zeros((100, 1)) z = np.zeros((100, 1)) result = [] for i in range(100): x[i, 0] = i y[i, 0] = i z[i, 0] = i xx = np.squeeze(x) yy = np.squeeze(y) zz = np.squeeze(z) c = list(zip(xx, yy, zz)) result = pool.starmap(func, c) return result if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 result = text() pool.close() # 關閉進程池 pool.join() results=np.array(result) print(results)