python中multiprocessing、multiprocessing.dummy和threading用法笔记


一、multiprocessing

用法参考地址:multiprocessing用法
首先解释一个误区:
进程池的大小是每次同时执行的进程数,但是并不会影响主进程申请进程的数量。主进程申请多进程量不等于池子大小。

1、子进程无返回值

# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' pool = Pool(processes=3) for i in xrange(1, 5): msg = 'hello %d' % (i) pool.apply_async(func,(msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。 # pool.imap(func,[msg,]) # 非阻塞, 注意与apply传的参数的区别 # pool.map(func, [msg, ]) # 阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print 'sub-process done' 
  1. 非阻塞方法
    multiprocessing.Pool.apply_async() 和 multiprocessing.Pool.imap()
    进程并发执行
  2. 阻塞方法
    multiprocessing.Pool.apply()和 multiprocessing.Pool.map()
    进程顺序执行

2、子进程有返回值

只有apply_async可以有返回值,apply,map,imap不可以设置返回值.

# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' return msg pool = Pool(processes=3) result = [] for i in xrange(1, 5): msg = 'hello %d' % (i) res = pool.apply_async(func,(msg,)) # 非阻塞 只有apply_async可以有返回值,apply,map,imap不可以设置返回值 result.append(res) print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 for res in result: print "sub_process return: ", res.get() print 'sub-process done' 

一定要注意res.get()方法是堵塞的。只有子进程执行完毕并返回数据时 res.get()方法才会执行,否则主进程堵塞,并等待。
看下面这个程序: 如何高效处理子进程有返回值的多进程任务

from multiprocessing import Pool import Queue import time def test(p): time.sleep(0.5) if p == 100: return (p,True) else: return (p,False) if __name__ == "__main__": pool = Pool(processes=10) q = Queue.Queue() for i in xrange(500): # 将子进程对象存入队列中。 q.put( pool.apply_async(test, args=(i,)) ) # 维持执行的进程总数为10,当一个进程执行完后添加新进程. print(i) ''' 因为这里使用的为pool.apply_async异步方法,因此子进程执行的过程中,父进程会执行while,获取返回值并校验。 ''' print("======", q.qsize()) while 1: a = q.get().get(); print(a) if a[1]: pool.terminate() # 结束进程池中的所有子进程。 break pool.join() 

该程序瞬间执行到 print("======", q.qsize()) 行,并且每次执行 a = q.get().get()代码时,如果对应进程没有执行完,即没有返回输出值时,该行代码导致主进程堵塞等待。

如果需要申请庞大的进程数量时,就会很浪费资源比如下面:

for i in xrange(500000000): # 将子进程对象存入队列中。 q.put( pool.apply_async(test, args=(i,)) ) # 维持执行的进程总数为10,当一个进程执行完后添加新进程. print(i) 

我们可以开启2个线程,一个线程申请进程,另一个线程判断结束所有子进程的进程是否已经到达。
如下:

from multiprocessing import Pool import Queue import threading import time def test(p): time.sleep(0.001) if p == 10000: return True else: return False if __name__ == "__main__": result = Queue.Queue() # 队列 pool = Pool() def pool_th(): for i in xrange(50000000000): ##这里需要创建执行的子进程非常多 try: result.put(pool.apply_async(test, args=(i,))) except: break def result_th(): while 1: a = result.get().get() # 获取子进程返回值 if a: pool.terminate() # 结束所有子进程 break ''' 利用多线程,同时运行Pool函数创建执行子进程,以及运行获取子进程返回值函数。 ''' t1 = threading.Thread(target=pool_th) t2 = threading.Thread(target=result_th) t1.start() t2.start() t1.join() t2.join() pool.join() 

3、多进程共享资源

申请进程有两种方式一种是multiprocessing.Process(),另一种是multiprocessing.Pool(process=3).apply_async().
multiprocessing提供三种多进程之间共享数据的数据结构: Queue, Array 和Manager.

from multiprocessing import Queue, Array, Manager 

Queue、和Array只适用Process类申请的多进程共享资源。
Manager可以适用Pool和Process类申请的多进程共享资源。

import time from multiprocessing import Manager, Pool lists = Manager().list() # 定义可被子进程共享的全局变量lists def func(i): # time.sleep(1) lists.append(i) print i pool = Pool(processes=3) for i in xrange(10000000): if len(lists) <= 0: pool.apply_async(func, args=(i,)) else: break pool.close() pool.join() print(lists) 

输出结果为:且i最大值不定。主进程申请多进程量不等于池子大小。
在这里插入图片描述

二、多线程 Multiprocessing.dummy

1、子进程无返回值

Multiprocessing.dummy.Pool() 与Multiprocessing.Pool() 的用法一样

  1. 非阻塞方法
    multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
    线程并发执行
  2. 阻塞方法
    multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
    线程顺序执行
from multiprocessing.dummy import Pool as Pool import time def func(msg): print('msg:', msg) time.sleep(2) print('end:') pool = Pool(processes=3) for i in range(1, 5): msg = 'hello %d' % (i) pool.apply_async(func, (msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。 # pool.imap(func,[msg,]) # 非阻塞, 注意与apply传的参数的区别 # pool.map(func, [msg, ]) # 阻塞 print('Mark~~~~~~~~~~~~~~~') pool.close() pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print('sub-process done') 

2、子进程有返回值

与多进程一样,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以设置返回值.
3、多进程共享资源

三、多线程 Threading

1、创建方法

  1. 直接使用Thread类
from threading import Thread import time def run(a = None, b = None) : print a, b time.sleep(1) t = Thread(target = run, args = ("this is a", "thread")) #此时线程是新建状态 print t.getName()#获得线程对象名称 print t.isAlive()#判断线程是否还活着。 t.start()#启动线程 t.join()#等待其他线程运行结束 
  1. 继承Thread类
from threading import Thread import time class MyThread(Thread) : def __init__(self, a) : super(MyThread, self).__init__() #调用父类的构造方法 self.a = a def run(self) : print "sleep :", self.a time.sleep(self.a) t1 = MyThread(2) t2 = MyThread(4) t1.start() t2.start() t1.join() t2.join()
 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM