進程池與線程池
在剛開始學多進程或多線程時,我們迫不及待地基於多進程或多線程實現並發的套接字通信,然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨着並發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,於是我們必須對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制
介紹
官網:https://docs.python.org/dev/library/concurrent.futures.html concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class.
基本方法
1、submit(fn, *args, **kwargs) 異步提交任務 2、map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操作 3、shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源后才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前 4、result(timeout=None) 取得結果 5、add_done_callback(fn) 回調函數
線程池用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
進程程池用法
把ThreadPoolExecutor換成ProcessPoolExecutor,其余用法全部相同
回調函數
可以為進程池或線程池內得每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發,並接受任務的返回值當作參數,該函數成為回調函數。
提交任務的兩種方法:
1、同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,在執行下一行代碼,導致程序是串行
2、異步調用:提交完任務后,不用原地等待任務執行完畢

#提交任務的兩種方式 #1、同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行 # # from concurrent.futures import ThreadPoolExecutor # import time # import random # # def la(name): # print('%s is laing' %name) # time.sleep(random.randint(3,5)) # res=random.randint(7,13)*'#' # return {'name':name,'res':res} # # def weigh(shit): # name=shit['name'] # size=len(shit['res']) # print('%s 拉了 《%s》kg' %(name,size)) # # # if __name__ == '__main__': # pool=ThreadPoolExecutor(13) # # shit1=pool.submit(la,'alex').result() # weigh(shit1) # # shit2=pool.submit(la,'wupeiqi').result() # weigh(shit2) # # shit3=pool.submit(la,'yuanhao').result() # weigh(shit3) #2、異步調用:提交完任務后,不地等待任務執行完畢, from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print('%s is laing' %name) time.sleep(random.randint(3,5)) res=random.randint(7,13)*'#' return {'name':name,'res':res} def weigh(shit): shit=shit.result() name=shit['name'] size=len(shit['res']) print('%s 拉了 《%s》kg' %(name,size)) if __name__ == '__main__': pool=ThreadPoolExecutor(13) pool.submit(la,'alex').add_done_callback(weigh) pool.submit(la,'wupeiqi').add_done_callback(weigh) pool.submit(la,'yuanhao').add_done_callback(weigh)

from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print('GET %s' % url) response = requests.get(url) time.sleep(3) return {'url': url, 'content': response.text} def parse(res): res = res.result() print('%s parse res is %s' % (res['url'], len(res['content']))) if __name__ == '__main__': urls=[ 'http://www.cnblogs.com/linhaifeng', 'https://www.python.org', 'https://www.openstack.org', ] pool = ThreadPoolExecutor(2) for url in urls: pool.submit(get, url).add_done_callback(parse)