(轉)Python多進程Process、Pool的使用總結


原文:https://www.cnblogs.com/wangdac/p/13892208.html

python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。

進程池Pool模塊接口說明

拆分任務一提高執行效率時,獨立多進程apply_async(),通過列表解析添加子任務,是最優選擇。, 見pool部分

multiprocessing.Pool類接口詳解

join方法的意義

join()方法可以等待子進程結束后再繼續往下運行(更准確地說,在當前位置阻塞主進程,帶執行join()的進程結束后再繼續執行主進程),通常用於進程間的同步。(進一步地解釋,哪個子進程調用了join方法,主進程就要等該子進程執行完后才能繼續向下執行,具體可見下邊的分析圖)

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

case:使用進程池(非阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print "Sub-process(es) done." ''' 函數解釋: apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別) close() 關閉pool,使其不在接受新的任務。 terminate() 結束工作進程,不在處理未完成的任務。 join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。 執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。 pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 ''' 

Python多進程pool.map()函數,有兩個參數可以傳,第一個參數傳的是函數,第二個參數傳的是數據列表。那么怎么在第二個數據列表,多傳幾個參數呢,方法是通過對有多個參數的方法進行封裝,在進程中運行封裝后的方法。

pool.map

# -*- coding:utf-8 -*- import time import multiprocessing def job(x ,y): """ :param x: :param y: :return: """ return x * y def job1(z): """ :param z: :return: """ return job(z[0], z[1]) if __name__ == "__main__": time1=time.time() pool = multiprocessing.Pool(2) data_list=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)] res = pool.map(job1,data_list) time2=time.time() print(res) pool.close() pool.join() print('總共耗時:' + str(time2 - time1) + 's') 

pool.apply_async

case1 : https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py
#!/usr/bin/env python3 # -*- coding: utf-8 -*- from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') 
case2: https://blog.csdn.net/jinping_shi/article/details/52433867 import multiprocessing import time def func(msg): print multiprocessing.current_process().name + '-' + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 for i in xrange(10): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) pool.close() # 關閉進程池,表示不能在往進程池中添加進程 pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用 print "Sub-process(es) done." 
case3:https://blog.csdn.net/weixin_42898819/article/details/81811514 from multiprocessing import Process,Pool #導入進程池 import time,os def Foo(i): time.sleep(2) print('到了2s') return i+100 def Bar(arg): print('結果:',arg) if __name__ == '__main__': pool=Pool(processes= 5) #允許進程池同時放入5個進程 for i in range(10): #10個進程都啟動 但是一次只能運行5個 #pool.apply(func= Foo,args=(i,)) #串行執行進程,一次執行1個進程 pool.apply_async(func= Foo,args=(i,),callback= Bar) #並行執行進程,一次5個,callback回調 Foo執行完就會執行Bar print('end') pool.close() pool.join() #等待進程池中的進程執行完畢 必須先close() 在join() 

什么時候用進程池Pool

當我們需要的進程數量不多的時候,我們可以使用multiprocessing的Process類來創建進程。但是如果我們需要的進程特別多的時候,手動創建工作量太大了,所以Python也為我們提供了Pool(池)的方式來創建大量進程。

from multiprocessing import Pool import os,time def run(msg): print("開始一個子線程運行了……") time.sleep(1) print("開始一個子線程運行結束了……") if __name__ == "__main__": pool = Pool(3) # 表示初始化一個進程池,最大進程數為5 for x in range(10): pool.apply_async(run, args=("hello pool",)) print("------start----") pool.close() # 關閉池 pool.join() # 等待所有的子進程完成,必須放在close后面 print("-------end------") ''' 注意:一般我們使用apply_async這個方法,表示非阻塞的運行,一旦使用了apply方法表示阻塞式執行任務,此時就是單任務執行了(一般不會使用,特殊場景才會使用) ''' 

Pool.map多參數任務

map的多參數解決辦法

#也可以用List將多個參數拼接成一個argList元組,然后多個argList再組合為pool.map要求的可迭代對象 def job(x ,y): return x * y def job1(z): return job(z[0], z[1]) if __name__ == "__main__": pool = multiprocessing.Pool() res = pool.map(job1, [(2, 3), (3, 4)]) print res 
# 將多個輸入變量打包到一個參數 x = [1,2,3,4,5,6] y = [1,1,1,1,1,1] x_y = zip(x, y) results = pool.map(work, x_y) #使用pathos包下的multiprocessing 這個包是使用dill的multiprocessing的一個fork,允許多參數輸入: from pathos.multiprocessing import ProcessingPoll as Pool pool = Pool(4) results = pool.map(work, x, y) 

Pool.apply_async()有序輸出多個迭代結果

在使用apply_async()方法接收多個參數的方法時,在任務方法中正常定義多個參數,參數以元組形式傳入即可
但是給apply_async()方法傳入多個值獲取多個迭代結果時就會報錯,因為該方法只能接收一個值,所以可以將該方法放入一個列表生成式中,如下

def job(x): return x * x if __name__ == "__main__": pool multiprocessing.Pool() res = [pool.apply_async(target=job, (i,)) for i in range(3)] print [r.get() for r in res] 

進程池 apply_async, map方法示例

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。就是固定有幾個進程可以使用。

進程池中有兩個方法:

apply:同步,一般不使用

apply_async:異步

from multiprocessing import Process,Pool import os, time, random def fun1(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': pool = Pool(5) #創建一個5個進程的進程池 for i in range(10): pool.apply_async(func=fun1, args=(i,)) pool.close() pool.join() print('結束測試') 

結果

Run task 0 (37476)... Run task 1 (4044)... Task 0 runs 0.03 seconds. Run task 2 (37476)... Run task 3 (17252)... Run task 4 (16448)... Run task 5 (24804)... Task 2 runs 0.27 seconds. Run task 6 (37476)... Task 1 runs 0.58 seconds. Run task 7 (4044)... Task 3 runs 0.98 seconds. Run task 8 (17252)... Task 5 runs 1.13 seconds. Run task 9 (24804)... Task 6 runs 1.46 seconds. Task 4 runs 2.73 seconds. Task 8 runs 2.18 seconds. Task 7 runs 2.93 seconds. Task 9 runs 2.93 seconds. 結束測試 

Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。

進程池map方法

因為網上看到這個例子覺得不錯,所以這里就不自己寫案例,這個案例比較有說服力

import os import PIL from multiprocessing import Pool from PIL import Image SIZE = (75,75) SAVE_DIRECTORY = \'thumbs\' def get_image_paths(folder): return (os.path.join(folder, f) for f in os.listdir(folder) if \'jpeg\' in f) def create_thumbnail(filename): im = Image.open(filename) im.thumbnail(SIZE, Image.ANTIALIAS) base, fname = os.path.split(filename) save_path = os.path.join(base, SAVE_DIRECTORY, fname) im.save(save_path) if __name__ == \'__main__\': folder = os.path.abspath( \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\') os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) images = get_image_paths(folder) pool = Pool() pool.map(creat_thumbnail, images) #關鍵點,images是一個可迭代對象 pool.close() pool.join() 
復制

上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,並將這些縮略圖保存到特定文件夾中。這我的機器上,用這一程序處理 6000 張圖片需要花費 27.9 秒。 map 函數並不支持手動線程管理,反而使得相關的 debug 工作也變得異常簡單。

map在爬蟲的領域里也可以使用,比如多個URL的內容爬取,可以把URL放入元祖里,然后傳給執行函數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM