Python多進程Process、Pool的使用總結


Python多進程Process、Pool的使用總結

序. multiprocessing包

python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。

進程池Pool模塊接口說明

拆分任務一提高執行效率時,獨立多進程apply_async(),通過列表解析添加子任務,是最優選擇。見pool部分

multiprocessing.Pool類接口詳解

join方法的意義

join()方法可以等待子進程結束后再繼續往下運行(更准確地說,在當前位置阻塞主進程,帶執行join()的進程結束后再繼續執行主進程),通常用於進程間的同步。(進一步地解釋,哪個子進程調用了join方法,主進程就要等該子進程執行完后才能繼續向下執行,具體可見下邊的分析圖)

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

case:使用進程池(非阻塞)
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
    print "Sub-process(es) done."

'''    
函數解釋:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
close()    關閉pool,使其不在接受新的任務。
terminate()    結束工作進程,不在處理未完成的任務。
join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。
執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。    
pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
'''

Python多進程pool.map()函數,有兩個參數可以傳,第一個參數傳的是函數,第二個參數傳的是數據列表。那么怎么在第二個數據列表,多傳幾個參數呢,方法是通過對有多個參數的方法進行封裝,在進程中運行封裝后的方法。

pool.map

# -*- coding:utf-8 -*-

import time
import multiprocessing

def job(x ,y):
	"""
	:param x:
	:param y:
	:return:
	"""
	return x * y

def job1(z):
	"""
	:param z:
	:return:
	"""
	return job(z[0], z[1])

if __name__ == "__main__":
	time1=time.time()
	pool = multiprocessing.Pool(2)
	data_list=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)]
	res = pool.map(job1,data_list)
	time2=time.time()
	print(res)
	pool.close()
	pool.join()
	print('總共耗時:' + str(time2 - time1) + 's')

pool.apply_async

case1 : https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
case2: https://blog.csdn.net/jinping_shi/article/details/52433867
import multiprocessing
import time

def func(msg):
    print multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 創建4個進程
    for i in xrange(10):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))
    pool.close() # 關閉進程池,表示不能在往進程池中添加進程
    pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用
    print "Sub-process(es) done."
case3:https://blog.csdn.net/weixin_42898819/article/details/81811514
from multiprocessing import Process,Pool  #導入進程池
import time,os

def Foo(i):
    time.sleep(2)
    print('到了2s')
    return i+100
def Bar(arg):
    print('結果:',arg)

if __name__  == '__main__':  
    pool=Pool(processes= 5)  #允許進程池同時放入5個進程

    for i in range(10):  #10個進程都啟動 但是一次只能運行5個
        #pool.apply(func= Foo,args=(i,))  #串行執行進程,一次執行1個進程
        pool.apply_async(func= Foo,args=(i,),callback= Bar) #並行執行進程,一次5個,callback回調 Foo執行完就會執行Bar
    print('end')
    pool.close()
    pool.join() #等待進程池中的進程執行完畢  必須先close()  在join()

什么時候用進程池Pool

當我們需要的進程數量不多的時候,我們可以使用multiprocessing的Process類來創建進程。但是如果我們需要的進程特別多的時候,手動創建工作量太大了,所以Python也為我們提供了Pool(池)的方式來創建大量進程。

from multiprocessing import Pool
import os,time

def run(msg):
    print("開始一個子線程運行了……")
    time.sleep(1)
    print("開始一個子線程運行結束了……")

if __name__ == "__main__":
    pool = Pool(3)  # 表示初始化一個進程池,最大進程數為5
    for x in range(10):
        pool.apply_async(run, args=("hello pool",))
    print("------start----")
    pool.close() # 關閉池
    pool.join() # 等待所有的子進程完成,必須放在close后面
    print("-------end------")
    
'''
注意:一般我們使用apply_async這個方法,表示非阻塞的運行,一旦使用了apply方法表示阻塞式執行任務,此時就是單任務執行了(一般不會使用,特殊場景才會使用)
'''    

Pool.map多參數任務

map的多參數解決辦法

#也可以用List將多個參數拼接成一個argList元組,然后多個argList再組合為pool.map要求的可迭代對象

def job(x ,y):
	return x * y

def job1(z):
    return job(z[0], z[1])

if __name__ == "__main__":
	pool = multiprocessing.Pool()
	res = pool.map(job1, [(2, 3), (3, 4)])
	print res
# 將多個輸入變量打包到一個參數
x = [1,2,3,4,5,6]
y = [1,1,1,1,1,1]
x_y = zip(x, y)
results = pool.map(work, x_y)

#使用pathos包下的multiprocessing
這個包是使用dill的multiprocessing的一個fork,允許多參數輸入:

from pathos.multiprocessing import ProcessingPoll as Pool
pool = Pool(4)
results = pool.map(work, x, y)

Pool.apply_async()有序輸出多個迭代結果

在使用apply_async()方法接收多個參數的方法時,在任務方法中正常定義多個參數,參數以元組形式傳入即可
但是給apply_async()方法傳入多個值獲取多個迭代結果時就會報錯,因為該方法只能接收一個值,所以可以將該方法放入一個列表生成式中,如下

def job(x):
    return x * x

if __name__ == "__main__":
    pool multiprocessing.Pool()
    res = [pool.apply_async(target=job, (i,)) for i in range(3)]
    print [r.get() for r in res]

進程池 apply_async, map方法示例

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。就是固定有幾個進程可以使用。

進程池中有兩個方法:

apply:同步,一般不使用

apply_async:異步

from  multiprocessing import Process,Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    pool = Pool(5) #創建一個5個進程的進程池

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('結束測試')

結果

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
結束測試

Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。

進程池map方法

因為網上看到這個例子覺得不錯,所以這里就不自己寫案例,這個案例比較有說服力

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if \'jpeg\' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == \'__main__\':
    folder = os.path.abspath(
        \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images) #關鍵點,images是一個可迭代對象
    pool.close()
    pool.join()

上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,並將這些縮略圖保存到特定文件夾中。這我的機器上,用這一程序處理 6000 張圖片需要花費 27.9 秒。 map 函數並不支持手動線程管理,反而使得相關的 debug 工作也變得異常簡單。

map在爬蟲的領域里也可以使用,比如多個URL的內容爬取,可以把URL放入元祖里,然后傳給執行函數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM