python 多進程——使用進程池,多進程消費的數據)是一個隊列的時候,他會自動去隊列里依次取數據


我的mac 4核,因此每次執行的時候同時開啟4個線程處理:

# coding: utf-8

import time
from multiprocessing import Pool


def long_time_task(name):
    print 'task %s starts running' % name
    time.sleep(3)
    print 'task %s ends running --3 seconds' % name


if __name__ == '__main__':
    start = time.time()
    p = Pool()
    for i in range(10):  # CPU有幾核,每次就取出幾個進程
        p.apply_async(func=long_time_task, args=(i,))
    p.close()  # 調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了
    p.join()  # 對Pool對象調用join()方法會等待所有子進程執行完畢
    end = time.time()
    print('多進程(非阻塞)執行共需時間為:%.2f' % (end - start))

運行效果:

task 0 starts running
task 1 starts running
task 2 starts running
task 3 starts running
task 0 ends running --3 seconds
task 1 ends running --3 seconds
task 3 ends running --3 seconds
task 2 ends running --3 seconds
task 4 starts running
task 5 starts running
task 6 starts running
task 7 starts running
task 5 ends running --3 seconds
task 4 ends running --3 seconds
task 7 ends running --3 seconds
task 6 ends running --3 seconds
task 8 starts running
task 9 starts running
task 8 ends running --3 seconds
task 9 ends running --3 seconds
多進程(非阻塞)執行共需時間為:9.13

解釋:

CPU先取出0-3號進程,執行完畢后,4~8號進程才開始執行。0-3號進程花了3秒鍾,4~8號 進程也花了3秒。最后兩個進程9,10又花了三秒,一共9秒。

 

 

也就意味着,我的代碼可以這樣寫,當history_ddos(多進程消費的數據)是一個隊列的時候,他會自動去隊列里依次取數據

    f = open("history_ddos.json", "r")
    history_ddos = json.load(f)
    f.close()

    # 10表示進程池中最多有10個進程一起執行
    p = Pool(10)
    for item in history_ddos:
        # find_ddos_botnet(item)
        p.apply_async(func=find_ddos_botnet(), args=(item,))
    p.close()
    p.join()

 多個子進程並返回值 apply_async()本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func中返回一個值,那么pool.apply_async(func, (msg, ))的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)。

import multiprocessing
    import time
     
    def func(msg):
        return multiprocessing.current_process().name + '-' + msg
     
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4) # 創建4個進程
        results = []
        for i in xrange(10):
            msg = "hello %d" %(i)
            results.append(pool.apply_async(func, (msg, )))
        pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用
        pool.join() # 等待進程池中的所有進程執行完畢
        print ("Sub-process(es) done.")
     
        for res in results:
            print (res.get())

結果:

Sub-process(es) done.
ForkPoolWorker-1-hello 0
ForkPoolWorker-2-hello 1
ForkPoolWorker-3-hello 2
ForkPoolWorker-1-hello 3
ForkPoolWorker-4-hello 4
ForkPoolWorker-1-hello 5
ForkPoolWorker-2-hello 6
ForkPoolWorker-1-hello 7
ForkPoolWorker-2-hello 8
ForkPoolWorker-3-hello 9


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM