一、multiprocessing模塊
multiprocessing
模塊提供了一個Process
類來代表一個進程對象,multiprocessing模塊像線程一樣管理進程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多
看一下Process類的構造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數說明:
group:進程所屬組(基本不用)
target:表示調用對象
args:表示調用對象的位置參數元組
name:別名
kwargs:表示調用對象的字典
示例:
import multiprocessing def do(n): # 參數n由args=(1,)傳入
name = multiprocessing.current_process().name # 獲取當前進程的名字
print(name, 'starting') print("worker ", n) return
if __name__ == '__main__': numList = [] for i in range(5): p = multiprocessing.Process(target=do, args=(i,)) # (i,)中加入","表示元祖
numList.append(p) print(numList) p.start() # 用start()方法啟動進程,執行do()方法
p.join() # 等待子進程結束以后再繼續往下運行,通常用於進程間的同步
print("Process end.")
運行結果:
[<Process(Process-1, initial)>] Process-1 starting worker 0 Process end. [<Process(Process-1, stopped)>, <Process(Process-2, initial)>] Process-2 starting worker 1 Process end. [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, initial)>] Process-3 starting worker 2 Process end. [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, initial)>] Process-4 starting worker 3 Process end. [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, initial)>] Process-5 starting worker 4 Process end.
通過打印numList可以看出當前進程結束后,再開始下一個進程
注意:
在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要
二、Pool類
Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法:
1.apply()
函數原型:apply(func[, args=()[, kwds={}]])
該函數用於傳遞不定參數,同python中的apply函數一致,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以后不再出現)
2.apply_async
函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法一致,但它是非阻塞的且支持結果返回后進行回調
3.map()
函數原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到結果返回
注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程
4.map_async()
函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的
5.close()
關閉進程池(pool),使其不再接受新的任務
6.terminal()
結束工作進程,不再處理未處理的任務
7.join()
主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用
示例1--使用map()函數
import time from multiprocessing import Pool def run(fn): # fn: 函數參數是數據列表的一個元素
time.sleep(1) print(fn * fn) if __name__ == "__main__": testFL = [1, 2, 3, 4, 5, 6] print('shunxu:') # 順序執行(也就是串行執行,單進程)
s = time.time() for fn in testFL: run(fn) t1 = time.time() print("順序執行時間:", int(t1 - s)) print('concurrent:') # 創建多個進程,並行執行
pool = Pool(3) # 創建擁有3個進程數量的進程池
# testFL:要處理的數據列表,run:處理testFL列表中數據的函數
pool.map(run, testFL) pool.close() # 關閉進程池,不再接受新的進程
pool.join() # 主進程阻塞等待子進程的退出
t2 = time.time() print("並行執行時間:", int(t2 - t1))
運行結果:
1、map函數中testFL為可迭代對象--列表
2、當創建3個進程時,會一次打印出3個結果“1,4,9”,當當創建2個進程時,會一次打印出2個結果“1,4”,以此類推,當創建多余6個進程時,會一次打印出所有結果
3、如果使用Pool(),不傳入參數,可以創建一個動態控制大小的進程池
從結果可以看出,並發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以平時工作中,進程數也不能開太大。 對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),讓其不再接受新的Process了
示例2--使用map()_async函數
print('concurrent:') # 創建多個進程,並行執行
pool = Pool(3) # 創建擁有3個進程數量的進程池
# testFL:要處理的數據列表,run:處理testFL列表中數據的函數
pool.map_async(run, testFL)
pool.close() # 關閉進程池,不再接受新的進程
pool.join() # 主進程阻塞等待子進程的退出
t2 = time.time() print("並行執行時間:", int(t2 - t1))
運行結果:
從結果可以看出,map_async()和map()用時相同。目前還沒有看出兩者的區別,后面知道后再完善
示例3--使用apply()函數
print('concurrent:') # 創建多個進程,並行執行
pool = Pool(3) # 創建擁有3個進程數量的進程池
# testFL:要處理的數據列表,run:處理testFL列表中數據的函數
for fn in testFL: pool.apply(run, (fn,)) pool.close() # 關閉進程池,不再接受新的進程
pool.join() # 主進程阻塞等待子進程的退出
t2 = time.time() print("並行執行時間:", int(t2 - t1))
運行結果:
可見,使用apply()方法,並行執行和順序執行用時相同,經過試驗,進程數目增大也不會減少並行執行的時間
原因:以阻塞的形式產生進程任務,生成1個任務進程並等它執行完出池,第2個進程才會進池,主進程一直阻塞等待,每次只執行1個進程任務
示例4--使用apply_async()函數
print('concurrent:') # 創建多個進程,並行執行
pool = Pool(3) # 創建擁有3個進程數量的進程池
# testFL:要處理的數據列表,run:處理testFL列表中數據的函數
for fn in testFL: pool.apply_async(run, (fn,)) pool.close() # 關閉進程池,不再接受新的進程
pool.join() # 主進程阻塞等待子進程的退出
t2 = time.time() print("並行執行時間:", int(t2 - t1))
運行結果:
可見,使用apply_async()方法,並行執行時間與使用map()、map_async()方法相同
注意:
map_async()和map()方法,第2個參數可以是列表也可以是元祖,如下圖:
而使用apply()和apply_async()方法時,第2個參數只能傳入元祖,傳入列表進程不會被執行,如下圖:
三、apply_async()方法callback參數的用法
示例:
from multiprocessing import Pool import time def fun_01(i): time.sleep(2) print('start_time:', time.ctime()) return i + 100
def fun_02(arg): print('end_time:', arg, time.ctime()) if __name__ == '__main__': pool = Pool(3) for i in range(4): pool.apply_async(func=fun_01, args=(i,), callback=fun_02) # fun_02的入參為fun_01的返回值
# pool.apply_async(func=fun_01, args=(i,))
pool.close() pool.join() print('done')
運行結果:
start_time: Thu Nov 14 16:31:41 2019 end_time: 100 Thu Nov 14 16:31:41 2019 start_time: Thu Nov 14 16:31:41 2019 end_time: 101 Thu Nov 14 16:31:41 2019 start_time: Thu Nov 14 16:31:41 2019 end_time: 102 Thu Nov 14 16:31:41 2019 start_time: Thu Nov 14 16:31:43 2019 end_time: 103 Thu Nov 14 16:31:43 2019 done
map_async()方法callback參數的用法與apply_async()相同
四、使用進程池並關注結果
import multiprocessing import time def func(msg): print('hello :', msg, time.ctime()) time.sleep(2) print('end', time.ctime()) return 'done' + msg if __name__ == '__main__': pool = multiprocessing.Pool(2) result = [] for i in range(3): msg = 'hello %s' % i result.append(pool.apply_async(func=func, args=(msg,))) pool.close() pool.join() for res in result: print('***:', res.get()) # get()函數得出每個返回結果的值
print('All end--')
運行結果:
五、多進程執行多個函數
使用apply_async()或者apply()方法,可以實現多進程執行多個方法
示例:
import multiprocessing import time import os def Lee(): print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime()) start = time.time() time.sleep(5) end = time.time() print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime()) def Marlon(): print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime()) start = time.time() time.sleep(10) end = time.time() print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime()) def Allen(): print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime()) start = time.time() time.sleep(15) end = time.time() print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime()) def Frank(): print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime()) start = time.time() time.sleep(20) end = time.time() print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime()) if __name__ == '__main__': func_list = [Lee, Marlon, Allen, Frank] print('parent process id %s' % os.getpid()) pool = multiprocessing.Pool(4) for func in func_list: pool.apply_async(func) print('Waiting for all subprocesses done...') pool.close() pool.join() print('All subprocesses done.')
運行結果:
parent process id 84172 Waiting for all subprocesses done... Run task Lee--84868******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019 Run task Marlon-84252******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019 Run task Allen-85344******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019 Run task Frank-85116******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019 Task Lee,runs 5.00 seconds. ~~~~ Thu Nov 14 17:44:19 2019 Task Marlon runs 10.00 seconds. ~~~~ Thu Nov 14 17:44:24 2019 Task Allen runs 15.00 seconds. ~~~~ Thu Nov 14 17:44:29 2019 Task Frank runs 20.00 seconds. ~~~~ Thu Nov 14 17:44:34 2019 All subprocesses done.
六、其他
1、獲取當前計算機的CPU數量