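The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code. Starting with Python 3.2, the standard library also includes the concurrent.futures module. Its two classes, ThreadPoolExecutor and ProcessPoolExecutor, are a higher-level abstraction over threading and multiprocessing, and they give direct support for working with thread pools and process pools.
The two core building blocks of concurrent.futures are the Executor and the Future.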
Executor
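Executor is an abstract class and is not meant to be used directly. It defines the basic methods that concrete asynchronous executors provide.
ThreadPoolExecutor and ProcessPoolExecutor inherit from Executor and are used to create thread pools and process pools, respectively. Below is a lightly trimmed version of the class from the standard library source (concurrent/futures/_base.py in early Python 3 releases):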
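```python
import time


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        # Subclasses must schedule fn(*args, **kwargs) and return a Future
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None):
        if timeout is not None:
            end_time = timeout + time.time()

        # Submit every call up front so they can run concurrently
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # The generator lives in a closure so that all futures are submitted
        # before the first result is requested
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        # Wait only for the time left until end_time
                        yield future.result(end_time - time.time())
            finally:
                # If iteration stops early, cancel calls that have not started
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving a with block waits for all pending calls to finish
        self.shutdown(wait=True)
        return False
```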
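Because Executor leaves only submit() abstract, a subclass that implements it inherits map() and the with-statement support for free. The sketch below illustrates this with a hypothetical SyncExecutor (not part of the standard library) that simply runs each call in the caller's thread. It relies on the Future methods set_running_or_notify_cancel(), set_result() and set_exception(), which the documentation reserves for Executor implementations and unit tests.

```python
from concurrent.futures import Executor, Future


class SyncExecutor(Executor):
    """Hypothetical executor that runs each call immediately in the caller's thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        # Move the future to RUNNING; returns False if it was already cancelled
        if future.set_running_or_notify_cancel():
            try:
                result = fn(*args, **kwargs)
            except BaseException as exc:
                future.set_exception(exc)
            else:
                future.set_result(result)
        return future


# map() and __enter__/__exit__ are inherited unchanged from Executor
with SyncExecutor() as executor:
    f = executor.submit(pow, 2, 10)
    print(f.result())                               # 1024
    print(list(executor.map(pow, [2, 3], [3, 2])))  # [8, 9]
```

A real executor would hand the call to another thread or process instead of running it inline; the point here is only which methods a subclass has to supply.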
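The submit() method
Executor defines the submit() method, which schedules a callable to be executed and returns a Future instance. The Future represents the pending execution of that call.
The following example shows what submit() does with a thread pool (a process pool behaves the same way).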
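```python
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def return_future(msg):
    time.sleep(3)
    return msg


# Create a thread pool with two worker threads
pool = ThreadPoolExecutor(max_workers=2)
# Add two tasks to the pool; submit() returns a Future immediately
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')
# Checked right away, the task is still sleeping: prints False
print(f1.done())
time.sleep(3)
# About 3 seconds later the task has most likely finished: prints True
print(f2.done())
# result() blocks until the call returns, then prints hello and world
print(f1.result())
print(f2.result())
```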
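Converting this to a process pool is simple: just replace ThreadPoolExecutor with ProcessPoolExecutor. To submit several tasks, call submit() repeatedly in a loop.
The map() method
Besides submit(), Executor also provides a map() method. It returns an iterator like the built-in map(func, *iterables), except that the calls run concurrently; the results are yielded in the order of the input iterables, not in the order in which the calls finish. The following example illustrates this: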
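A minimal sketch of the process-pool variant that submits several tasks in a loop. The square() function is hypothetical. Two details matter for process pools: the worker function must be defined at module level so it can be pickled and sent to the worker processes, and the `if __name__ == '__main__'` guard is required on platforms that start workers with the spawn method.

```python
from concurrent.futures import ProcessPoolExecutor


def square(n):
    # Hypothetical task; runs in a worker process, so it must be picklable
    return n * n


def main():
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Submit one task per input by calling submit() in a loop
        futures = [pool.submit(square, n) for n in range(5)]
        return [f.result() for f in futures]


# Required where workers are started with "spawn"
# (the default on Windows and, since Python 3.8, on macOS)
if __name__ == '__main__':
    print(main())  # [0, 1, 4, 9, 16]
```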
```python
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor as Pool
import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


pool = Pool(max_workers=3)
# map() submits one task per URL; results are yielded in URLS order
results = pool.map(task, URLS)
for ret in results:
    print('%s, %s' % (ret.url, len(ret.content)))
```
Output:

```text
http://www.baidu.com/, 2381
http://www.qq.com/, 252160
http://www.sina.com.cn/, 607265
```
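The ordering guarantee can be checked without network access. In this sketch, slow_echo() is a hypothetical stand-in for a request: it sleeps for the given number of seconds and returns its input. The first input takes longest, yet map() still yields it first.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    # Hypothetical stand-in for an I/O-bound call such as requests.get()
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    # The 0.3 s call finishes last, but its result still comes out first
    results = list(pool.map(slow_echo, [0.3, 0.1, 0.2]))

print(results)  # [0.3, 0.1, 0.2]
```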
Future
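A Future can be thought of as an operation that will complete at some point in the future, and it is the foundation of asynchronous programming. Normally, when we perform an I/O operation such as fetching a URL (as below), the calling thread blocks until the result arrives and cannot do anything else in the meantime. Futures let us carry on with other work during that wait.

```python
import requests

data = requests.get('http://www.baidu.com').content
print(len(data))
```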
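To make the benefit concrete without depending on external sites, the sketch below replaces requests.get() with a hypothetical fake_io() that just sleeps. Run one after another, three 0.2-second calls take about 0.6 seconds; submitted to a three-worker thread pool, they overlap and take roughly 0.2 seconds, and the main thread is free between submit() and result().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    # Hypothetical stand-in for a blocking call such as requests.get(url)
    time.sleep(delay)
    return delay


delays = [0.2, 0.2, 0.2]

start = time.perf_counter()
sequential = [fake_io(d) for d in delays]  # each call blocks until it returns
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fake_io, d) for d in delays]  # returns immediately
    # ... the main thread could do other work here ...
    concurrent_results = [f.result() for f in futures]
concurrent_time = time.perf_counter() - start

print('sequential: %.2fs, concurrent: %.2fs' % (sequential_time, concurrent_time))
```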
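Future instances are created by Executor.submit(). A Future provides a range of methods for inspecting and controlling the call, such as done(), running(), cancel(), result(), exception() and add_done_callback().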
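Here is a short sketch of some of these methods, using hypothetical helper functions. A threading.Event holds the pool's single worker busy so that a second task stays queued long enough to be cancelled; a third task raises, and its exception is retrieved both through exception() and through a callback registered with add_done_callback().

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()
callback_results = []


def wait_for_gate():
    # Hypothetical task: occupies the only worker until the gate is opened
    gate.wait()
    return 'opened'


def fail():
    # Hypothetical task that always raises
    raise ValueError('boom')


with ThreadPoolExecutor(max_workers=1) as pool:
    blocker = pool.submit(wait_for_gate)
    queued = pool.submit(fail)       # stuck in the queue behind blocker
    was_cancelled = queued.cancel()  # succeeds because it has not started
    print(was_cancelled, queued.cancelled())  # True True

    failing = pool.submit(fail)
    # The callback is called with the future once it completes
    failing.add_done_callback(lambda f: callback_results.append(f.exception()))

    gate.set()                       # release the worker
    print(blocker.result())          # opened
    print(repr(failing.exception())) # ValueError('boom')

# Leaving the with block waits for the worker, so the callback has run by now
print(callback_results)
```

cancel() only succeeds while a future is still pending; once a call is running or finished it returns False.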
```python
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests

URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com']


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]

    for f in future_tasks:
        if f.running():
            print('%s is running' % str(f))

    # as_completed() yields each future as soon as it finishes,
    # so f.done() is always True inside this loop
    for f in as_completed(future_tasks):
        try:
            f_ret = f.result()  # re-raises any exception raised inside task()
            print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            # A finished future cannot be cancelled, so just report the error
            print(str(e))
```
Output:

```text
<Future at 0x7fc2716e1f60 state=running> is running
<Future at 0x7fc27136d4e0 state=running> is running
<Future at 0x7fc27136d710 state=running> is running
<Future at 0x7fc27136d710 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x7fc2716e1f60 state=finished returned Response>, done, result: http://www.qq.com/, 252343
<Future at 0x7fc27136d4e0 state=finished returned Response>, done, result: http://www.sina.com.cn/, 602366
```
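As the output shows, as_completed() does not yield the futures in the order of the URLS list but in the order in which they finish. It also shows that the requests to the different URLs run concurrently instead of blocking one another.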
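The completion-order behavior can also be reproduced offline. In this sketch, the hypothetical slow_echo() sleeps instead of fetching a URL. Because as_completed() yields futures as they finish, a dictionary mapping each future back to its input (a common idiom) is used to tell which call finished when.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(delay):
    # Hypothetical stand-in for requests.get(): sleeps, then returns its input
    time.sleep(delay)
    return delay


delays = [0.6, 0.2, 0.4]
completion_order = []

with ThreadPoolExecutor(max_workers=3) as pool:
    # Map each future back to its input, since as_completed() ignores input order
    future_to_delay = {pool.submit(slow_echo, d): d for d in delays}
    for f in as_completed(future_to_delay):
        completion_order.append(future_to_delay[f])

print(completion_order)  # [0.2, 0.4, 0.6] under normal scheduling
```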
wait
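The wait() function returns a named tuple holding two sets: the first, done, contains the futures that have completed; the second, not_done, contains those that have not. One advantage of wait() is the finer control it offers: its return_when argument accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED, and defaults to ALL_COMPLETED.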
```python
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import wait
import requests

URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com']


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]

    for f in future_tasks:
        if f.running():
            print('%s is running' % str(f))

    # Blocks until every future has finished (return_when=ALL_COMPLETED)
    results = wait(future_tasks)
    done = results[0]  # equivalently results.done
    for x in done:
        print(x)
```
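wait() takes two optional parameters, timeout and return_when.
timeout sets the maximum number of seconds wait() waits before returning; the default, None, means no limit.
return_when determines when the function returns. With the default ALL_COMPLETED, the program blocks until every task in the pool has finished. With FIRST_COMPLETED, it returns as soon as any one future finishes or is cancelled, without waiting for the rest. With FIRST_EXCEPTION, it returns when any future finishes by raising an exception; if none does, it behaves like ALL_COMPLETED.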
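A sketch of both parameters, again with a hypothetical sleeping task in place of a real request. The fields of the returned named tuple are read by name here. Note that, unlike as_completed(), wait() does not raise TimeoutError when the timeout expires; it just returns whatever has finished so far.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def slow_echo(delay):
    # Hypothetical stand-in for an I/O-bound call such as requests.get()
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_echo, d) for d in [0.1, 0.5, 0.5]]

    # FIRST_COMPLETED: returns once any future finishes (after about 0.1 s)
    first = wait(futures, return_when=FIRST_COMPLETED)
    print(len(first.done), len(first.not_done))      # 1 2

    # timeout: returns after 0.05 s with whatever has finished, no exception
    partial = wait(first.not_done, timeout=0.05)
    print(len(partial.done), len(partial.not_done))  # 0 2

    # Default ALL_COMPLETED: blocks until every future has finished
    final = wait(futures)
    print(len(final.done), len(final.not_done))      # 3 0
```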
Reposted from:
http://blog.csdn.net/dutsoft/article/details/54728706
