Python線程池及其原理和使用(超級詳細)
系統啟動一個新線程的成本是比較高的,因為它涉及與操作系統的交互。在這種情形下,使用線程池可以很好地提升性能,尤其是當程序中需要創建大量生存期很短暫的線程時,更應該考慮使用線程池。
線程池在系統啟動時即創建大量空閑的線程,程序只要將一個函數提交給線程池,線程池就會啟動一個空閑的線程來執行它。當該函數執行結束后,該線程並不會死亡,而是再次返回到線程池中變成空閑狀態,等待執行下一個函數。
此外,使用線程池可以有效地控制系統中並發線程的數量。當系統中包含有大量的並發線程時,會導致系統性能急劇下降,甚至導致 Python 解釋器崩潰,而線程池的最大線程數參數可以控制系統中並發線程的數量不超過此數。
如果使用線程池/進程池來管理並發編程,那么只要將相應的 task 函數提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定。
Exectuor 提供了如下常用方法:
程序將 task 函數提交(submit)給線程池后,submit 方法會返回一個 Future 對象,Future 類主要用於獲取線程任務函數的返回值。由於線程任務會在新線程中以異步方式執行,因此,線程執行的函數相當於一個“將來完成”的任務,所以 Python 使用 Future 來代表。
在用完一個線程池后,應該調用該線程池的 shutdown() 方法,該方法將啟動線程池的關閉序列。調用 shutdown() 方法后的線程池不再接收新任務,但會將以前所有的已提交任務執行完成。當線程池中的所有任務都執行完成后,該線程池中的所有線程都會死亡。
使用線程池來執行線程任務的步驟如下:
下面程序示范了如何使用線程池來執行線程任務:
上面程序中,第 13 行代碼創建了一個包含兩個線程的線程池,接下來的兩行代碼只要將 action() 函數提交(submit)給線程池,該線程池就會負責啟動線程來執行 action() 函數。這種啟動線程的方法既優雅,又具有更高的效率。
當程序把 action() 函數提交給線程池時,submit() 方法會返回該任務所對應的 Future 對象,程序立即判斷 futurel 的 done() 方法,該方法將會返回 False(表明此時該任務還未完成)。接下來主程序暫停 3 秒,然后判斷 future2 的 done() 方法,如果此時該任務已經完成,那么該方法將會返回 True。
程序最后通過 Future 的 result() 方法來獲取兩個異步任務返回的結果。
如果程序不希望直接調用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調函數,該回調函數形如 fn(future)。當線程任務完成后,程序會自動觸發該回調函數,並將對應的 Future 對象作為參數傳給該回調函數。
下面程序使用 add_done_callback() 方法來獲取線程任務的返回值:
上面主程序分別為 future1、future2 添加了同一個回調函數,該回調函數會在線程任務結束時獲取其返回值。
主程序的最后一行代碼打印了一條橫線。由於程序並未直接調用 future1、future2 的 result() 方法,因此主線程不會被阻塞,可以立即看到輸出主線程打印出的橫線。接下來將會看到兩個新線程並發執行,當線程任務執行完成后,get_result() 函數被觸發,輸出線程任務的返回值。
另外,由於線程池實現了上下文管理協議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動關閉線程池,如上面的程序所示。
此外,Exectuor 還提供了一個
例如,如下程序使用 Executor 的 map() 方法來啟動線程,並收集線程任務的返回值:
上面程序使用 map() 方法來啟動 3 個線程(該程序的線程池包含 4 個線程,如果繼續使用只包含兩個線程的線程池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,線程空閑出來才會獲得執行的機會),map() 方法的返回值將會收集每個線程任務的返回結果。
運行上面程序,同樣可以看到 3 個線程並發執行的結果,最后通過 results 可以看到 3 個線程任務的返回結果。
通過上面程序可以看出,使用 map() 方法來啟動線程,並收集線程的執行結果,不僅具有代碼簡單的優點,而且雖然程序會以並發方式來執行 action() 函數,但最后收集的 action() 函數的執行結果,依然與傳入參數的結果保持一致。也就是說,上面 results 的第一個元素是 action(50) 的結果,第二個元素是 action(100) 的結果,第三個元素是 action(150) 的結果。
線程池在系統啟動時即創建大量空閑的線程,程序只要將一個函數提交給線程池,線程池就會啟動一個空閑的線程來執行它。當該函數執行結束后,該線程並不會死亡,而是再次返回到線程池中變成空閑狀態,等待執行下一個函數。
此外,使用線程池可以有效地控制系統中並發線程的數量。當系統中包含有大量的並發線程時,會導致系統性能急劇下降,甚至導致 Python 解釋器崩潰,而線程池的最大線程數參數可以控制系統中並發線程的數量不超過此數。
線程池的使用
線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用於創建線程池,而 ProcessPoolExecutor 用於創建進程池。如果使用線程池/進程池來管理並發編程,那么只要將相應的 task 函數提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定。
Exectuor 提供了如下常用方法:
- submit(fn, *args, **kwargs):將 fn 函數提交給線程池。*args 代表傳給 fn 函數的參數,*kwargs 代表以關鍵字參數的形式為 fn 函數傳入參數。
- map(func, *iterables, timeout=None, chunksize=1):該函數類似於全局函數 map(func, *iterables),只是該函數將會啟動多個線程,以異步方式立即對 iterables 執行 map 處理。
- shutdown(wait=True):關閉線程池。
程序將 task 函數提交(submit)給線程池后,submit 方法會返回一個 Future 對象,Future 類主要用於獲取線程任務函數的返回值。由於線程任務會在新線程中以異步方式執行,因此,線程執行的函數相當於一個“將來完成”的任務,所以 Python 使用 Future 來代表。
實際上,在 Java 的多線程編程中同樣有 Future,此處的 Future 與 Java 的 Future 大同小異。
Future 提供了如下方法:- cancel():取消該 Future 代表的線程任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程序會取消該任務,並返回 True。
- cancelled():返回 Future 代表的線程任務是否被成功取消。
- running():如果該 Future 代表的線程任務正在執行、不可被取消,該方法返回 True。
- done():如果該 Funture 代表的線程任務被成功取消或執行完成,則該方法返回 True。
- result(timeout=None):獲取該 Future 代表的線程任務最后返回的結果。如果 Future 代表的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數指定最多阻塞多少秒。
- exception(timeout=None):獲取該 Future 代表的線程任務所引發的異常。如果該任務成功完成,沒有異常,則該方法返回 None。
- add_done_callback(fn):為該 Future 代表的線程任務注冊一個“回調函數”,當該任務成功完成時,程序會自動觸發該 fn 函數。
在用完一個線程池后,應該調用該線程池的 shutdown() 方法,該方法將啟動線程池的關閉序列。調用 shutdown() 方法后的線程池不再接收新任務,但會將以前所有的已提交任務執行完成。當線程池中的所有任務都執行完成后,該線程池中的所有線程都會死亡。
使用線程池來執行線程任務的步驟如下:
- 調用 ThreadPoolExecutor 類的構造器創建一個線程池。
- 定義一個普通函數作為線程任務。
- 調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。
- 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。
下面程序示范了如何使用線程池來執行線程任務:
- from concurrent.futures import ThreadPoolExecutor
- import threading
- import time
- # 定義一個准備作為線程任務的函數
- def action(max):
- my_sum = 0
- for i in range(max):
- print(threading.current_thread().name + ' ' + str(i))
- my_sum += i
- return my_sum
- # 創建一個包含2條線程的線程池
- pool = ThreadPoolExecutor(max_workers=2)
- # 向線程池提交一個task, 50會作為action()函數的參數
- future1 = pool.submit(action, 50)
- # 向線程池再提交一個task, 100會作為action()函數的參數
- future2 = pool.submit(action, 100)
- # 判斷future1代表的任務是否結束
- print(future1.done())
- time.sleep(3)
- # 判斷future2代表的任務是否結束
- print(future2.done())
- # 查看future1代表的任務返回的結果
- print(future1.result())
- # 查看future2代表的任務返回的結果
- print(future2.result())
- # 關閉線程池
- pool.shutdown()
當程序把 action() 函數提交給線程池時,submit() 方法會返回該任務所對應的 Future 對象,程序立即判斷 futurel 的 done() 方法,該方法將會返回 False(表明此時該任務還未完成)。接下來主程序暫停 3 秒,然后判斷 future2 的 done() 方法,如果此時該任務已經完成,那么該方法將會返回 True。
程序最后通過 Future 的 result() 方法來獲取兩個異步任務返回的結果。
讀者可以自己運行此代碼查看運行結果,這里不再演示。
當程序使用 Future 的 result() 方法來獲取結果時,該方法會阻塞當前線程,如果沒有指定 timeout 參數,當前線程將一直處於阻塞狀態,直到 Future 代表的任務返回。獲取執行結果
前面程序調用了 Future 的 result() 方法來獲取線程任務的運回值,但該方法會阻塞當前主線程,只有等到錢程任務完成后,result() 方法的阻塞才會被解除。如果程序不希望直接調用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調函數,該回調函數形如 fn(future)。當線程任務完成后,程序會自動觸發該回調函數,並將對應的 Future 對象作為參數傳給該回調函數。
下面程序使用 add_done_callback() 方法來獲取線程任務的返回值:
- from concurrent.futures import ThreadPoolExecutor
- import threading
- import time
- # 定義一個准備作為線程任務的函數
- def action(max):
- my_sum = 0
- for i in range(max):
- print(threading.current_thread().name + ' ' + str(i))
- my_sum += i
- return my_sum
- # 創建一個包含2條線程的線程池
- with ThreadPoolExecutor(max_workers=2) as pool:
- # 向線程池提交一個task, 50會作為action()函數的參數
- future1 = pool.submit(action, 50)
- # 向線程池再提交一個task, 100會作為action()函數的參數
- future2 = pool.submit(action, 100)
- def get_result(future):
- print(future.result())
- # 為future1添加線程完成的回調函數
- future1.add_done_callback(get_result)
- # 為future2添加線程完成的回調函數
- future2.add_done_callback(get_result)
- print('--------------')
主程序的最后一行代碼打印了一條橫線。由於程序並未直接調用 future1、future2 的 result() 方法,因此主線程不會被阻塞,可以立即看到輸出主線程打印出的橫線。接下來將會看到兩個新線程並發執行,當線程任務執行完成后,get_result() 函數被觸發,輸出線程任務的返回值。
另外,由於線程池實現了上下文管理協議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動關閉線程池,如上面的程序所示。
此外,Exectuor 還提供了一個
map(func, *iterables, timeout=None, chunksize=1)
方法,該方法的功能類似於全局函數 map(),區別在於線程池的 map() 方法會為 iterables 的每個元素啟動一個線程,以並發方式來執行 func 函數。這種方式相當於啟動 len(iterables) 個線程,井收集每個線程的執行結果。
例如,如下程序使用 Executor 的 map() 方法來啟動線程,並收集線程任務的返回值:
- from concurrent.futures import ThreadPoolExecutor
- import threading
- import time
- # 定義一個准備作為線程任務的函數
- def action(max):
- my_sum = 0
- for i in range(max):
- print(threading.current_thread().name + ' ' + str(i))
- my_sum += i
- return my_sum
- # 創建一個包含4條線程的線程池
- with ThreadPoolExecutor(max_workers=4) as pool:
- # 使用線程執行map計算
- # 后面元組有3個元素,因此程序啟動3條線程來執行action函數
- results = pool.map(action, (50, 100, 150))
- print('--------------')
- for r in results:
- print(r)
運行上面程序,同樣可以看到 3 個線程並發執行的結果,最后通過 results 可以看到 3 個線程任務的返回結果。
通過上面程序可以看出,使用 map() 方法來啟動線程,並收集線程的執行結果,不僅具有代碼簡單的優點,而且雖然程序會以並發方式來執行 action() 函數,但最后收集的 action() 函數的執行結果,依然與傳入參數的結果保持一致。也就是說,上面 results 的第一個元素是 action(50) 的結果,第二個元素是 action(100) 的結果,第三個元素是 action(150) 的結果。