一、基類Executor
Executor類是ThreadPoolExecutor 和ProcessPoolExecutor 的基類。它為我們提供了如下方法:
submit(fn, *args, **kwargs):提交任務。以 fn(*args **kwargs) 方式執行並返回 Future 對像。
fn:函數地址。
*args:位置參數。
**kwargs:關鍵字參數。
map(func, *iterables, timeout=None, chunksize=1):
func:函數地址。
iterables:一個可迭代對象,以迭代的方式將參數傳遞給函數。
timeout:這個參數沒弄明白,如果是None等待所有進程結束。
chunksize:使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊,並作為獨立的任務提交到執行池中。這些塊的數量可以由 chunksize 指定設置。 對很長的迭代器來說,設置chunksize 值比默認值 1 能顯著地提高性能。 chunksize 對 ThreadPoolExecutor 沒有效果。
shutdown(wait=True):如果為True會等待線程池或進程池執行完成后釋放正在使用的資源。如果 wait 為 False,將立即返回,所有待執行的期程完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的期程完成執行后才退出。
二、線程池對象
ThreadPoolExecutor 是 Executor 的子類,下面介紹ThreadPoolExecutor 的參數。
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
max_workers:線程池的數量。
thread_name_prefix:線程名前綴。默認線程名ThreadPoolExecutor-線程數。
initializer:一個函數或方法,在啟用線程前會調用這個函數(給線程池添加額外任務)。
initargs :以元祖的方式給initializer中的函數傳遞參數。
這里需要說明的是除了max_workers這個參數外其它三個參數基本很少用。max_workers很好理解就是線程池的數量。
下面來說initializer和initargs 這兩個奇怪的家伙。
示例一:
from concurrent.futures import ThreadPoolExecutor def work(): print('工作線程') def test(num): print('test:',num) executor = ThreadPoolExecutor(max_workers=2,initializer=test(7)) # 開啟2個線程 initializer指定參數test(7) executor.submit(work) executor.submit(work) # 打印內容如下 test: 7 工作線程 工作線程
示例二:
from concurrent.futures import ThreadPoolExecutor def work(): print('工作線程') def test(num): print('test:',num) executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 這里我們使用initargs=(7,)的方式給test傳遞參數。 executor.submit(work) executor.submit(work) # 打印內容如下 test: 7 工作線程 工作線程 test: 7
通過示例一和示例二我們可以發現initializer=test(7)時,test函數只被調用了1次,當initializer=test,initargs=(7,)時,test被調用了2次。具體原因沒有去分析。感覺沒什么用。以后有時間看看源碼在補上。
三、進程池對象
ProcessPoolExecutor 也是 Executor 的子類,下面是ProcessPoolExecutor 參數介紹:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
max_workers:工作進程數。如果 max_workers 為 None 或未給出,它將默認為機器的處理器個數。 如果 max_workers 小於等於 0,則將引發 ValueError。 在 Windows 上,max_workers 必須小於等於 61,否則將引發 ValueError。 如果 max_workers 為 None,則所選擇的默認最多為 61,即使存在更多處理器。
mp_context :可以是一個多進程上下文或是 None。 它將被用來啟動工作進程。 如果 mp_context 為 None 或未給出,將使用默認的多進程上下文。
initializer:一個函數或方法,在啟用線程前會調用這個函數。
initargs :以元祖的方式給initializer中的函數傳遞參數。
關於說initializer和initargs 與ThreadPoolExecutor 類似這里不多說了。
四、創建線程池
from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 創建線程池,數量為5 for i in range(5): executor.submit(work, i) print('主線程') # 打印內容如下 主線程 工作線程: 0 工作線程: 1 工作線程: 2 工作線程: 3 工作線程: 4
# 使用shutdown等待所有線程結束后在打印主線程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 創建線程池,數量為5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待線程池結束 print('主線程') # 打印內容如下 工作線程: 0 工作線程: 1 工作線程: 2 工作線程: 3 工作線程: 4 主線程
如果想要在線程執行的過程中添加額外的功能,可以使用initializer參數,如下:
from concurrent.futures import ThreadPoolExecutor def work(num): print('工作線程:',num) def test(num): print('額外任務:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) print('主線程') # 打印內容如下 額外任務: 7 工作線程: 0 額外任務: 7 工作線程: 1 額外任務: 7 工作線程: 2 額外任務: 7 工作線程: 3 額外任務: 7 工作線程: 4 主線程
五、進程池
進程池與線程池用法基本一致,只是名字和實現不一樣而已。
from concurrent.futures import ProcessPoolExecutor import time def work(num): time.sleep(1) print('工作進程:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5) # 創建進程池,數量為5 for i in range(5): executor.submit(work, i) print('主線程') # 打印內容如下 主線程 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 # 使用shutdown等待所有線程結束后在打印主線程 from concurrent.futures import ProcessPoolExecutor import time def work(num): time.sleep(1) print('工作進程:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5) # 創建進程池,數量為5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待進程池結束 print('主線程') # 打印內容如下 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 主線程
如果想要在線程執行的過程中添加額外的功能,可以使用initializer參數,如下:
from concurrent.futures import ProcessPoolExecutor def work(num): print('工作進程:',num) def test(num): print('額外任務:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) print('主線程') # 打印內容如下 額外任務: 7 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 額外任務: 7 額外任務: 7 額外任務: 7 額外任務: 7 主線程
六、Future Objects
future類封裝了可調用文件的異步執行。future的實例由executor.submit()時被創建的,除了測試之外不應該直接實例化future對象,所以為了獲取future對象我們可以f=executor.submit()即可。
class concurrent.futures.Future類中的方法:
cancel():嘗試取消執行線程池中的函數調用。如果調用當前正在執行或已完成運行,並且無法取消,則方法將返回false,否則調用將被取消,方法將返回true。
cancelled():如果線程池中的函數執行成功返回True,調用失敗返回false。
running():如果線程池中的調用當前正在執行且無法取消,則返回true。
done():如果呼叫成功取消或完成運行,則返回true。否則返回false
result(timeout=None):返回線程函數的返回值。如果線程函數未執行完成,則此方法將最多等待timeout秒,如果線程函數未在超時秒內完成,則將引發concurrent.futures.TimeoutError。超時可以是int或float。如果未指定超時 timeout=None,則會阻塞,一直等待函數執行完成。如果在線程函數完成之前使用future對象取消了執行,則將引發CancelederRor。如果調用raised,此方法將引發相同的異常。
exception(timeout=None):返回線程函數引發的異常。如果線程函數尚未完成,則此方法將最多等待timeout秒。如果線程函數未在超時秒內完成,則將引發concurrent.futures.TimeoutError。超時可以是int或float。如果未指定超時或無超時timeout=None,則會一直等待。如果在線程函數完成之前使用future對象取消了執行,則將引發CancelederRor如果線程函數完成但未引發,則返回None。
add_done_callback(fn):將可調用fn附加到future對象。當future對象被取消或結束運行時,將調用fn,其中future對象是惟一的參數。添加的可調用對象是按照添加順序調用的,並且總是在屬於添加它們的進程的線程中調用。如果Callable引發異常子類,它將被記錄並忽略。如果可調用引發BaseException子類,則行為未定義。
七、Module Functions
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):將fs綁定一個future實例,如果future執行完成或取消執行fs函數。
fs:fs是一個函數綁定在future實例(可能由不同的執行器實例創建)。返回2個命名元組的集合。第一組名為“done”,包含等待完成,完成前(完成或future對象取消)。第二組名為“not_done”,包含未完成的future(未完成或正在運行的future)。
timeout:如果為None一直等待,否則會等待timeout秒。
return_when :必須是如下范圍。
Constant |
Description |
---|---|
|
當任何future 完成或取消或者線程函數執行完成時。 |
|
當future通過引發異常而結束時,線程函數將返回。如果沒有future引發異常,那么它相當於所有已完成的。 |
|
當所有future完成或取消時,函數將返回。 |
concurrent.futures.as_completed(fs, timeout=None):返回一個future迭代器。
fs:可迭代對象的future。
timeout:超時時間,如果為None會一直阻塞直到執行完成。否則將等待timeout秒。
from concurrent.futures._base import as_completed from concurrent.futures import ThreadPoolExecutor def work(num): return num ** 2 if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) future_list = [] # 存放future對象 for i in range(5): future_list.append(executor.submit(work, i)) for future in as_completed(future_list): # 這是一個無聊的用法 res = future.result() print(f'結果:{res}') # 打印工作線程返回的結果
# 打印結果如下
結果:0
結果:4
結果:16
結果:1
結果:9
參考文檔:https://docs.python.org/3/library/concurrent.futures.html