concurrent.futures模塊簡單介紹(線程池,進程池)


一、基類Executor

Executor類是ThreadPoolExecutor 和ProcessPoolExecutor 的基類。它為我們提供了如下方法:

submit(fn, *args, **kwargs):提交任務。以 fn(*args **kwargs) 方式執行並返回 Future 對像。

fn:函數地址。

*args:位置參數。

**kwargs:關鍵字參數。

map(func, *iterables, timeout=None, chunksize=1):

func:函數地址。

iterables:一個可迭代對象,以迭代的方式將參數傳遞給函數。

timeout:這個參數沒弄明白,如果是None等待所有進程結束。

chunksize:使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊,並作為獨立的任務提交到執行池中。這些塊的數量可以由 chunksize 指定設置。 對很長的迭代器來說,設置chunksize 值比默認值 1 能顯著地提高性能。 chunksize 對 ThreadPoolExecutor 沒有效果。

shutdown(wait=True):如果為True會等待線程池或進程池執行完成后釋放正在使用的資源。如果 wait 為 False,將立即返回,所有待執行的期程完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的期程完成執行后才退出。

二、線程池對象

ThreadPoolExecutor 是 Executor 的子類,下面介紹ThreadPoolExecutor 的參數。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):

max_workers:線程池的數量。

thread_name_prefix:線程名前綴。默認線程名ThreadPoolExecutor-線程數。

initializer:一個函數或方法,在啟用線程前會調用這個函數(給線程池添加額外任務)

initargs :以元祖的方式給initializer中的函數傳遞參數。

這里需要說明的是除了max_workers這個參數外其它三個參數基本很少用。max_workers很好理解就是線程池的數量。

下面來說initializer和initargs 這兩個奇怪的家伙。

示例一:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工作線程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test(7))  # 開啟2個線程  initializer指定參數test(7)
executor.submit(work)  
executor.submit(work)

# 打印內容如下
test: 7
工作線程
工作線程

示例二:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工作線程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 這里我們使用initargs=(7,)的方式給test傳遞參數。
executor.submit(work)
executor.submit(work)

# 打印內容如下
test: 7
工作線程
工作線程
test: 7

通過示例一和示例二我們可以發現initializer=test(7)時,test函數只被調用了1次,當initializer=test,initargs=(7,)時,test被調用了2次。具體原因沒有去分析。感覺沒什么用。以后有時間看看源碼在補上。

三、進程池對象

ProcessPoolExecutor 也是 Executor 的子類,下面是ProcessPoolExecutor 參數介紹:

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

max_workers:工作進程數。如果 max_workers 為 None 或未給出,它將默認為機器的處理器個數。 如果 max_workers 小於等於 0,則將引發 ValueError。 在 Windows 上,max_workers 必須小於等於 61,否則將引發 ValueError。 如果 max_workers 為 None,則所選擇的默認最多為 61,即使存在更多處理器。

mp_context :可以是一個多進程上下文或是 None。 它將被用來啟動工作進程。 如果 mp_context 為 None 或未給出,將使用默認的多進程上下文。

initializer:一個函數或方法,在啟用線程前會調用這個函數。

initargs :以元祖的方式給initializer中的函數傳遞參數。

關於說initializer和initargs 與ThreadPoolExecutor 類似這里不多說了。


 

四、創建線程池

from concurrent.futures import ThreadPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工作線程:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)  # 創建線程池,數量為5
    for i in range(5):
        executor.submit(work, i)
    print('主線程')

# 打印內容如下
主線程
工作線程:   0
工作線程:   1
工作線程:   2
工作線程:   3
工作線程:   4
# 使用shutdown等待所有線程結束后在打印主線程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 創建線程池,數量為5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待線程池結束 print('主線程') # 打印內容如下 工作線程: 0 工作線程: 1 工作線程: 2 工作線程: 3 工作線程: 4 主線程

如果想要在線程執行的過程中添加額外的功能,可以使用initializer參數,如下:

from concurrent.futures import ThreadPoolExecutor

def work(num):
    print('工作線程:',num)
def test(num):
    print('額外任務:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主線程')

# 打印內容如下
額外任務: 7
工作線程: 0
額外任務: 7
工作線程: 1
額外任務: 7
工作線程: 2 
額外任務: 7
工作線程: 3 
額外任務: 7
工作線程: 4 
主線程

五、進程池

進程池與線程池用法基本一致,只是名字和實現不一樣而已。

