(转)Python多进程Process、Pool的使用总结


原文:https://www.cnblogs.com/wangdac/p/13892208.html

python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。

进程池Pool模块接口说明

拆分任务一提高执行效率时,独立多进程apply_async(),通过列表解析添加子任务,是最优选择。, 见pool部分

multiprocessing.Pool类接口详解

join方法的意义

join()方法可以等待子进程结束后再继续往下运行(更准确地说,在当前位置阻塞主进程,带执行join()的进程结束后再继续执行主进程),通常用于进程间的同步。(进一步地解释,哪个子进程调用了join方法,主进程就要等该子进程执行完后才能继续向下执行,具体可见下边的分析图)

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

case:使用进程池(非阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print "Sub-process(es) done." ''' 函数解释: apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别) close() 关闭pool,使其不在接受新的任务。 terminate() 结束工作进程,不在处理未完成的任务。 join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。 执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。 pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 ''' 

Python多进程pool.map()函数,有两个参数可以传,第一个参数传的是函数,第二个参数传的是数据列表。那么怎么在第二个数据列表,多传几个参数呢,方法是通过对有多个参数的方法进行封装,在进程中运行封装后的方法。

pool.map

# -*- coding:utf-8 -*- import time import multiprocessing def job(x ,y): """ :param x: :param y: :return: """ return x * y def job1(z): """ :param z: :return: """ return job(z[0], z[1]) if __name__ == "__main__": time1=time.time() pool = multiprocessing.Pool(2) data_list=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)] res = pool.map(job1,data_list) time2=time.time() print(res) pool.close() pool.join() print('总共耗时:' + str(time2 - time1) + 's') 

pool.apply_async

case1 : https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py
#!/usr/bin/env python3 # -*- coding: utf-8 -*- from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') 
case2: https://blog.csdn.net/jinping_shi/article/details/52433867 import multiprocessing import time def func(msg): print multiprocessing.current_process().name + '-' + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 创建4个进程 for i in xrange(10): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) pool.close() # 关闭进程池,表示不能在往进程池中添加进程 pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用 print "Sub-process(es) done." 
case3:https://blog.csdn.net/weixin_42898819/article/details/81811514 from multiprocessing import Process,Pool #导入进程池 import time,os def Foo(i): time.sleep(2) print('到了2s') return i+100 def Bar(arg): print('结果:',arg) if __name__ == '__main__': pool=Pool(processes= 5) #允许进程池同时放入5个进程 for i in range(10): #10个进程都启动 但是一次只能运行5个 #pool.apply(func= Foo,args=(i,)) #串行执行进程,一次执行1个进程 pool.apply_async(func= Foo,args=(i,),callback= Bar) #并行执行进程,一次5个,callback回调 Foo执行完就会执行Bar print('end') pool.close() pool.join() #等待进程池中的进程执行完毕 必须先close() 在join() 

什么时候用进程池Pool

当我们需要的进程数量不多的时候,我们可以使用multiprocessing的Process类来创建进程。但是如果我们需要的进程特别多的时候,手动创建工作量太大了,所以Python也为我们提供了Pool(池)的方式来创建大量进程。

from multiprocessing import Pool import os,time def run(msg): print("开始一个子线程运行了……") time.sleep(1) print("开始一个子线程运行结束了……") if __name__ == "__main__": pool = Pool(3) # 表示初始化一个进程池,最大进程数为5 for x in range(10): pool.apply_async(run, args=("hello pool",)) print("------start----") pool.close() # 关闭池 pool.join() # 等待所有的子进程完成,必须放在close后面 print("-------end------") ''' 注意:一般我们使用apply_async这个方法,表示非阻塞的运行,一旦使用了apply方法表示阻塞式执行任务,此时就是单任务执行了(一般不会使用,特殊场景才会使用) ''' 

Pool.map多参数任务

map的多参数解决办法

#也可以用List将多个参数拼接成一个argList元组,然后多个argList再组合为pool.map要求的可迭代对象 def job(x ,y): return x * y def job1(z): return job(z[0], z[1]) if __name__ == "__main__": pool = multiprocessing.Pool() res = pool.map(job1, [(2, 3), (3, 4)]) print res 
# 将多个输入变量打包到一个参数 x = [1,2,3,4,5,6] y = [1,1,1,1,1,1] x_y = zip(x, y) results = pool.map(work, x_y) #使用pathos包下的multiprocessing 这个包是使用dill的multiprocessing的一个fork,允许多参数输入: from pathos.multiprocessing import ProcessingPoll as Pool pool = Pool(4) results = pool.map(work, x, y) 

Pool.apply_async()有序输出多个迭代结果

在使用apply_async()方法接收多个参数的方法时,在任务方法中正常定义多个参数,参数以元组形式传入即可
但是给apply_async()方法传入多个值获取多个迭代结果时就会报错,因为该方法只能接收一个值,所以可以将该方法放入一个列表生成式中,如下

def job(x): return x * x if __name__ == "__main__": pool multiprocessing.Pool() res = [pool.apply_async(target=job, (i,)) for i in range(3)] print [r.get() for r in res] 

进程池 apply_async, map方法示例

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。就是固定有几个进程可以使用。

进程池中有两个方法:

apply:同步,一般不使用

apply_async:异步

from multiprocessing import Process,Pool import os, time, random def fun1(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': pool = Pool(5) #创建一个5个进程的进程池 for i in range(10): pool.apply_async(func=fun1, args=(i,)) pool.close() pool.join() print('结束测试') 

结果

Run task 0 (37476)... Run task 1 (4044)... Task 0 runs 0.03 seconds. Run task 2 (37476)... Run task 3 (17252)... Run task 4 (16448)... Run task 5 (24804)... Task 2 runs 0.27 seconds. Run task 6 (37476)... Task 1 runs 0.58 seconds. Run task 7 (4044)... Task 3 runs 0.98 seconds. Run task 8 (17252)... Task 5 runs 1.13 seconds. Run task 9 (24804)... Task 6 runs 1.46 seconds. Task 4 runs 2.73 seconds. Task 8 runs 2.18 seconds. Task 7 runs 2.93 seconds. Task 9 runs 2.93 seconds. 结束测试 

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

进程池map方法

因为网上看到这个例子觉得不错,所以这里就不自己写案例,这个案例比较有说服力

import os import PIL from multiprocessing import Pool from PIL import Image SIZE = (75,75) SAVE_DIRECTORY = \'thumbs\' def get_image_paths(folder): return (os.path.join(folder, f) for f in os.listdir(folder) if \'jpeg\' in f) def create_thumbnail(filename): im = Image.open(filename) im.thumbnail(SIZE, Image.ANTIALIAS) base, fname = os.path.split(filename) save_path = os.path.join(base, SAVE_DIRECTORY, fname) im.save(save_path) if __name__ == \'__main__\': folder = os.path.abspath( \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\') os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) images = get_image_paths(folder) pool = Pool() pool.map(creat_thumbnail, images) #关键点,images是一个可迭代对象 pool.close() pool.join() 
复制

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

map在爬虫的领域里也可以使用,比如多个URL的内容爬取,可以把URL放入元祖里,然后传给执行函数。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM