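The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code. Since Python 3.2 it has also shipped the concurrent.futures module, whose ThreadPoolExecutor and ProcessPoolExecutor classes are a higher-level abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools.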
The two building blocks of concurrent.futures are Executor and Future.
Executor
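Executor is an abstract class and cannot be used directly. It defines the basic methods that concrete asynchronous executors build on.
ThreadPoolExecutor and ProcessPoolExecutor inherit from Executor and are used to create thread pools and process pools, respectively.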
    import time  # needed by map() for the timeout arithmetic


    class Executor(object):
        """This is an abstract base class for concrete asynchronous executors."""

        def submit(self, fn, *args, **kwargs):
            raise NotImplementedError()

        def map(self, fn, *iterables, timeout=None):
            if timeout is not None:
                end_time = timeout + time.time()

            fs = [self.submit(fn, *args) for args in zip(*iterables)]

            def result_iterator():
                try:
                    for future in fs:
                        if timeout is None:
                            yield future.result()
                        else:
                            yield future.result(end_time - time.time())
                finally:
                    for future in fs:
                        future.cancel()
            return result_iterator()

        def shutdown(self, wait=True):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.shutdown(wait=True)
            return False
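The __enter__ and __exit__ methods in the excerpt above mean an executor can be used in a with block, and leaving the block calls shutdown(wait=True). A minimal sketch of that behaviour, assuming a hypothetical square function that is not part of the original article:

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical task, used only for illustration.
    return x * x


# Leaving the with block calls shutdown(wait=True), so every submitted
# task has finished by the time the block exits.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(square, 7)

print(future.done())    # True
print(future.result())  # 49
```

Using the with form avoids having to remember an explicit shutdown() call when the pool is no longer needed.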
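The submit() method
Executor defines the submit() method, which schedules a callable task for execution and returns a Future instance. The Future object represents that pending call.
The following example shows how submit() works with a thread pool or process pool.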
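    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor
    import time


    def return_future(msg):
        time.sleep(3)
        return msg


    # Create a thread pool with two worker threads
    pool = ThreadPoolExecutor(max_workers=2)

    # Submit two tasks to the pool
    f1 = pool.submit(return_future, 'hello')
    f2 = pool.submit(return_future, 'world')

    print(f1.done())    # False: the task is still sleeping
    time.sleep(3)
    print(f2.done())    # usually True by now, but the timing is not guaranteed
    print(f1.result())  # blocks until the task has finished, then prints hello
    print(f2.result())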
Switching to a process pool is simple: replace ThreadPoolExecutor with ProcessPoolExecutor. To submit several tasks, call submit() in a loop.
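Submitting in a loop might look like the sketch below. The shout function and the messages list are hypothetical names chosen for illustration. A thread pool is used here so the snippet runs anywhere; if you swap in ProcessPoolExecutor, the task function has to live at module level so it can be pickled, and on platforms that spawn worker processes the pool code should sit under an if __name__ == '__main__': guard.

```python
from concurrent.futures import ThreadPoolExecutor


def shout(msg):
    # Hypothetical task: return the message in upper case.
    return msg.upper()


messages = ['hello', 'world', 'again']

with ThreadPoolExecutor(max_workers=2) as pool:
    # One submit() call per task; the list keeps the futures in submission order.
    futures = [pool.submit(shout, m) for m in messages]
    # Reading result() in list order gives results in submission order.
    results = [f.result() for f in futures]

print(results)  # ['HELLO', 'WORLD', 'AGAIN']
```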
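The map() method
Besides submit(), Executor also provides map(). It works like the built-in map(func, *iterables) and returns an iterator that yields the results of the calls in the same order as the inputs, regardless of which call finishes first. The following example illustrates this: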
    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor as Pool
    import requests

    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)


    pool = Pool(max_workers=3)
    results = pool.map(task, URLS)

    for ret in results:
        print('%s, %s' % (ret.url, len(ret.content)))
Output

    http://www.baidu.com/, 2381
    http://www.qq.com/, 252160
    http://www.sina.com.cn/, 607265
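The ordering guarantee of map() can also be checked without network access. The sketch below assumes a hypothetical slow_echo task that simply sleeps; the shortest task finishes first, yet the results still come back in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then return the delay.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as pool:
    # The 0.1 s task finishes first, but map() still yields results
    # in the order of the input iterable.
    ordered = list(pool.map(slow_echo, delays))

print(ordered)  # [0.3, 0.1, 0.2]
```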
Future
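A Future can be thought of as an operation that will complete at some point in the future; it is the foundation of asynchronous programming. Normally, when we perform an I/O operation such as fetching a URL (as below), the calling thread blocks until the result comes back and can do nothing else in the meantime. Futures let us get other work done during that wait.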
    import requests

    data = requests.get('http://www.baidu.com').content
    print(len(data))
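For contrast, here is a rough sketch of the same idea with a future. The slow_fetch function is a hypothetical stand-in for a blocking call such as requests.get(), so the snippet needs no network access; the main thread keeps working and only blocks when it asks for the result.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_fetch(name):
    # Hypothetical stand-in for a blocking I/O call such as requests.get().
    time.sleep(0.5)
    return 'payload for %s' % name


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_fetch, 'example')  # returns immediately

    # The main thread is free to do other work while the task runs.
    local_total = sum(range(1000))

    # result() blocks only now, until the worker has finished.
    payload = future.result()

print(local_total)  # 499500
print(payload)      # payload for example
```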
Future instances are created by Executor.submit(). A Future offers a rich set of methods for inspecting and handling the call.
    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests

    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com']


    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)


    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]

        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))

        # as_completed() yields each future as soon as it finishes
        for f in as_completed(future_tasks):
            try:
                ret = f.done()
                if ret:
                    # result() re-raises any exception raised inside task()
                    f_ret = f.result()
                    print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                # cancel() has no effect on a future that has already finished
                f.cancel()
                print(str(e))
Output

    <Future at 0x7fc2716e1f60 state=running> is running
    <Future at 0x7fc27136d4e0 state=running> is running
    <Future at 0x7fc27136d710 state=running> is running
    <Future at 0x7fc27136d710 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
    <Future at 0x7fc2716e1f60 state=finished returned Response>, done, result: http://www.qq.com/, 252343
    <Future at 0x7fc27136d4e0 state=finished returned Response>, done, result: http://www.sina.com.cn/, 602366
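The output shows that as_completed() does not yield results in the order of the URLS list; it yields each future as soon as it completes. This also shows that the requests to the different URLs ran concurrently rather than blocking one another.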
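The same completion-order behaviour can be reproduced offline. This is only a sketch: slow_echo is a hypothetical task that sleeps, and the delays are spaced far enough apart that on an otherwise idle machine the futures would normally finish shortest first, although thread scheduling gives no hard guarantee.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then return the delay.
    time.sleep(delay)
    return delay


delays = [0.6, 0.2, 0.4]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_echo, d) for d in delays]
    # as_completed() yields each future as soon as it finishes,
    # not in the order the futures were submitted.
    finished = [f.result() for f in as_completed(futures)]

print(finished)
```

Every future is yielded exactly once, so finished always contains the same values as delays; only the order differs.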
wait
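The wait() function returns a named tuple of two sets: done (the futures that have completed) and not_done (those that have not). One advantage of wait() is the extra control it gives: its return_when argument accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED, and defaults to ALL_COMPLETED.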
    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import wait
    import requests

    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com']


    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)


    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]

        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))

        results = wait(future_tasks)
        done = results[0]
        for x in done:
            print(x)
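wait() takes two optional parameters, timeout and return_when.
timeout sets the maximum number of seconds wait() waits before returning.
return_when determines when the function returns: with the default, ALL_COMPLETED, the program blocks until every future passed to it has finished; with FIRST_COMPLETED, it returns as soon as any one of them finishes instead of waiting for all of them.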
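To make the difference between the two settings concrete, here is a small offline sketch. The slow_echo task and the chosen delays are assumptions made for illustration; the 5-second timeout is just a generous upper bound.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def slow_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then return the delay.
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=2) as pool:
    fast = pool.submit(slow_echo, 0.1)
    slow = pool.submit(slow_echo, 1.0)

    # FIRST_COMPLETED: return as soon as at least one future is done.
    first = wait([fast, slow], return_when=FIRST_COMPLETED)
    fast_was_first = fast in first.done
    slow_still_pending = slow in first.not_done

    # Default ALL_COMPLETED: block until both are done, or until
    # the timeout (in seconds) runs out.
    final = wait([fast, slow], timeout=5)

print(fast_was_first, slow_still_pending)
print(len(final.done), len(final.not_done))
```

If the timeout expires first, wait() does not raise; the unfinished futures simply show up in not_done.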
Reposted from
http://blog.csdn.net/dutsoft/article/details/54728706