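My Mac has 4 cores, so on each run the pool below starts 4 worker processes (processes, not threads) and handles 4 tasks at the same time: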
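# coding: utf-8
import time
from multiprocessing import Pool


def long_time_task(name):
    print('task %s starts running' % name)
    time.sleep(3)
    print('task %s ends running --3 seconds' % name)


if __name__ == '__main__':
    start = time.time()
    p = Pool()  # with no argument, Pool starts one worker process per CPU core
    for i in range(10):
        # apply_async() queues the task and returns immediately (non-blocking);
        # the pool runs as many tasks at once as it has worker processes
        p.apply_async(func=long_time_task, args=(i,))
    p.close()  # close() must be called before join(); after close() no new tasks can be added
    p.join()   # join() on the Pool waits until all worker processes have finished
    end = time.time()
    print('Multiprocessing (non-blocking) total time: %.2f' % (end - start))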
Output:
task 0 starts running
task 1 starts running
task 2 starts running
task 3 starts running
task 0 ends running --3 seconds
task 1 ends running --3 seconds
task 3 ends running --3 seconds
task 2 ends running --3 seconds
task 4 starts running
task 5 starts running
task 6 starts running
task 7 starts running
task 5 ends running --3 seconds
task 4 ends running --3 seconds
task 7 ends running --3 seconds
task 6 ends running --3 seconds
task 8 starts running
task 9 starts running
task 8 ends running --3 seconds
task 9 ends running --3 seconds
Multiprocessing (non-blocking) total time: 9.13
Explanation:
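The pool first takes tasks 0-3; only after they finish do tasks 4-7 start. Tasks 0-3 take 3 seconds, tasks 4-7 take another 3 seconds, and the last two tasks, 8 and 9, take a final 3 seconds, for a total of about 9 seconds.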
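As a rough check of that arithmetic, here is a minimal sketch that assumes every task takes exactly the same time and ignores process start-up overhead. The helper expected_wall_time() is hypothetical, not part of multiprocessing. With n tasks and w workers, the tasks run in ceil(n / w) rounds:

```python
import math
import os


def expected_wall_time(n_tasks, n_workers, task_seconds):
    """Idealized wall-clock time for equal-length tasks on a process pool.

    Hypothetical helper: assumes each worker runs one task at a time and
    ignores the cost of starting workers and passing tasks to them.
    """
    rounds = math.ceil(n_tasks / n_workers)  # number of "waves" of tasks
    return rounds * task_seconds


# Pool() with no argument sizes itself from the CPU count; os.cpu_count()
# can return None, so fall back to 1 the same way Pool does.
workers = os.cpu_count() or 1
print("CPUs on this machine:", workers)
print("10 tasks, 4 workers, 3 s each:", expected_wall_time(10, 4, 3), "seconds")
```

The measured 9.13 seconds is presumably a little above the idealized 9 because starting the worker processes and handing tasks to them also takes time.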
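This means my own code can follow the same pattern. Each item in history_ddos (the data the worker processes consume) is submitted as one task; the tasks wait in the pool's internal queue, and each idle worker automatically takes the next one in order: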
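import json
from multiprocessing import Pool

# find_ddos_botnet(item) is the author's analysis function, defined elsewhere

if __name__ == '__main__':
    with open("history_ddos.json", "r") as f:
        history_ddos = json.load(f)
    # 10 means at most 10 processes in the pool run at the same time
    p = Pool(10)
    for item in history_ddos:
        # serial version: find_ddos_botnet(item)
        # pass the function itself; find_ddos_botnet() would call it in the parent process
        p.apply_async(func=find_ddos_botnet, args=(item,))
    p.close()
    p.join()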
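Because history_ddos.json and find_ddos_botnet() are not shown here, the sketch below swaps in a hypothetical stub and a small in-memory list so it runs on its own. It also keeps the AsyncResult objects returned by apply_async(), so each task's return value (or exception) can be read back; in a fire-and-forget loop, an exception raised inside the worker function is never reported.

```python
from multiprocessing import Pool


def find_ddos_botnet(item):
    """Hypothetical stand-in for the author's analysis function."""
    # Pretend "analysis": report the record's id and how many IPs it lists.
    return item["id"], len(item["ips"])


def analyse_all(records, processes=4):
    """Submit one task per record and collect the results in submission order."""
    pool = Pool(processes)
    # Pass the function object itself (no parentheses) so the worker calls it.
    pending = [pool.apply_async(find_ddos_botnet, (rec,)) for rec in records]
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for every worker to finish
    # get() returns each task's value, or re-raises the exception it hit.
    return [res.get() for res in pending]


if __name__ == "__main__":
    # Hypothetical records standing in for history_ddos.json.
    sample = [
        {"id": "a", "ips": ["10.0.0.1", "10.0.0.2"]},
        {"id": "b", "ips": ["10.0.0.3"]},
        {"id": "c", "ips": []},
    ]
    print(analyse_all(sample))
```

The `if __name__ == "__main__":` guard matters on macOS, where recent Python versions start workers with the "spawn" method and re-import the main module in each child.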
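Multiple child processes with return values

apply_async() also gives access to the return value of the function it runs. In the earlier code that created several child processes, if func returns a value, then pool.apply_async(func, (msg, )) returns an AsyncResult object for that task (note: an object, not the value itself); calling its get() method retrieves the value.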
import multiprocessing


def func(msg):
    return multiprocessing.current_process().name + '-' + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # create 4 worker processes
    results = []
    for i in range(10):
        msg = "hello %d" % i
        results.append(pool.apply_async(func, (msg, )))
    pool.close()  # close the pool: no more tasks may be added; must be called before join()
    pool.join()   # wait for all processes in the pool to finish
    print("Sub-process(es) done.")
    for res in results:
        print(res.get())  # get() returns the value that func returned
Result:
Sub-process(es) done.
ForkPoolWorker-1-hello 0
ForkPoolWorker-2-hello 1
ForkPoolWorker-3-hello 2
ForkPoolWorker-1-hello 3
ForkPoolWorker-4-hello 4
ForkPoolWorker-1-hello 5
ForkPoolWorker-2-hello 6
ForkPoolWorker-1-hello 7
ForkPoolWorker-2-hello 8
ForkPoolWorker-3-hello 9
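A compact sketch of the same idea using only documented Pool and AsyncResult methods: Pool.map() when you just want the values back in input order, and AsyncResult.get(timeout), successful() and exception propagation when you use apply_async(). The risky() task is a hypothetical example that fails on purpose.

```python
import multiprocessing


def func(msg):
    # Same idea as above: tag the message with the worker's name.
    return multiprocessing.current_process().name + "-" + msg


def risky(x):
    # Hypothetical task that fails for one input, to show error propagation.
    if x == 3:
        raise ValueError("bad input: %d" % x)
    return x * x


def demo():
    with multiprocessing.Pool(processes=4) as pool:
        # map() blocks and returns plain values, in input order.
        tagged = pool.map(func, ["hello %d" % i for i in range(10)])

        res = pool.apply_async(risky, (2,))
        value = res.get(timeout=10)  # blocks until the result is ready (at most 10 s)
        ok = res.successful()        # True: the call returned without raising

        bad = pool.apply_async(risky, (3,))
        try:
            bad.get(timeout=10)      # re-raises the worker's ValueError in the parent
            error = None
        except ValueError as exc:
            error = str(exc)
    # Leaving the with-block calls terminate(); all results were collected already.
    return tagged, value, ok, error


if __name__ == "__main__":
    tagged, value, ok, error = demo()
    print(tagged[:3])
    print(value, ok, error)
```

One design note: get() is where a worker's exception surfaces, so calling it (or passing error_callback to apply_async()) is the way to notice that a task failed.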