python的multiprocessing模塊是用來創建多進程的,下面對multiprocessing總結一下使用記錄。
multiprocessing創建多進程在windows和linux系統下的對比
fork()
import ospid = os.fork() # 創建一個子進程if pid == 0: print('這是子進程') print(os.getpid(),os.getppid())else: print('這是父進程') print(os.getpid())os.wait() # 等待子進程結束釋放資源
-
fork函數被調用后會返回兩次,pid為0的代表子進程,其他返回子進程的id號表示父進程。
-
getpid和getppid函數可以獲取本進程和父進程的id號;
fork方式的缺點:
-
兼容性差,只能在類linux系統下使用,windows系統不可使用;
-
擴展性差,當需要多條進程的時候,進程管理變得很復雜;
-
會產生“孤兒”進程和“僵屍”進程,需要手動回收資源。
優點:
是系統自帶的接近低層的創建方式,運行效率高。
Process創建進程
-
創建方式一:
from multiprocessing import Queue, Processimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))if __name__ == '__main__': p = Process(target=test) p.start() # 子進程 開始執行 p.join() # 等待子進程結束 print('ths peocess is ended')
-
創建方式二:
from multiprocessing import Queue, Processimport osclass MyProcess(Process): def run(self): time.sleep(2) print('this is process {}'.format(os.getpid())) def __del__(self): print('del the process {}'.format(os.getpid()))if __name__ == '__main__': p = MyProcess() p.start() print('ths process is ended')# 結果:ths process is ended this is process 7600del the process 7600del the process 12304
說明:
-
Process對象可以創建進程,但Process對象不是進程,其刪除與否與系統資源是否被回收沒有直接的關系。
-
上例看到del方法被調用了兩次,Process進程創建時,子進程會將主進程的Process對象完全復制一份,這樣在主進程和子進程各有一個Process對象,但是p1.start()啟動的是子進程,主進程中的Process對象作為一個靜態對象存在。
-
主進程執行完畢后會默認等待子進程結束后回收資源,不需要手動回收資源;
-
join()函數用來控制子進程結束的順序,主進程會阻塞等待子進程結束,其內部也有一個清除僵屍進程的函數,可以回收資源;
-
當子進程執行完畢后,會產生一個僵屍進程,其會被join函數回收,或者再有一條進程開啟,start函數也會回收僵屍進程,所以不一定需要寫join函數。
-
windows系統在子進程結束后會立即自動清除子進程的Process對象,而linux系統子進程的Process對象如果沒有join函數和start函數的話會在主進程結束后統一清除。
Process對象分析
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): pass # Process對象是python用來創建進程的類 group:擴展保留字段; target:目標代碼,一般是我們需要創建進程執行的目標函數。 name:進程的名字,如果不指定會自動分配一個; args:目標函數的普通參數; kwargs:目標函數的鍵值對參數; # 方法 start():創建一個子進程並執行,該方法一個Process實例只能執行一次,其會創建一個進程執行該類的run方法。 run():子進程需要執行的代碼; join():主進程阻塞等待子進程直到子進程結束才繼續執行,可以設置等待超時時間timeout. terminate():使活着的進程終止; is_alive():判斷子進程是否還活着。
進程池Pool
如果需要創建大量的進程,就需要使用Pool了。
from multiprocessing import Queue, Process, Poolimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))def get_pool(n=5): p = Pool(n) # 設置進程池的大小 for i in range(10): p.apply_async(test) p.close() # 關閉進程池 p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
分析:廈門廈工叉車怎么樣?
-
如上,進程池Pool被創建出來后,即使實際需要創建的進程數遠遠大於進程池的最大上限,p1.apply_async(test)代碼依舊會不停的執行,並不會停下等待;相當於向進程池提交了10個請求,會被放到一個隊列中;
-
當執行完p1 = Pool(5)這條代碼后,5條進程已經被創建出來了,只是還沒有為他們各自分配任務,也就是說,無論有多少任務,實際的進程數只有5條,計算機每次最多5條進程並行。
-
當Pool中有進程任務執行完畢后,這條進程資源會被釋放,pool會按先進先出的原則取出一個新的請求給空閑的進程繼續執行;
-
當Pool所有的進程任務完成后,會產生5個僵屍進程,如果主線程不結束,系統不會自動回收資源,需要調用join函數去回收。
-
join函數是主進程等待子進程結束回收系統資源的,如果沒有join,主程序退出后不管子進程有沒有結束都會被強制殺死;
-
創建Pool池時,如果不指定進程最大數量,默認創建的進程數為系統的內核數量.
Pool對象分析
class Pool(object): def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): pass # 初始化參數 processes:進程池的大小,默認cpu內核的數量 initializer:創建進程執行的目標函數,其會按照進程池的大小創建相應個數的進程; initargs:目標函數的參數 context:代碼的上下文 # 方法 apply():使用阻塞方式調用func; apply_async():使用非阻塞方式條用func; close():關閉Pool,使其不再接受新的任務; terminate():不管任務是否完成,立即終止; join():主進程阻塞,等待子進程的退出,必須在close()后面使用; map(self, func, iterable, chunksize=None):多進程執行一個函數,傳入不同的參數; starmap(self, func, iterable, chunksize=None):和map類似,但iterable參數可解壓縮; starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):使用異步的方式的starmap,callback為返回后的處理函數 map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步方式的map
-
實例
from multiprocessing import Poolimport osdef test(n): time.sleep(1) print('this is process {}'.format(os.getpid())) return ndef test1(n, m): print(n, m) print('this is process {}'.format(os.getpid()))def back_func(values): # 多進程執行完畢會返回所有的結果的列表 print(values)def back_func_err(values): # 多進程執行完畢會返回所有錯誤的列表 print(values)def get_pool(n=5): p = Pool(n) # p.map(test, (i for i in range(10))) # 阻塞式多進程執行 # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多進程執行多參數函數 # 異步多進程執行函數 p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err) # 異步多進程執行多參數函數 p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err) print('-----') p.close() p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
進程鎖
進程雖然不像線程那樣共享內存的數據,而是每個進程有單獨的內存,但多進程也是共享文件系統的,即硬盤系統;當多進程同時寫入文件操作時,可能造成數據的破壞,因此進程也存在同步鎖。
from multiprocessing import Pool, Lockmuex = Lock()def test(): if muex.acquire(): f = open('./test_pro.txt', 'r+', encoding='utf-8') x = f.read() if not x: f.write('0') else: f.seek(0) f.write(str(int(x)+1)) f.close() muex.release()if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(test) p.close() p.join() with open('./test_pro.txt', 'r+', encoding='utf-8') as f: print(f.read())
進程鎖可以保證文件系統的安全,但是它使得並行變成了串行,效率下降了,也可能造成死鎖問題,一般避免用鎖機制。