假設我們要從一個網站用Python程序下載5張圖片,最傳統的思路就是寫個for循環挨個挨個下載,但是這樣做有個缺點,就是除了第一張,每張圖片都必須等待前一張圖片下載完畢后,才可以開始下載。由於網絡有很高的延遲,為了不浪費CPU周期去等待,最好在收到網絡響應之前做一些其他的事。比方,我們可以開啟5個線程同時下載5張圖片,當線程發起網絡請求時,Python解釋器切換到其他線程,而當網絡請求發回響應時,Python解釋器再切回到原先的線程,繼續執行下個步驟
下面,我們來看兩個小例子:
import os import time import sys import requests POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710", "pms_1512614327.2483640", "pms_1525853341.8312102"] # <1>設定我們要下載的五張圖片的名稱 BASE_URL = 'https://i1.mifile.cn/a1' # <2>圖片的站點 DEST_DIR = 'downloads\\' # <3>我們保存圖片到本地的路徑 def save_flag(img, filename): # <4>保存圖片的方法 path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): # <5>從網絡讀取要下載的圖片內容 url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def download_many(cc_list): # <6>循環下載圖片 for cc in sorted(cc_list): image = get_flag(cc) sys.stdout.flush() save_flag(image, cc.lower() + '.jpg') return len(cc_list) def main(download_many): # <7>計算連續下載5張圖片的時間 directory = os.path.join(DEST_DIR) if not os.path.exists(directory): os.mkdir(directory) t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main(download_many)
運行結果:
5 flags downloaded in 0.50s
從上面可以看到,連續下載5張圖片,需要0.5s。接下來,讓我們用多線程下載5張圖片,在用多線程下載之前,我們先介紹concurrent.futures模塊,這個模塊的主要特色是ThreadPoolExecutor和ProcessPoolExecutor類,這兩個類實現的接口能分別在不同的線程或進程中執行可調用的對象。這兩個類的內部維護着一個工作線程或進程池,以及要執行的任務隊列
利用ThreadPoolExecutor,我們就可以實現多線程下載圖片了:
import os from concurrent import futures import sys import requests import time MAX_WORKERS = 20 # <1>最大線程數 POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710", "pms_1512614327.2483640", "pms_1525853341.8312102"] BASE_URL = 'https://i1.mifile.cn/a1' DEST_DIR = 'downloads\\' def save_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def download_one(cc): image = get_flag(cc) sys.stdout.flush() save_flag(image, cc.lower() + '.jpg') return cc def download_many(cc_list): # <2>多線程同時下載圖片,這里活躍線程數為5,即5張圖片 workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one, sorted(cc_list)) return len(list(res)) def main(download_many): directory = os.path.join(DEST_DIR) if not os.path.exists(directory): os.mkdir(directory) t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main(download_many)
運行結果:
5 flags downloaded in 0.10s
從運行結果來看,5張圖片只要0.1s,下載速度僅僅是上個例子的五分之一,速度大大提高。
executor.map()方法會返回一個生成器,因為可以迭代獲取每個線程的執行結果
Future類:
標准庫中有兩個名為Future的類,分別是concurrent.futures.Future和asyncio.Future。這兩個類的作用相同,兩個Future類的實例都表示可能已經完成或尚未完成的延遲計算。通常,我們不用自己創建Future的實例,而是由並發框架來實例化。原因很簡單,Future表示終將運行或者完成的事情,而確定某件事會發生的唯一方式就是執行的時間已經排定。因此,只有把特定的某件事交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。例如,Executor.submit()方法的參數就是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,並返回一個Future對象
客戶端不應該去修改Future對象的狀態,並發框架會在線程計算完畢后改變Future對象的狀態,而我們無控制計算的開始運行和結束運行
concurrent.futures.Future和asyncio.Future兩個類中都有done()方法,這個方法不阻塞,返回的是布爾值,用來表明鏈接的可調用對象是否已經執行了。客戶端代碼通常不會詢問Future對象是否運行結束,而是等待通知。因此,兩個Future類都有add_done_callback()方法,這個方法只有一個參數,類型是可調用對象,Future運行結束后悔調用指定的可調用對象,如下例:
from concurrent.futures import ThreadPoolExecutor def add(x, y): return x + y def square(obj): res = obj.result() ** 2 print(res) return res t = ThreadPoolExecutor(2) t.submit(add, 1, 2).add_done_callback(square)
運行結果:
9
此外,concurrent.futures.Future和asyncio.Future都有result()方法。在Future運行結束后調用,這兩個方法作用相同,都是返回可調用對象的結果,或者拋出執行可調用對象時拋出的異常。如果運行期還沒結束的話,result()方法在兩個Future類中的行為差別很大。對於concurrent.futures.Future實例來說,f.result()方法會阻塞調用方所在的線程,直到有結果返回。此時,result()方法可接受timeout參數,如果在指定的時間范圍內可調用對象還沒執行完,就會拋出TimeoutError異常,而asyncio.Future的result()方法不支持設定超時時間,從那個庫中取出運行結果最好的辦法是使用yield from結構。不過,對concurrent.futures.Future不能這么做
這兩個庫中有幾個函數會返回Future對象,其他函數則是使用Future對象,如Executor.map()方法屬於后者,返回的是一個迭代器,迭代器的__next__方法調用Future對象的result()方法,得到我們的運行結果
concurrent.futures.as_completed()函數的參數是一個Future列表,返回值是一個迭代器,在調用as_completed()方法時不會阻塞,只有當對迭代器進行循環時,每調用一次next()方法,如果當前Future對象還未執行結束,則會陷入阻塞
下面展示如何使用as_completed()函數:
import time from time import sleep from concurrent.futures import ThreadPoolExecutor, as_completed def download_img(cc): # <1> sleep(.03) return cc cc_list = ["a", "b", "c"] with ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in cc_list: future = executor.submit(download_img, cc) to_do.append(future) result = [] t0 = time.time() for future in as_completed(to_do): # <2> res = future.result() result.append(res) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed))
運行結果:
3 flags downloaded in 0.03s
<1>處該方法模擬下載一張圖片后,返回圖片名稱
<2>as_completed()函數接收一個Future列表,返回一個生成器,在迭代時如果有Future對象還未運行完畢,則會陷入阻塞直到結果返回
阻塞型I/O和GIL
CPython解釋器本身就不是線程安全的,因此有個全局解釋器鎖(GIL),一次只允許使用一個線程執行Python字節碼。因此,一個Python進程通常不能同時使用多個CPU。然而,這並不意味着Python的多線程一無是處,相反,Python的多線程更適合I/O密集型的場景,當一個Python線程等待網絡響應時,阻塞型的I/O函數會釋放GIL,再運行另外一個線程。
如果非要再一個Python進程中使用多個CPU,有兩個辦法,第一個是編寫Python的C擴展,這樣可以真正的使用到系統的多個CPU,另外一個辦法是在一個Python進程中再啟動多個Python進程,用這個辦法繞開GIL。由於第一個方法比較復雜,這里不做介紹,主要介紹第二個繞開GIL的辦法,為了使用到操作系統的所有CPU,我們可以使用concurrent.futures.ProcessPoolExecutor類把一些CPU密集型的工作分配給多個Python進程來處理
下面的兩個例子,分別用ThreadPoolExecutor類和ProcessPoolExecutor類來計算CPU密集型程序,這里的calc()方法就是我們的CPU密集型的方法,這個方法里不再發送網絡請求,而是直接計算10000次的UUID
import time from concurrent.futures import ThreadPoolExecutor, as_completed import uuid m, n = 100, 10000 max_workers = 10 def calc(): for i in range(n): uuid.uuid1() return True def thread_pool_test(): to_do = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: t0 = time.time() for i in range(m): future = executor.submit(calc) to_do.append(future) result = [] for future in as_completed(to_do): res = future.result() result.append(res) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed)) if __name__ == "__main__": thread_pool_test()
運行結果:
100 flags downloaded in 3.26s
我們可以看到,我們設定10個活躍線程,發起100次的並發任務,每次任務都執行10000次的UUID的計算,最后所需時間為3.26S,我們再看下一個例子
import time from concurrent.futures import ProcessPoolExecutor, as_completed import uuid m, n = 100, 10000 def calc(): for i in range(n): uuid.uuid1() return True def process_pool_test(): with ProcessPoolExecutor() as executor: t0 = time.time() to_do = {executor.submit(calc): i for i in range(m)} result = [] for future in as_completed(to_do): res = future.result() result.append(res) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed)) if __name__ == "__main__": process_pool_test()
運行結果:
100 flags downloaded in 1.91s
在上面這個例子中,我們由原先的線程改為進程計算,這里我們並沒有指定用多少個進程計算,一般不指定進程數量的時候,ProcessPoolExecutor默認最大進程數為os.cpu_count(),我是4核CPU,所以我的最大工作數量沒有上一個例子中最大工作數20多,這里我們依然執行100次任務,每個任務都是執行10000次UUID的計算,但需要的時間只要1.91S
Executor.map()方法
如果想並發運行多個可調用對象,可使用Executor.map()方法,而不必像之前通過for循環再把可執行對象提交給Executor
from time import sleep, strftime from concurrent.futures import ThreadPoolExecutor max_workers = 3 def display(n): print(strftime('[%H:%M:%S]'), "%s thread start" % n) sleep(n) print(strftime('[%H:%M:%S]'), "%s thread end" % n) return n def main(): with ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(display, range(0, 6)) # <1> for index, result in enumerate(results): # <2> print('result {}: {}'.format(index, result)) main()
運行結果:
[10:51:31] 0 thread start [10:51:31] 0 thread end [10:51:31] 1 thread start [10:51:31] 2 thread start result 0: 0 [10:51:31] 3 thread start [10:51:32] 1 thread end [10:51:32] 4 thread start result 1: 1 [10:51:33] 2 thread end [10:51:33] 5 thread start result 2: 2 [10:51:34] 3 thread end result 3: 3 [10:51:36] 4 thread end result 4: 4 [10:51:38] 5 thread end result 5: 5
<1>處我們把一個可執行的對象以及所需參數,提交給executor.map()方法,返回一個生成器
<2>處for循環中的enumrate()函數會隱式調用next(results),這個函數會間接調用Future中的result()方法,這個方法會阻塞調用線程,一直到運行結束