concurrent.futures 模塊使用說明


1. 概述

concurrent.futures 是 3.2 中引入的新模塊,它為異步執行可調用對象提供了高層接口。
可以使用 ThreadPoolExecutor 來進行多線程編程,ProcessPoolExecutor 進行多進程編程,兩者實現了同樣的接口,這些接口由抽象類 Executor 定義。
這個模塊提供了兩大類型,一個是執行器類 Executor,另一個是 Future 類。
執行器用來管理工作池,future 用來管理工作計算出來的結果,通常不用直接操作 future 對象,因為有豐富的 API。

2. Executor Object 執行器對象

concurrent.futures.Executor 類

這個抽象類提供了一系列方法,可以用於異步執行調用。
它不能直接使用,只能通過子類化出來的具體類來使用。

它定義的方法有:

submit(fn, *args, **kwargs)

安排可調用對象 fn 以 fn(*args, **kwargs) 的形式執行,並返回 Future 對象來表示它的執行。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

類似內置函數 map(func, *iterables),但是有兩點不同:

  1. 立即獲取 iterables 而不會惰性獲取;
  2. 異步執行 func,並支持多次並發調用。

它返回一個迭代器。
從調用 Executor.map() 開始的 timeout 秒之后,如果在迭代器上調用了 __next__() 並且無可用結果的話,迭代器會拋出 concurrent.futures.TimeoutError 異常。
timeout 秒數可以是浮點數或者整數,如果設置為 None 或者不指定,則不限制等待時間。

如果 func 調用拋出了異常,那么該異常會在從迭代器獲取值的時候拋出。

當使用 ProcessPoolExecutor 的時候,這個方法會把 iterables 划分成多個塊,作為獨立的任務提交到進程池。這些塊的近似大小可以通過給 chunksize 指定一個正整數。對於很長的 iterables,使用較大的 chunksize 而不是采用默認值 1,可以顯著提高性能。對於 ThreadPoolExecutor,chunksize 不起作用。

注意:不管並發任務的執行次序如何,map 總是基於輸入順序來返回值。map 返回的迭代器,在主程序迭代的時候,會等待每一項的響應。

shutdown(wait=True)

告訴執行器 executor 在當前所有等待的 future 對象運行完畢后,應該釋放執行器用到的所有資源。
在 shutdown 之后再調用 Executor.submit() 和 Executor.map() 會報運行時錯誤 RuntimeError。
如果 wait 為 True,那么這個方法會在所有等待的 future 都執行完畢,並且屬於執行器 executor 的資源都釋放完之后才會返回。
如果 wait 為 False,本方法會立即返回。屬於執行器的資源會在所有等待的 future 執行完畢之后釋放。
不管 wait 取值如何,整個 Python 程序在等待的 future 執行完畢之前不會退出。
你可以通過 with 語句來避免顯式調用本方法。with 語句會用 wait=True 的默認參數調用 Executor.shutdown() 方法。

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

 執行器類 Executor 實現了上下文協議,可以用做上下文管理器。它能並發執行任務,等待它們全部完成。當上下文管理器退出時,自動調用 shutdown() 方法。

3. ThreadPoolExecutor 線程池執行器

ThreadPoolExecutor 線程池執行器是 Executor 執行器的子類,通過線程池來執行異步調用。它管理一組工作線程,當工作線程有富余的時候,給它們傳遞任務。
當屬於一個 Future 對象的可調用對象等待另一個 Future 的返回時,會發生死鎖 deadlock。
舉個例子:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED, ProcessPoolExecutor
from concurrent.futures import Future
from multiprocessing import Pool

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# 通過submit函數提交執行的函數到線程池中, submit 是立即返回
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))

# 要獲取已經成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)
print("main")
for future in as_completed(all_task):
    data = future.result()
    print("get {} page".format(data))
# 通過executor的map獲取已經完成的task的值
for data in executor.map(get_html, urls):
    print("get {} page".format(data))

# done方法用於判定某個任務是否完成
print(task1.done())
print(task2.cancel())
time.sleep(3)
print(task1.done())

# result方法可以獲取task的執行結果
print(task1.result())

concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

這個 Executor 子類最多用 max_workers 個線程來異步執行調用。

initializer 是一個可選的可調用對象,會在每個 worker 線程啟動之前調用。
initargs 是傳遞給 initializer 的參數元組。
如果 initializer 拋出了異常,那么當前所有等待的任務都會拋出 BrokenThreadPool 異常,繼續提交 submit 任務也會拋出此異常。

4. ThreadPoolExecutor 例子

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

5. ProcessPoolExecutor 進程池執行器

ProcessPoolExecutor 進程池執行器類是 Executor 執行器類的子類,使用進程池來異步執行調用。
ProcessPoolExecutor 使用了 multiprocessing 模塊,這允許它可以規避 Global Interpreter Lock,但是也意味着只能執行和返回可序列化的(picklable)對象。

