1. 概述
concurrent.futures 是 3.2 中引入的新模塊,它為異步執行可調用對象提供了高層接口。
可以使用 ThreadPoolExecutor 來進行多線程編程,ProcessPoolExecutor 進行多進程編程,兩者實現了同樣的接口,這些接口由抽象類 Executor 定義。
這個模塊提供了兩大類型,一個是執行器類 Executor,另一個是 Future 類。
執行器用來管理工作池,future 用來管理工作計算出來的結果,通常不用直接操作 future 對象,因為有豐富的 API。
2. Executor Object 執行器對象
concurrent.futures.Executor 類
這個抽象類提供了一系列方法,可以用於異步執行調用。
它不能直接使用,只能通過子類化出來的具體類來使用。
它定義的方法有:
submit(fn, *args, **kwargs)
安排可調用對象 fn 以 fn(*args, **kwargs) 的形式執行,並返回 Future 對象來表示它的執行。
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
map(func, *iterables, timeout=None, chunksize=1)
類似內置函數 map(func, *iterables)
,但是有兩點不同:
- 立即獲取
iterables
而不會惰性獲取; - 異步執行
func
,並支持多次並發調用。
它返回一個迭代器。
從調用 Executor.map() 開始的 timeout 秒之后,如果在迭代器上調用了 __next__() 並且無可用結果的話,迭代器會拋出 concurrent.futures.TimeoutError 異常。
timeout 秒數可以是浮點數或者整數,如果設置為 None 或者不指定,則不限制等待時間。
如果 func 調用拋出了異常,那么該異常會在從迭代器獲取值的時候拋出。
當使用 ProcessPoolExecutor 的時候,這個方法會把 iterables 划分成多個塊,作為獨立的任務提交到進程池。這些塊的近似大小可以通過給 chunksize 指定一個正整數。對於很長的 iterables,使用較大的 chunksize 而不是采用默認值 1,可以顯著提高性能。對於 ThreadPoolExecutor,chunksize 不起作用。
注意:不管並發任務的執行次序如何,map
總是基於輸入順序來返回值。map
返回的迭代器,在主程序迭代的時候,會等待每一項的響應。
shutdown(wait=True)
告訴執行器 executor 在當前所有等待的 future 對象運行完畢后,應該釋放執行器用到的所有資源。
在 shutdown 之后再調用 Executor.submit() 和 Executor.map() 會報運行時錯誤 RuntimeError。
如果 wait 為 True,那么這個方法會在所有等待的 future 都執行完畢,並且屬於執行器 executor 的資源都釋放完之后才會返回。
如果 wait 為 False,本方法會立即返回。屬於執行器的資源會在所有等待的 future 執行完畢之后釋放。
不管 wait 取值如何,整個 Python 程序在等待的 future 執行完畢之前不會退出。
你可以通過 with 語句來避免顯式調用本方法。with 語句會用 wait=True 的默認參數調用 Executor.shutdown() 方法。
import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
執行器類 Executor
實現了上下文協議,可以用做上下文管理器。它能並發執行任務,等待它們全部完成。當上下文管理器退出時,自動調用 shutdown()
方法。
3. ThreadPoolExecutor 線程池執行器
ThreadPoolExecutor 線程池執行器是 Executor 執行器的子類,通過線程池來執行異步調用。它管理一組工作線程,當工作線程有富余的時候,給它們傳遞任務。
當屬於一個 Future 對象的可調用對象等待另一個 Future 的返回時,會發生死鎖 deadlock。
舉個例子:
import time from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED, ProcessPoolExecutor from concurrent.futures import Future from multiprocessing import Pool def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通過submit函數提交執行的函數到線程池中, submit 是立即返回 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # 要獲取已經成功的task的返回 urls = [3, 2, 4] all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=FIRST_COMPLETED) print("main") for future in as_completed(all_task): data = future.result() print("get {} page".format(data)) # 通過executor的map獲取已經完成的task的值 for data in executor.map(get_html, urls): print("get {} page".format(data)) # done方法用於判定某個任務是否完成 print(task1.done()) print(task2.cancel()) time.sleep(3) print(task1.done()) # result方法可以獲取task的執行結果 print(task1.result())
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
這個 Executor 子類最多用 max_workers 個線程來異步執行調用。
initializer 是一個可選的可調用對象,會在每個 worker 線程啟動之前調用。
initargs 是傳遞給 initializer 的參數元組。
如果 initializer 拋出了異常,那么當前所有等待的任務都會拋出 BrokenThreadPool 異常,繼續提交 submit 任務也會拋出此異常。
4. ThreadPoolExecutor 例子
import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] # Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
5. ProcessPoolExecutor 進程池執行器
ProcessPoolExecutor 進程池執行器類是 Executor 執行器類的子類,使用進程池來異步執行調用。
ProcessPoolExecutor 使用了 multiprocessing 模塊,這允許它可以規避 Global Interpreter Lock,但是也意味着只能執行和返回可序列化的(picklable)對象。
__main__ 模塊必須被 worker 子進程導入,這意味着 ProcessPoolExecutor 在交互解釋器中無法工作。
在已經被提交到 ProcessPoolExecutor 中的可調用對象內使用 Executor 或者 Future 方法會導致死鎖。
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
這個 Executor 子類最多用 max_workers 個進程來異步執行調用。
如果不指定 max_workers 或者為 None,它默認為本機的處理器數量。
如果 max_workers 小於等於 0,會拋出 ValueError 異常。
mp_context 是多進程上下文(multiprocessing context)或者 None,它會被用來啟動 workers。如果不指定 mp_context 或者為 None,會使用默認的多進程上下文環境。
initializer 是一個可選的可調用對象,會在每個 worker 進程啟動之前調用。
initargs 是傳遞給 initializer 的參數元組。
如果 initializer 拋出了異常,那么當前所有等待的任務都會拋出 BrokenProcessPool 異常,繼續提交 submit 任務也會拋出此異常。
7. Future 對象
Future 類封裝了可調用對象的異步執行。
Future 實例通過 Executor.submit() 創建。
concurrent.futures.Future
封裝了可調用對象的異步執行。
- Future 實例通過 Executor.submit() 創建,除非用於測試,不應該直接手動創建。
- cancel() 嘗試取消調用,如果該調用正在執行中,無法取消,本方法返回 False,其他情況下調用會被取消,並返回 True。
- cancelled() 如果調用已經被成功取消,返回 True。
- running() 如果調用正在執行,無法被取消,則返回 True。
- done() 如果調用成功被取消或者已經執行完畢,返回 True。
- result(timeout=None) 返回調用的返回值。如果調用還沒有完成,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutError。timeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。如果 future 在完成之前被取消了,會拋出 CancelledError 異常。
-
exception(timeout=None)
返回被調用拋出的異常。如果調用還沒有執行完畢,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutError。timeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。
如果 future 在完成之前被取消了,會拋出 CancelledError 異常。
如果調用完成並且沒有拋出異常,返回 None。 -
add_done_callback(fn)
為 future 附加可調用對象 fn。當 future 運行完畢或者被取消時,它會被用作 fn 的唯一參數,並調用 fn。
可調用對象按照添加順序依次調用,並且總是在添加時所處進程的一個線程內調用它。如果該可調用對象拋出了屬於 Exception 子類的異常,它會被記錄並忽略。如果它拋出了屬於 BaseException 子類的異常,該行為未定義。
如果 future 已經完成或者已經取消,fn 會被立即調用。
8. 模塊函數
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 Future 實例完成,這些實例可能由多個不同的執行器實例創建,通過 fs 指定這些 Future 實例。返回具名元組,該元組有兩個元素,每個元素都是一個集合。第一個元素名叫 done,該集合包括已完成的 futures;第二個元素名叫 not_done,該集合包括未完成的 futures。
timeout 用來控制返回之前等待的最大秒數,可以是整數或者浮點數。如果不指定或為 None,不限制等待時間。
return_when 指明函數何時應該返回。它必須是下列常量之一:
- FIRST_COMPLETED:函數在任意一個 future 完成或者被取消時返回。
- FIRST_EXCEPTION:函數在任意一個 future 因為異常而結束時返回。如果沒有 future 拋出異常,它等價於 ALL_COMPLETED。
- ALL_COMPLETED:當所有 future 完成或者被取消時函數才會返回。
concurrent.futures.as_completed(fs, timeout=None)
當通過 fs 指定的 Future 實例全部執行完畢或者被取消后,返回這些 Future 實例組成的迭代器。fs 中的 Future 實例可以被不同的執行器創建。任何在 as_completed() 調用之前就已經完成的 Future 實例會被最先生成。
查看源碼發現,實際上這是一個用到了 yield from 的生成器函數,所以調用返回一個生成器。
如果從 as_completed() 調用開始,經過 timeout 秒之后,對返回的迭代器調用 __next__() 時結果仍不可用,則會拋出 concurrent.futures.TimeoutError 異常。timeout 可以是整數或者浮點數,如果 timeout 沒有指定或者為 None,則不限制等待時間。
import concurrent.futures import random import time URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] class CrawlerFramework(object): def a(self, url): print(url) time.sleep(random.random()) return "ok" if __name__ == "__main__": crawler_framework = CrawlerFramework() with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor: future = executor.submit(crawler_framework.a, URLS[0]) # 單個 print(future.result()) future_to_url = {executor.submit(crawler_framework.a, url): url for url in URLS} # 多個 future_to_url = executor.map(crawler_framework.a, URLS) # 多個 for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] data = future.result() print(data)