from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工作進程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 創建進程池,數量為5
    for i in range(5):
        executor.submit(work, i)
    print('主線程')

# 打印內容如下
主線程
工作進程: 0
工作進程: 1
工作進程: 2
工作進程: 3
工作進程: 4

# 使用shutdown等待所有線程結束后在打印主線程
from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工作進程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 創建進程池,數量為5
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)  # 等待進程池結束
    print('主線程')
# 打印內容如下
工作進程: 0
工作進程: 1
工作進程: 2
工作進程: 3
工作進程: 4
主線程

如果想要在線程執行的過程中添加額外的功能,可以使用initializer參數,如下:

from concurrent.futures import ProcessPoolExecutor

def work(num):
    print('工作進程:',num)
def test(num):
    print('額外任務:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主線程')

# 打印內容如下
額外任務: 7
工作進程: 0
工作進程: 1
工作進程: 2
工作進程: 3
工作進程: 4
額外任務: 7
額外任務: 7
額外任務: 7
額外任務: 7
主線程

 


 

六、Future Objects

future類封裝了可調用文件的異步執行。future的實例由executor.submit()時被創建的,除了測試之外不應該直接實例化future對象,所以為了獲取future對象我們可以f=executor.submit()即可。

class concurrent.futures.Future類中的方法:

cancel():嘗試取消執行線程池中的函數調用。如果調用當前正在執行或已完成運行,並且無法取消,則方法將返回false,否則調用將被取消,方法將返回true。

cancelled():如果線程池中的函數執行成功返回True,調用失敗返回false。

running():如果線程池中的調用當前正在執行且無法取消,則返回true。

done():如果呼叫成功取消或完成運行,則返回true。否則返回false

result(timeout=None):返回線程函數的返回值。如果線程函數未執行完成,則此方法將最多等待timeout秒,如果線程函數未在超時秒內完成,則將引發concurrent.futures.TimeoutError。超時可以是int或float。如果未指定超時 timeout=None,則會阻塞,一直等待函數執行完成。如果在線程函數完成之前使用future對象取消了執行,則將引發CancelederRor。如果調用raised,此方法將引發相同的異常。

exception(timeout=None):返回線程函數引發的異常。如果線程函數尚未完成,則此方法將最多等待timeout秒。如果線程函數未在超時秒內完成,則將引發concurrent.futures.TimeoutError。超時可以是int或float。如果未指定超時或無超時timeout=None,則會一直等待。如果在線程函數完成之前使用future對象取消了執行,則將引發CancelederRor如果線程函數完成但未引發,則返回None。

add_done_callback(fn):將可調用fn附加到future對象。當future對象被取消或結束運行時,將調用fn,其中future對象是惟一的參數。添加的可調用對象是按照添加順序調用的,並且總是在屬於添加它們的進程的線程中調用。如果Callable引發異常子類,它將被記錄並忽略。如果可調用引發BaseException子類,則行為未定義。


 

七、Module Functions

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):將fs綁定一個future實例,如果future執行完成或取消執行fs函數。

fs:fs是一個函數綁定在future實例(可能由不同的執行器實例創建)。返回2個命名元組的集合。第一組名為“done”,包含等待完成,完成前(完成或future對象取消)。第二組名為“not_done”,包含未完成的future(未完成或正在運行的future)。

timeout:如果為None一直等待,否則會等待timeout秒。

return_when :必須是如下范圍。

Constant

Description

FIRST_COMPLETED

當任何future 完成或取消或者線程函數執行完成時。

FIRST_EXCEPTION

當future通過引發異常而結束時,線程函數將返回。如果沒有future引發異常,那么它相當於所有已完成的。

ALL_COMPLETED

當所有future完成或取消時,函數將返回。

 

concurrent.futures.as_completed(fs, timeout=None):返回一個future迭代器。

fs:可迭代對象的future。

timeout:超時時間,如果為None會一直阻塞直到執行完成。否則將等待timeout秒。

from concurrent.futures._base import as_completed
from concurrent.futures import ThreadPoolExecutor

def work(num):
    return num ** 2
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    future_list = []  # 存放future對象
    for i in range(5):
        future_list.append(executor.submit(work, i))
    for future in as_completed(future_list):   # 這是一個無聊的用法
        res = future.result()
        print(f'結果:{res}')  # 打印工作線程返回的結果
# 打印結果如下

結果:0
結果:4
結果:16
結果:1
結果:9

 

參考文檔:https://docs.python.org/3/library/concurrent.futures.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM