Important classes in the module
The two core developer-facing classes in concurrent.futures are:
- ThreadPoolExecutor. As the name suggests, it creates a pool of threads to which jobs can be submitted.
- ProcessPoolExecutor. It works the same way, but uses a pool of worker processes instead of threads.
The rule of thumb for choosing between them is:
- For I/O-bound tasks, pick ThreadPoolExecutor: fetching web pages, reading and writing files, and anything else dominated by network or disk I/O.
- For CPU-bound tasks, pick ProcessPoolExecutor: heavy mathematical and logical computation, video encoding/decoding, and other CPU-hungry work. A minimal sketch of both cases follows.
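To make the rule of thumb concrete, here is a small sketch that is not from the original text: the URL list and the fib workload are invented placeholders standing in for I/O-bound and CPU-bound work respectively.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from urllib.request import urlopen


def fetch(url):
    # I/O-bound: the thread spends most of its time waiting on the network
    with urlopen(url) as resp:
        return len(resp.read())


def fib(n):
    # CPU-bound: pure computation, benefits from multiple processes
    return n if n < 2 else fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    urls = ['https://example.com'] * 10       # placeholder URLs
    with ThreadPoolExecutor() as executor:    # threads for the I/O-bound part
        print(list(executor.map(fetch, urls)))

    with ProcessPoolExecutor() as executor:   # processes for the CPU-bound part
        print(list(executor.map(fib, [30, 31, 32, 33])))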
ProcessPoolExecutor sidesteps the GIL, but because arguments have to be passed to the worker processes, under normal circumstances only picklable objects can be executed and returned. Here is an example that fails:
from concurrent.futures import ProcessPoolExecutor

f = open('1.txt', 'a+')


def write(f, line):
    f.write(line)


with ProcessPoolExecutor() as executor:
    future = executor.submit(write, f, 'abc')
    print(f'RESULT: {future.result()}')
At first glance nothing looks wrong, but run it:
❯ python unpickled.py
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
"""
...
The error output already points at the cause: pickle.dumps fails on the file object:
In : pickle.dumps(f)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-34-6d61f1b895e9> in <module>
----> 1 pickle.dumps(f)

TypeError: cannot serialize '_io.TextIOWrapper' object
You can also run into PicklingError. These problems usually arise when an object depends on external system state. For user-defined classes you can work around the limitation by providing __getstate__ and __setstate__ methods; alternatively, wrap or rework the business logic so that only arguments which serialize normally get passed.
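A minimal sketch of that workaround, using a hypothetical Logger class invented for this example: __getstate__ drops the unpicklable file handle from the pickled state and __setstate__ re-opens it on the other side, so instances can cross the process boundary.

import pickle


class Logger:
    def __init__(self, path):
        self.path = path
        self.fh = open(path, 'a+')       # _io.TextIOWrapper, not picklable

    def write(self, line):
        self.fh.write(line + '\n')

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['fh']                  # drop the unpicklable handle
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.fh = open(self.path, 'a+')  # re-open in the receiving process


logger = Logger('1.txt')
restored = pickle.loads(pickle.dumps(logger))  # now succeeds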
Important methods of Executor
The two most important methods on ProcessPoolExecutor and ThreadPoolExecutor are:
- submit. Submits a task and returns a Future object representing the execution of the callable.
- map. Works like the built-in map function, except that the function is applied to each element of the iterable asynchronously.
Use map to submit a whole batch of tasks at once, and submit for individual tasks:
In : def double(n):
...:     return n * 2
...:

In : from concurrent.futures import ThreadPoolExecutor

In : with ThreadPoolExecutor() as executor:
...:     f1 = executor.submit(double, 10)
...:     f2 = executor.submit(double, 20)
...:     print(f1.result(), f2.result())
...:
20 40

In : NUMBERS = [10, 20, 30]

In : with ThreadPoolExecutor() as executor:
...:     for n, rs in zip(NUMBERS, executor.map(double, NUMBERS)):
...:         print(f'{n} * 2 -> {rs}')
...:
10 * 2 -> 20
20 * 2 -> 40
30 * 2 -> 60
Functions
The module also provides two important functions, wait and as_completed.

wait waits for the given Future instances to complete. Its intent is much like that of asyncio.wait, and its return value has two items: the first is the set of completed futures (done), the second is the set of futures that have not completed (not_done):
In : from concurrent.futures import ProcessPoolExecutor, wait

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(double, n) for n in (1, 2, 3)]
...:     rs = wait(fs)
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106b58080 state=finished returned int>, <Future at 0x1073e2128 state=finished returned int>, <Future at 0x10690acc0 state=finished returned int>}, not_done=set())
concurrent.futures.wait also accepts a return_when parameter. The default is ALL_COMPLETED, meaning it waits for all tasks to finish; the other options are FIRST_COMPLETED and FIRST_EXCEPTION. Change it to see the effect:
In : import time

In : def div(a, b):
...:     time.sleep(a + b)
...:     return a / b
...:

In : from concurrent.futures import FIRST_COMPLETED

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(div, *item) for item in ((1, 0), (1, 2), (2, 3))]
...:     rs = wait(fs, return_when=FIRST_COMPLETED)
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106aa4c50 state=finished raised ZeroDivisionError>}, not_done={<Future at 0x1065c1ac8 state=finished returned float>, <Future at 0x1053eb860 state=finished returned float>})
The as_completed function returns an iterator over the given Future instances, yielding each one as it completes:
In : import random

In : from concurrent.futures import as_completed

In : def double(n):
...:     time.sleep(random.randint(0, 5))
...:     return n * 2
...:

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
20 * 2 = 40
10 * 2 = 20
30 * 2 = 60

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
10 * 2 = 20
30 * 2 = 60
20 * 2 = 40
Because double sleeps for a random interval, you can see that the completion order differs from one run to the next.
Using submit/map correctly
At work I have seen logic written like this countless times:
In : with ProcessPoolExecutor() as executor:
...:     f1 = executor.submit(div, 1, 2)
...:     f2 = executor.submit(div, 1, 0)
...:     f3 = executor.submit(div, 3, 2)
...:

In : f1, f2, f3
Out: (<Future at 0x1071e90f0 state=finished returned float>, <Future at 0x1071e9898 state=finished raised ZeroDivisionError>, <Future at 0x106955198 state=finished returned float>)

In : def str2int(s):
...:     return int(s)
...:

In : NUMBERS = ['1', '3.0', 'abc']

In : with ProcessPoolExecutor() as executor:
...:     rs = executor.map(str2int, NUMBERS)
...:

In : rs
Out: <generator object _chain_from_iterable_of_lists at 0x1068ccc78>
The problem with this style is that exceptions are silently swallowed: what comes back is a Future object or a generator, and since the corresponding result method is never called, the user never learns that something raised. You normally need to call result and catch exceptions:
In : for f in (f1, f2, f3):
...: try:
...: print(f.result())
...: except Exception as exc:
...: print(f'Generated an exception: {exc}')
...:
0.5
Generated an exception: division by zero
1.5
Getting the results of map is more awkward:
In : while 1:
...:     try:
...:         print(next(rs))
...:     except StopIteration:
...:         break
...:     except Exception as exc:
...:         print(f'Generated an exception: {exc}')
...:
1
Generated an exception: invalid literal for int() with base 10: '3.0'
As you can see, the generator stops after the first error, so map is a poor fit for a batch of tasks that may raise exceptions: list(rs), or any loop over the results, bails out as soon as one task raises, and you lose the results of all the tasks after it. The best approach is still submit + as_completed.
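Here is a minimal sketch of that recommended pattern, reusing str2int and the NUMBERS list from the map example above; each task gets its own Future, so one bad input no longer hides the other results.

from concurrent.futures import ProcessPoolExecutor, as_completed


def str2int(s):
    return int(s)


NUMBERS = ['1', '3.0', 'abc']

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        fs = {executor.submit(str2int, s): s for s in NUMBERS}
        for future in as_completed(fs):
            s = fs[future]
            try:
                print(f'{s!r} -> {future.result()}')
            except Exception as exc:
                # failing tasks are reported, the rest still return normally
                print(f'{s!r} generated an exception: {exc}')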
Make good use of max_workers
Both ProcessPoolExecutor and ThreadPoolExecutor accept a max_workers parameter, the number of processes / threads used to run the tasks. The default for ProcessPoolExecutor is the number of CPUs (obtained via os.cpu_count()), while the default for ThreadPoolExecutor is 5 times the number of CPUs!
For beginners, and in most situations, there is no need to set max_workers by hand; the default works well enough. But:

- Depending on the workload, raising max_workers can finish the tasks sooner. Be aware, though, that higher is not always better: past a certain threshold it becomes counterproductive. With I/O-bound tasks on ThreadPoolExecutor in particular, different max_workers values can make a big difference, but network conditions involve too many factors for me to give a meaningful example here.
- Sometimes a server is running many important services and you don't want one task to affect everything else; in that case you can set max_workers below the default (see the sketch after this list).
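A minimal sketch of setting it explicitly; the values 32 and 2 below are arbitrary examples, not recommendations:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# I/O-bound work: allow more threads than the default if measurements show it helps
io_executor = ThreadPoolExecutor(max_workers=32)

# Shared server: deliberately stay below the default so other services
# are not starved of CPU
cpu_executor = ProcessPoolExecutor(max_workers=2)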
Make good use of chunksize
The map method of Executor supports a chunksize parameter:
Signature: ProcessPoolExecutor.map(self, fn, *iterables, timeout=None, chunksize=1)
Source:
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.
        ...
        """
I first learned about this parameter through the map method of multiprocessing.pool.Pool. In concurrent.futures the default chunksize is 1; it is the unit in which tasks are chunked and submitted, so the default sends one task from the task list at a time. If there are a great many tasks and each one finishes quickly, increasing chunksize helps.
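A minimal sketch of the idea; the workload and the value 1000 are made up, and the right chunksize depends on your tasks:

from concurrent.futures import ProcessPoolExecutor


def double(n):
    return n * 2


if __name__ == '__main__':
    numbers = range(100_000)  # many tiny tasks
    with ProcessPoolExecutor() as executor:
        # chunksize=1 (the default) ships items one at a time, so the
        # inter-process overhead dominates; a larger chunk amortizes that cost
        results = list(executor.map(double, numbers, chunksize=1000))
    print(results[:5])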
In my earlier book 《Python Web 開發實戰》, when introducing Celery I also covered the Prefetch Limits configuration; it is the same idea.