Some experience with concurrent.futures


Important classes in the module

The core of the concurrent.futures module is the 2 classes aimed at developers:

  • ThreadPoolExecutor. As the name suggests, it creates a thread pool to which jobs can be submitted.
  • ProcessPoolExecutor. Works the same way, but uses a pool of worker processes instead of threads.

The rule of thumb for choosing between them is:

  1. For I/O-bound tasks (heavy I/O such as requesting web pages or reading and writing files, i.e. anything dominated by network or disk I/O), choose ThreadPoolExecutor.
  2. For CPU-bound tasks (heavy math and logic, video encoding/decoding, and other work that mostly burns CPU), choose ProcessPoolExecutor. A sketch of both cases follows this list.
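
A minimal sketch of that rule of thumb. The URL list and the fib function are invented for illustration and are not from the original article:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from urllib.request import urlopen

URLS = ['https://www.python.org', 'https://www.example.com']  # hypothetical URLs

def fetch(url):
    # I/O-bound: the thread spends most of its time waiting on the network
    with urlopen(url) as resp:
        return len(resp.read())

def fib(n):
    # CPU-bound: pure computation, benefits from separate processes
    return n if n < 2 else fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    with ThreadPoolExecutor() as executor:       # I/O-bound -> threads
        print(list(executor.map(fetch, URLS)))
    with ProcessPoolExecutor() as executor:      # CPU-bound -> processes
        print(list(executor.map(fib, [30, 31, 32])))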

ProcessPoolExecutor sidesteps the GIL, but because arguments have to be passed to the worker processes, normally only picklable objects can be submitted and returned. Here is an example that fails:

from concurrent.futures import ProcessPoolExecutor

f = open('1.txt', 'a+')

def write(f, line):
    f.write(line)

with ProcessPoolExecutor() as executor:
    future = executor.submit(write, f, 'abc')
    print(f'RESULT: {future.result()}')

At a glance nothing seems wrong, but run it:

❯ python unpickled.py
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
"""
...

The error output already hints at the cause: pickle.dumps fails on the file object:

In : pickle.dumps(f)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-34-6d61f1b895e9> in <module>
----> 1 pickle.dumps(f)

TypeError: cannot serialize '_io.TextIOWrapper' object

There is also the PicklingError kind of failure, which usually happens when an object depends on external system state. For user-defined classes you can work around these limits by providing __getstate__ and __setstate__ methods; alternatively, wrap or adjust the business logic so that only arguments that serialize cleanly are passed. A sketch of the __getstate__/__setstate__ approach follows.
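
A minimal sketch of that workaround, assuming a hypothetical Logger class that holds an unpicklable file handle (the class and the write helper are invented for illustration):

from concurrent.futures import ProcessPoolExecutor

class Logger:
    """Hypothetical class wrapping an unpicklable open file."""
    def __init__(self, path):
        self.path = path
        self.file = open(path, 'a+')

    def __getstate__(self):
        # Drop the unpicklable file object; keep only the picklable state
        state = self.__dict__.copy()
        del state['file']
        return state

    def __setstate__(self, state):
        # Re-open the file inside the worker process
        self.__dict__.update(state)
        self.file = open(self.path, 'a+')

def write(logger, line):
    logger.file.write(line)
    return line

if __name__ == '__main__':
    logger = Logger('1.txt')
    with ProcessPoolExecutor() as executor:
        future = executor.submit(write, logger, 'abc')
        print(f'RESULT: {future.result()}')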

Important methods of Executor

The 2 most important methods of the ProcessPoolExecutor and ThreadPoolExecutor classes are:

  • submit. Submits a task and returns a Future object that represents the execution of the callable.
  • map. Works like Python's built-in map function, except that the function is applied to each element of the iterable asynchronously.

Use map to submit a batch of tasks at once; use submit for individual tasks:

In : def double(n):
...:     return n * 2
...:

In : from concurrent.futures import ThreadPoolExecutor

In : with ThreadPoolExecutor() as executor:
...:     f1 = executor.submit(double, 10)
...:     f2 = executor.submit(double, 20)
...:     print(f1.result(), f2.result())
...:
20 40

In : NUMBERS = [10, 20, 30]

In : with ThreadPoolExecutor() as executor:
...:     for n, rs in zip(NUMBERS, executor.map(double, NUMBERS)):
...:         print(f'{n} * 2 -> {rs}')
...:
10 * 2 -> 20
20 * 2 -> 40
30 * 2 -> 60

Functions

The module also has 2 important functions: wait and as_completed.

wait is used to wait for the given Future instances to finish. Its intent is similar to asyncio.wait, and its return value has 2 parts: the first is the set of completed futures (done), the second is the set of futures that have not completed (not_done):

In : from concurrent.futures import ProcessPoolExecutor, wait

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(double, n) for n in (1, 2, 3)]
...:     rs = wait(fs)
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106b58080 state=finished returned int>, <Future at 0x1073e2128 state=finished returned int>, <Future at 0x10690acc0 state=finished returned int>}, not_done=set())

concurrent.futures.wait also accepts a return_when argument. The default is ALL_COMPLETED, meaning wait for all tasks to finish; the other options are FIRST_COMPLETED and FIRST_EXCEPTION. Change it and see the effect:

In : import time

In : def div(a, b):
...:     time.sleep(a + b)
...:     return a / b
...:

In : from concurrent.futures import FIRST_COMPLETED

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(div, *item) for item in ((1, 0), (1, 2), (2, 3))]
...:     rs = wait(fs, return_when=FIRST_COMPLETED)
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106aa4c50 state=finished raised ZeroDivisionError>}, not_done={<Future at 0x1065c1ac8 state=finished returned float>, <Future at 0x1053eb860 state=finished returned float>})

The as_completed function returns an iterator over the given Future instances that yields each one as it completes:

In : import random

In : from concurrent.futures import as_completed

In : def double(n):
...:     time.sleep(random.randint(0, 5))
...:     return n * 2
...:

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
20 * 2 = 40
10 * 2 = 20
30 * 2 = 60

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
10 * 2 = 20
30 * 2 = 60
20 * 2 = 40

Because double sleeps for a random amount of time, you can see that running the same batch of tasks twice produces different completion orders.

Using submit/map correctly

At work I have seen logic written like this countless times:

In : with ProcessPoolExecutor() as executor:
...:     f1 = executor.submit(div, 1, 2)
...:     f2 = executor.submit(div, 1, 0)
...:     f3 = executor.submit(div, 3, 2)
...:

In : f1, f2, f3
Out:
(<Future at 0x1071e90f0 state=finished returned float>,
 <Future at 0x1071e9898 state=finished raised ZeroDivisionError>,
 <Future at 0x106955198 state=finished returned float>)

In : def str2int(s):
...:     return int(s)
...:

In : NUMBERS = ['1', '3.0', 'abc']

In : with ProcessPoolExecutor() as executor:
...:     rs = executor.map(str2int, NUMBERS)
...:

In : rs
Out: <generator object _chain_from_iterable_of_lists at 0x1068ccc78>

The problem with this style is that exceptions are ignored: what comes back is a Future object or a generator, and the corresponding result method is never called, so if a task raised, the caller never finds out. You normally need to call result and catch exceptions:

In : for f in (f1, f2, f3):
...:     try:
...:         print(f.result())
...:     except Exception as exc:
...:         print(f'Generated an exception: {exc}')
...:
0.5
Generated an exception: division by zero
1.5

The results of map are more awkward to get at:

In : while 1:
...:     try:
...:         print(next(rs))
...:     except StopIteration:
...:         break
...:     except Exception as exc:
...:         print(f'Generated an exception: {exc}')
...:
1
Generated an exception: invalid literal for int() with base 10: '3.0'

As you can see, the generator ends after the first error, so map is a poor fit for a batch of tasks that may raise exceptions: list(rs), or any loop over the results, will stop at the task that raised and you lose the results of all the tasks after it. The best approach is submit + as_completed, as sketched below.
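
A minimal sketch of the submit + as_completed pattern, reusing the str2int function and NUMBERS list from the example above:

from concurrent.futures import ProcessPoolExecutor, as_completed

def str2int(s):
    return int(s)

NUMBERS = ['1', '3.0', 'abc']

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        fs = {executor.submit(str2int, s): s for s in NUMBERS}
        for future in as_completed(fs):
            s = fs[future]
            try:
                print(f'{s!r} -> {future.result()}')
            except Exception as exc:
                # One failed task does not hide the results of the others
                print(f'{s!r} generated an exception: {exc}')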

Making good use of max_workers

Both ProcessPoolExecutor and ThreadPoolExecutor accept a max_workers argument, the number of processes / threads used to run tasks. ProcessPoolExecutor defaults to the number of CPUs (obtained via os.cpu_count()), while ThreadPoolExecutor defaults to 5 times the number of CPUs!

For beginners, and in most ordinary situations, there is no need to set max_workers by hand; the default works well enough. But:

  • Depending on the workload, raising max_workers can make tasks finish sooner. Note that higher is not always better: past a certain threshold it backfires. For I/O-bound tasks with ThreadPoolExecutor in particular, different max_workers values can make a big difference, but network conditions involve too many variables, so I will not give an example here.
  • Sometimes a server runs many important services and you do not want one job to affect everything else; in that case you can set max_workers below the default. Passing it is shown in the sketch after this list.
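
Setting it is just a constructor argument; the value 2 below is arbitrary, chosen only to illustrate capping the pool:

from concurrent.futures import ThreadPoolExecutor

def double(n):
    return n * 2

# Cap the pool at 2 worker threads instead of the default
with ThreadPoolExecutor(max_workers=2) as executor:
    print(list(executor.map(double, range(10))))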

Making good use of chunksize

Executormap方法支持chunksize參數:

Signature: ProcessPoolExecutor.map(self, fn, *iterables, timeout=None, chunksize=1)
Source:
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.
        ...
        """

I first learned about this argument from the map method of multiprocessing.pool.Pool. In concurrent.futures the default chunksize is 1; it is the unit in which tasks are sent to the workers, so the default means tasks are sent from the task list one at a time. If there are a lot of tasks and each one is very short, you can increase chunksize, as in the sketch below.
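
A rough sketch of the effect; the workload size and the chunksize of 100 are made-up numbers, chosen only to show how the argument is passed:

import time
from concurrent.futures import ProcessPoolExecutor

def double(n):
    return n * 2

if __name__ == '__main__':
    items = range(100_000)  # many tiny tasks

    start = time.time()
    with ProcessPoolExecutor() as executor:
        list(executor.map(double, items))                 # chunksize=1 (default)
    print(f'chunksize=1:   {time.time() - start:.2f}s')

    start = time.time()
    with ProcessPoolExecutor() as executor:
        list(executor.map(double, items, chunksize=100))  # batch 100 items per send
    print(f'chunksize=100: {time.time() - start:.2f}s')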

In my earlier book 《Python Web 開發實戰》, where I introduce Celery, I also mention the Prefetch Limits configuration; it is the same idea.

