前言
Python標准庫為我們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼,但是當項目達到一定的規模,頻繁創建/銷毀進程或者線程是非常消耗資源的,這個時候我們就要編寫自己的線程池/進程池,以空間換時間。但從Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。
相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:
主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。
當一個線程完成的時候,主線程能夠立即知道。
讓多線程和多進程的編碼接口一致。
線程池的使用
from concurrent.futures import ThreadPoolExecutor
import time
def task(i):
time.sleep(i)
print(f'task{i} completed')
pass
thread_executor = ThreadPoolExecutor(max_workers=7) # 創建一個最大容納數量為7的線程池
start = time.time()
task1 = thread_executor.submit(task, 0)
task1 = thread_executor.submit(task, 1)
task1 = thread_executor.submit(task, 2)
task1 = thread_executor.submit(task, 3)
task1 = thread_executor.submit(task, 4)
# 打印該任務是否執行完畢
print(task1.done())
# 只有未被提交的到線程池(在等待提交的隊列中)的任務才能夠取消
print(task3.cancel())
time.sleep(4) # 休眠4秒鍾之后,線程池中的任務全部執行完畢,可以打印狀態
print(task1.done())
print(task1.result()) # 該任務的return 返回值 該方法是阻塞的。
thread_executor.shutdown(wait=True)
end = time.time()
print(f'Time consuming {end - start}')
- 執行結果
task0 completed
False
False
task1 completed
task2 completed
task3 completed
True
None
task4 completed
Time consuming 4.002375364303589
- ThreadPoolExecutor構造實例的時候,傳入max_workers參數來設置線程池中最多能同時運行的線程數目。
- 使用submit函數來提交線程需要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(類似於文件、畫圖),注意submit()不是阻塞的,而是立即返回。
- 通過submit函數返回的任務句柄,能夠使用done()方法判斷該任務是否結束。上面的例子可以看出,由於任務有2s的延時,在task1提交后立刻判斷,task1還未完成,而在延時4s之后判斷,task1就完成了。
- 使用cancel()方法可以取消提交的任務,如果任務已經在線程池中運行了,就取消不了。這個例子中,線程池的大小設置為2,任務已經在運行了,所以取消失敗。如果改變線程池的大小為1,那么先提交的是task1,task2還在排隊等候,這是時候就可以成功取消。
- 使用result()方法可以獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。
as_completed
- 上面雖然提供了判斷任務是否結束的方法,但是不能在主線程中一直判斷啊。有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。
- as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for循環下面的語句,然后繼續阻塞住,循環到所有的任務結束。從結果也可以看出,先完成的任務會先通知主線程。
map
- 除了上面的as_completed方法,還可以使用executor.map方法,但是有一點不同。
from concurrent.futures import ThreadPoolExecutor
import time
def task(i):
time.sleep(i)
print(f'task{i} completed')
pass
thread_executor = ThreadPoolExecutor(max_workers=7) # 創建一個最大容納數量為7的線程池
start = time.time()
params = [i for i in range(5)]
thread_executor.map(task, params)
thread_executor.shutdown(wait=True)
end = time.time()
print(f'Time consuming {end - start}')
- 執行結果
task0 completed
task1 completed
task2 completed
task3 completed
task4 completed
Time consuming 4.002383232116699
ProcessPoolExecutor使用
ProcessPoolExecutor在使用上和ThreadPoolExecutor大致是一樣的,它們在futures中的方法也是相同的,但是對於map()方法ProcessPoolExecutor會多一個參數chunksize(ThreadPoolExecutor中這個參數沒有任何作用),chunksize將迭代對象切成塊,將其作為分開的任務提交給pool,對於很大的iterables,設置較大chunksize可以提高性能。