__main__ 模塊必須被 worker 子進程導入,這意味着 ProcessPoolExecutor 在交互解釋器中無法工作。

在已經被提交到 ProcessPoolExecutor 中的可調用對象內使用 Executor 或者 Future 方法會導致死鎖。

concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

這個 Executor 子類最多用 max_workers 個進程來異步執行調用。
如果不指定 max_workers 或者為 None,它默認為本機的處理器數量。
如果 max_workers 小於等於 0,會拋出 ValueError 異常。
mp_context 是多進程上下文(multiprocessing context)或者 None,它會被用來啟動 workers。如果不指定 mp_context 或者為 None,會使用默認的多進程上下文環境。

initializer 是一個可選的可調用對象,會在每個 worker 進程啟動之前調用。
initargs 是傳遞給 initializer 的參數元組。
如果 initializer 拋出了異常,那么當前所有等待的任務都會拋出 BrokenProcessPool 異常,繼續提交 submit 任務也會拋出此異常。

7. Future 對象

Future 類封裝了可調用對象的異步執行。
Future 實例通過 Executor.submit() 創建。

concurrent.futures.Future

封裝了可調用對象的異步執行。

  1. Future 實例通過 Executor.submit() 創建,除非用於測試,不應該直接手動創建。
  2. cancel() 嘗試取消調用,如果該調用正在執行中,無法取消,本方法返回 False,其他情況下調用會被取消,並返回 True。
  3. cancelled() 如果調用已經被成功取消,返回 True。
  4. running() 如果調用正在執行,無法被取消,則返回 True。
  5. done() 如果調用成功被取消或者已經執行完畢,返回 True。
  6. result(timeout=None) 返回調用的返回值。如果調用還沒有完成,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutError。timeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。如果 future 在完成之前被取消了,會拋出 CancelledError 異常。
  7. exception(timeout=None)
    返回被調用拋出的異常。如果調用還沒有執行完畢,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutError。timeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。
    如果 future 在完成之前被取消了,會拋出 CancelledError 異常。
    如果調用完成並且沒有拋出異常,返回 None。

  8. add_done_callback(fn)
    為 future 附加可調用對象 fn。當 future 運行完畢或者被取消時,它會被用作 fn 的唯一參數,並調用 fn。
    可調用對象按照添加順序依次調用,並且總是在添加時所處進程的一個線程內調用它。如果該可調用對象拋出了屬於 Exception 子類的異常,它會被記錄並忽略。如果它拋出了屬於 BaseException 子類的異常,該行為未定義。
    如果 future 已經完成或者已經取消,fn 會被立即調用。

8. 模塊函數

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 Future 實例完成,這些實例可能由多個不同的執行器實例創建,通過 fs 指定這些 Future 實例。返回具名元組,該元組有兩個元素,每個元素都是一個集合。第一個元素名叫 done,該集合包括已完成的 futures;第二個元素名叫 not_done,該集合包括未完成的 futures。
timeout 用來控制返回之前等待的最大秒數,可以是整數或者浮點數。如果不指定或為 None,不限制等待時間。
return_when 指明函數何時應該返回。它必須是下列常量之一:

  1. FIRST_COMPLETED:函數在任意一個 future 完成或者被取消時返回。
  2. FIRST_EXCEPTION:函數在任意一個 future 因為異常而結束時返回。如果沒有 future 拋出異常,它等價於 ALL_COMPLETED。
  3. ALL_COMPLETED:當所有 future 完成或者被取消時函數才會返回。

concurrent.futures.as_completed(fs, timeout=None)

當通過 fs 指定的 Future 實例全部執行完畢或者被取消后,返回這些 Future 實例組成的迭代器。fs 中的 Future 實例可以被不同的執行器創建。任何在 as_completed() 調用之前就已經完成的 Future 實例會被最先生成。

查看源碼發現,實際上這是一個用到了 yield from 的生成器函數,所以調用返回一個生成器。

如果從 as_completed() 調用開始,經過 timeout 秒之后,對返回的迭代器調用 __next__() 時結果仍不可用,則會拋出 concurrent.futures.TimeoutError 異常。timeout 可以是整數或者浮點數,如果 timeout 沒有指定或者為 None,則不限制等待時間。

 

import concurrent.futures
import random
import time

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


class CrawlerFramework(object):
    def a(self, url):
        print(url)
        time.sleep(random.random())
        return "ok"


if __name__ == "__main__":
    crawler_framework = CrawlerFramework()
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        future = executor.submit(crawler_framework.a, URLS[0])  # 單個
        print(future.result())
        future_to_url = {executor.submit(crawler_framework.a, url): url for url in URLS}  # 多個
        future_to_url = executor.map(crawler_framework.a, URLS)  # 多個
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            data = future.result()
            print(data)

  

  

  

  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM