抨擊線程的往往是系統程序員,他們考慮的使用場景對一般的應用程序員來說,也許一生都不會遇到……應用程序員遇到的使用場景,99% 的情況下只需知道如何派生一堆獨立的線程,然后用隊列收集結果。
示例:網絡下載的三種風格
為了高效處理網絡 I/O,需要使用並發,因為網絡有很高的延遲,所以為了不浪費 CPU 周期去等待,最好在收到網絡響應之前做些其他的事。
為了通過代碼說明這一點,我寫了三個示例程序,從網上下載 20 個國家的國旗圖像。第一個示例程序 flags.py 是依序下載的:下載完一個圖像,並將其保存在硬盤中之后,才請求下一個圖像。另外兩個腳本是並發下載的:幾乎同時請求所有圖像,每下載完一個文件就保存一個文件。flags_threadpool.py 腳本使用 concurrent.futures 模塊,而flags_asyncio.py 腳本使用 asyncio 包。
運行 flags.py、flags_threadpool.py 和 flags_asyncio.py 腳本得到的結果
$ python3 flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN ➊ 每次運行腳本后,首先顯示下載過程中下載完畢的國家代碼,最后顯示一個消息,說明耗時 20 flags downloaded in 7.26s ➋ flags.py 腳本下載 20 個圖像平均用時 7.18 秒 $ python3 flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 7.20s $ python3 flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 7.09s $ python3 flags_threadpool.py DE BD CN JP ID EG NG BR RU CD IR MX US PH FR PK VN IN ET TR 20 flags downloaded in 1.37s ➌ flags_threadpool.py 腳本平均用時 1.40 秒 $ python3 flags_threadpool.py EG BR FR IN BD JP DE RU PK PH CD MX ID US NG TR CN VN ET IR 20 flags downloaded in 1.60s $ python3 flags_threadpool.py BD DE EG CN ID RU IN VN ET MX FR CD NG US JP TR PK BR IR PH 20 flags downloaded in 1.22s $ python3 flags_asyncio.py ➍ flags_asyncio.py 腳本平均用時 1.35 秒 BD BR IN ID TR DE CN US IR PK PH FR RU NG VN ET MX EG JP CD 20 flags downloaded in 1.36s $ python3 flags_asyncio.py RU CN BR IN FR BD TR EG VN IR PH CD ET ID NG DE JP PK MX US 20 flags downloaded in 1.27s $ python3 flags_asyncio.py RU IN ID DE BR VN PK MX US IR ET EG NG BD FR CN JP PH CD TR ➎ 注意國家代碼的順序:對並發下載的腳本來說,每次下載的順序都不同 20 flags downloaded in 1.42s
兩個並發下載的腳本之間性能差異不大,不過都比依序下載的腳本快 5倍多。這只是一個特別小的任務,如果把下載的文件數量增加到幾百個,並發下載的腳本能比依序下載的腳本快 20 倍或更多。
依序下載的腳本
🌰 flags.py:依序下載的腳本;另外兩個腳本會重用其中幾個函數
1 import os 2 import sys 3 import time 4 5 import requests 6 7 POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 8 'MX PH VN ET EG DE IR TR CD FR').split() 9 10 BASE_URL = 'http://flupy.org/data/flags' # 下載國旗網站的入口 11 12 DEST_DIR = 'downloads/' 13 14 os.mkdir(DEST_DIR) if not os.path.exists(DEST_DIR) else 'ok' #判斷目錄是否存在,不存在在就創建 15 16 17 def save_flag(img, filename): # 保存圖片的函數,傳遞img的二進制流和還有國旗的名稱 18 path = os.path.join(DEST_DIR, filename) 19 with open(path, 'wb') as fp: 20 fp.write(img) 21 22 23 def get_flag(cc): # 獲取國旗的下載地址,通過requests中的content獲取二進制流 24 url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) 25 resp = requests.get(url) 26 return resp.content 27 28 29 def show(text): # 顯示一個字符串,然后刷新sys.stdout 30 print(text, end=' ') 31 sys.stdout.flush() 32 33 34 def download_many(cc_list): # download_many 是與並發實現比較的關鍵函數 35 for cc in sorted(cc_list): 36 image = get_flag(cc) 37 show(cc) 38 save_flag(image, cc.lower() + '.gif') 39 40 return len(cc_list) 41 42 43 def main(download_many): # main 函數記錄並報告運行 download_many 函數之后的耗時 44 t0 = time.time() 45 count = download_many(POP20_CC) 46 elapsed = time.time() - t0 47 msg = '\n{} flags downloaded in {:.2f}s' 48 print(msg.format(count, elapsed)) 49 50 51 if __name__ == "__main__": 52 main(download_many)
使用concurrent.futures模塊下載
concurrent.futures 模塊的主要特色是 ThreadPoolExecutor 和ProcessPoolExecutor 類,這兩個類實現的接口能分別在不同的線程或進程中執行可調用的對象。這兩個類在內部維護着一個工作線程或進程池,以及要執行的任務隊列。不過,這個接口抽象的層級很高,像下載國旗這種簡單的案例,無需關心任何實現細節。
🌰 flags_threadpool.py:使用futures.ThreadPoolExecutor 類實現多線程下載的腳本
1 from concurrent import futures 2 3 from flags import save_flag, get_flag, show, main 4 5 6 MAX_WORKERS = 20 # 設定 ThreadPoolExecutor 類最多使用幾個線程 7 8 9 def download_one(cc): # 下載一個圖像的函數;這是在各個線程中執行的函數 10 image = get_flag(cc) 11 show(cc) 12 save_flag(image, cc.lower() + '.gif') 13 return cc 14 15 16 def download_many(cc_list): 17 workers = min(MAX_WORKERS, len(cc_list)) # 設定工作的線程數量:使用允許的最大值(MAX_WORKERS)與要 18 # 處理的數量之間較小的那個值,以免創建多余的線程。 19 with futures.ThreadPoolExecutor(workers) as executor: # 使用工作的線程數實例化 ThreadPoolExecutor類 20 res = executor.map(download_one, sorted(cc_list)) # map 方法的作用與內置的 map 函數類似,不過 download_one 函數會在 21 # 多個線程中並發調用;map 方法返回一個生成器,因此可以迭代,獲取各個函數返回的值。 22 return len(list(res)) #返回獲取的結果數量;如果有線程拋出異常,異常會在這里拋出 23 24 if __name__ == "__main__": 25 main(download_many)
期物在哪里
期物是 concurrent.futures 模塊和 asyncio 包的重要組件,從 Python 3.4 起,標准庫中有兩個名為 Future 的類:concurrent.futures.Future 和 asyncio.Future。這兩個類的作用相同:兩個 Future 類的實例都表示可能已經完成或者尚未完成的延遲計算。這與 Twisted 引擎中的 Deferred 類、Tornado 框架中的Future 類,以及多個 JavaScript 庫中的 Promise 對象類似。
期物封裝待完成的操作,可以放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)后可以獲取結果(或異常)。
我們要記住一件事:通常情況下自己不應該創建期物,而只能由並發框架(concurrent.futures 或 asyncio)實例化。原因很簡單:期物表示終將發生的事情,而確定某件事會發生的唯一方式是執行的時間已經排定。因此,只有排定把某件事交給concurrent.futures.Executor 子類處理時,才會創建concurrent.futures.Future 實例。例如,Executor.submit() 方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,並返回一個期物。
這兩種期物都有 .done() 方法,這個方法不阻塞,返回值是布爾值,指明期物鏈接的可調用對象是否已經執行。客戶端代碼通常不會詢問期物是否運行結束,而是會等待通知。因此,兩個 Future 類都有.add_done_callback() 方法:這個方法只有一個參數,類型是可調用的對象,期物運行結束后會調用指定的可調用對象。
此外,還有 .result() 方法。在期物運行結束后調用的話,這個方法在兩個 Future 類中的作用相同:返回可調用對象的結果,或者重新拋出執行可調用的對象時拋出的異常。可是,如果期物沒有運行結束,result 方法在兩個 Future 類中的行為相差很大。對concurrency.futures.Future 實例來說,調用 f.result() 方法會阻塞調用方所在的線程,直到有結果可返回。此時,result 方法可以接收可選的 timeout 參數,如果在指定的時間內期物沒有運行完畢,會拋出 TimeoutError 異常。
為了使用 futures.as_completed 函數,只需修改 download_many 函數,把較抽象的 executor.map 調用換成兩個 for 循環:一個用於創建並排定期物,另一個用於獲取期物的結果。同時,我們會添加幾個print 調用,顯示運行結束前后的期物。修改后的 download_many 函數如示例。
🌰 flags_threadpool_ac.py:把download_many 函數中的executor.map 方法換成 executor.submit 方法和futures.as_completed 函數
1 from concurrent import futures 2 3 from flags_threadpool import download_one, main 4 5 def download_many(cc_list): 6 cc_list = cc_list[:5] # 這次演示只使用人口最多的 5 個國家 7 with futures.ThreadPoolExecutor(max_workers=3) as executor: # 把 max_workers 硬編碼為 3,以便在輸出中觀察待完成的期物 8 to_do = [] 9 for cc in sorted(cc_list): # 按照字母表順序迭代國家代碼,明確表明輸出的順序與輸入一致 10 future = executor.submit(download_one, cc) # executor.submit 方法排定可調用對象的執行時間, 11 # 然后返回一個期物,表示這個待執行的操作。 12 to_do.append(future) # 存儲各個期物,后面傳給 as_completed 函數 13 msg = 'Scheduled for {}: {}' 14 print(msg.format(cc, future)) # 顯示一個消息,包含國家代碼和對應的期物 15 16 results = [] 17 for future in futures.as_completed(to_do): # as_completed 函數在期物運行結束后產出期物 18 res = future.result() # 獲取該期物的結果 19 msg = '{} result: {!r}' 20 print(msg.format(future, res)) # 顯示期物及其結果 21 results.append(res) 22 23 return len(results) 24 25 if __name__ == "__main__": 26 main(download_many)
flags_threadpool_ac.py 腳本的輸出
$ python3 flags_threadpool_ac.py Scheduled for BR: <Future at 0x100791518 state=running> ➊ 排定的期物按字母表排序;期物的repr()方法會顯示期物的狀態前三個running,以為有三個線程可用 Scheduled for CN: <Future at 0x100791710 state=running> Scheduled for ID: <Future at 0x100791a90 state=running> Scheduled for IN: <Future at 0x101807080 state=pending> ➋ 后兩個期物的狀態是pending,等待有線程可用 Scheduled for US: <Future at 0x101807128 state=pending> CN <Future at 0x100791710 state=finished returned str> result: 'CN' ➌ 這一行里的第一個CN是運行在一個工作線程中的download_one函數里輸出的,隨后的內容是download_many函數輸出的 BR ID <Future at 0x100791518 state=finished returned str> result: 'BR' ➍ 這里有兩個線程輸出國家代碼,然后主線程中有download_many函數輸出第一個線程的結果 <Future at 0x100791a90 state=finished returned str> result: 'ID' IN <Future at 0x101807080 state=finished returned str> result: 'IN' US <Future at 0x101807128 state=finished returned str> result: 'US' 5 flags downloaded in 0.70s
阻塞型I/O和GIL
CPython 解釋器本身就不是線程安全的,因此有全局解釋器鎖(GIL),一次只允許使用一個線程執行 Python 字節碼。因此,一個 Python 進程通常不能同時使用多個 CPU 核心。
編寫 Python 代碼時無法控制 GIL;不過,執行耗時的任務時,可以使用一個內置的函數或一個使用 C 語言編寫的擴展釋放 GIL。其實,有個使用 C 語言編寫的 Python 庫能管理 GIL,自行啟動操作系統線程,利用全部可用的 CPU 核心。這樣做會極大地增加庫代碼的復雜度,因此大多數庫的作者都不這么做。
然而,標准庫中所有執行阻塞型 I/O 操作的函數,在等待操作系統返回結果時都會釋放 GIL。這意味着在 Python 語言這個層次上可以使用多線程,而 I/O 密集型 Python 程序能從中受益:一個 Python 線程等待網絡響應時,阻塞型 I/O 函數會釋放 GIL,再運行一個線程。
使用concurrent.futures模塊啟動進程
concurrent.futures 模塊的文檔(https://docs.python.org/3/library/concurrent.futures.html)副標題是“Launching parallel tasks”(執行並行任務)。這個模塊實現的是真正的並行計算,因為它使用 ProcessPoolExecutor 類把工作分配給多個Python 進程處理。因此,如果需要做 CPU 密集型處理,使用這個模塊能繞開 GIL,利用所有可用的 CPU 核心。
ProcessPoolExecutor 和 ThreadPoolExecutor 類都實現了通用的Executor 接口,因此使用 concurrent.futures 模塊能特別輕松地把基於線程的方案轉成基於進程的方案。
下載國旗的示例或其他 I/O 密集型作業使用 ProcessPoolExecutor 類得不到任何好處。這一點易於驗證,只需把下面 🌰 中這幾行:
def download_many(cc_list): workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor:
改成:
def download_many(cc_list): with futures.ProcessPoolExecutor() as executor:
對簡單的用途來說,這兩個實現 Executor 接口的類唯一值得注意的區別是,ThreadPoolExecutor.__init__ 方法需要 max_workers 參數,指定線程池中線程的數量。在 ProcessPoolExecutor 類中,那個參數是可選的,而且大多數情況下不使用——默認值是os.cpu_count() 函數返回的 CPU 數量。這樣處理說得通,因為對CPU 密集型的處理來說,不可能要求使用超過 CPU 數量的值程。而對I/O 密集型處理來說可以在一個 ThreadPoolExecutor 實例中使用 10個、100 個或 1000 個線程;最佳線程數取決於做的是什么事,以及可用內存有多少,因此要仔細測試才能找到最佳的線程數。
實驗Executor.map方法
若想並發運行多個可調用的對象,最簡單的方式是使用 🌰 中見過的 Executor.map 方法
🌰 demo_executor_map.py:簡單演示ThreadPoolExecutor 類的 map 方法
1 from time import sleep, strftime 2 from concurrent import futures 3 4 5 def display(*args): # 打印傳遞的參數,並打印時間戳[HH:MM:SS]格式 6 print(strftime('[%H:%M:%S]'), end=' ') 7 print(*args) 8 9 10 def loiter(n): # 打印信息,休眠n秒 11 msg = '{}loiter({}): doing nothing for {}s...' 12 display(msg.format('\t' * n, n, n)) 13 sleep(n) 14 msg = '{}loiter({}): done.' 15 display(msg.format('\t' * n, n)) 16 return n * 10 # loiter 函數返回 n * 10,以便讓我們了解收集結果的方式 17 18 19 def main(): 20 display('Script starting.') 21 executor = futures.ThreadPoolExecutor(max_workers=3) # 創建 ThreadPoolExecutor 實例,有 3 個線程 22 results = executor.map(loiter, range(5)) # 把5個任務交給executor(3個會提前運行,另外2個等待) 23 display('results:', results) # 生成器,顯示results的結果 24 display('Waiting for individual results:') 25 for i, result in enumerate(results): 26 display('result {}: {}'.format(i, result)) 27 28 main()
以上代碼執行的結果為:
[20:15:11] Script starting. [20:15:11] loiter(0): doing nothing for 0s... [20:15:11] loiter(0): done. [20:15:11] loiter(1): doing nothing for 1s... [20:15:11] loiter(2): doing nothing for 2s... [20:15:11] results: <generator object Executor.map.<locals>.result_iterator at 0x102360f10> [20:15:11] Waiting for individual results: [20:15:11] result 0: 0 [20:15:11] loiter(3): doing nothing for 3s... [20:15:12] loiter(1): done. [20:15:12] loiter(4): doing nothing for 4s... [20:15:12] result 1: 10 [20:15:13] loiter(2): done. [20:15:13] result 2: 20 [20:15:14] loiter(3): done. [20:15:14] result 3: 30 [20:15:16] loiter(4): done. [20:15:16] result 4: 40
顯示下載進度
TQDM 包特別易於使用,項目的 README.md 文件(https://github.com/noamraph/tqdm/blob/master/README.md)中有個 GIF動畫,演示了最簡單的用法。安裝 tqdm 包之后, 在 Python 控制台中輸入下述代碼,會在注釋那里看到進度條動畫:
>>> import time >>> from tqdm import tqdm >>> for i in tqdm(range(1000)): ... time.sleep(.01) ... 51%|████████████████████▎ | 509/1000 [00:05<00:05, 83.79it/s]
除了這個靈巧的效果之外,tqdm 函數的實現方式也很有趣:能處理任何可迭代的對象,生成一個迭代器;使用這個迭代器時,顯示進度條和完成全部迭代預計的剩余時間。為了計算預計剩余時間,tqdm 函數要獲取一個能使用 len 函數確定大小的可迭代對象,或者在第二個參數中指定預期的元素數量。
