python進程池:multiprocessing.pool


轉:https://www.cnblogs.com/kaituorensheng/p/4465768.html#_label0

 

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

 

例1:使用進程池

復制代碼
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
    print "Sub-process(es) done."
復制代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函數解釋

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工作進程,不在處理未完成的任務。
  • join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。

執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。

例2:使用進程池(阻塞)

復制代碼
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
    print "Sub-process(es) done."
復制代碼

一次執行的結果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done. 

例3:使用進程池,並關注結果

復制代碼
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."
復制代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 :get()函數得出每個返回結果的值

例4:使用多個進程池

復制代碼
#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的進程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束
    print 'All subprocesses done.'
復制代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee -6948
 
Run task Marlon -2896
 
Run task Allen -7304
 
Run task Frank -3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

 

multiprocessing pool map

復制代碼
#coding: utf-8
import multiprocessing 

def m1(x): 
    print x * x 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    i_list = range(8)
    pool.map(m1, i_list)
復制代碼

一次執行結果

1
2
3
4
5
6
7
8
0
1
4
9
16
25
36
49

 參考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx 

 

問題:http://bbs.chinaunix.net/thread-4111379-1-1.html

復制代碼
#coding: utf-8
import multiprocessing
import logging

def create_logger(i):
    print i

class CreateLogger(object):
    def __init__(self, func):
        self.func = func

if __name__ == '__main__':
    ilist = range(10)

    cl = CreateLogger(create_logger)
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(cl.func, ilist)

    print "hello------------>"
復制代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
11
0
1
2
3
4
5
6
7
8
9
hello------------>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM