Python通過future處理並發


future初識

通過下面腳本來對future進行一個初步了解:
例子1:普通通過循環的方式

 1 import os
 2 import time
 3 import sys
 4 
 5 import requests
 6 
 7 
 8 POP20_CC = (
 9     "CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR"
10 ).split()
11 
12 
13 BASE_URL = 'http://flupy.org/data/flags'
14 
15 DEST_DIR = 'downloads/'
16 
17 
18 def save_flag(img,filename):
19     path = os.path.join(DEST_DIR,filename)
20     with open(path,'wb') as fp:
21         fp.write(img)
22 
23 
24 def get_flag(cc):
25     url = "{}/{cc}/{cc}.gif".format(BASE_URL,cc=cc.lower())
26     resp = requests.get(url)
27     return resp.content
28 
29 
30 def show(text):
31     print(text,end=" ")
32     sys.stdout.flush()
33 
34 
35 def download_many(cc_list):
36     for cc in sorted(cc_list):
37         image = get_flag(cc)
38         show(cc)
39         save_flag(image,cc.lower()+".gif")
40 
41     return len(cc_list)
42 
43 
44 def main(download_many):
45     t0 = time.time()
46     count = download_many(POP20_CC)
47     elapsed = time.time()-t0
48     msg = "\n{} flags downloaded in {:.2f}s"
49     print(msg.format(count,elapsed))
50 
51 
52 if __name__ == '__main__':
53     main(download_many)

例子2:通過future方式實現,這里對上面的部分代碼進行了復用

 1 from concurrent import futures
 2 
 3 from flags import save_flag, get_flag, show, main
 4 
 5 
 6 MAX_WORKERS = 20
 7 
 8 
 9 def download_one(cc):
10     image = get_flag(cc)
11     show(cc)
12     save_flag(image, cc.lower()+".gif")
13     return cc
14 
15 
16 def download_many(cc_list):
17     workers = min(MAX_WORKERS,len(cc_list))
18     with futures.ThreadPoolExecutor(workers) as executor:
19         res = executor.map(download_one, sorted(cc_list))
20 
21     return len(list(res))
22 
23 
24 if __name__ == '__main__':
25     main(download_many)

分別運行三次,兩者的平均速度:13.67和1.59s,可以看到差別還是非常大的。

future

future是concurrent.futures模塊和asyncio模塊的重要組件
從python3.4開始標准庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個Future類的實例都表示可能完成或者尚未完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能類似

注意:通常情況下自己不應該創建future,而是由並發框架(concurrent.futures或asyncio)實例化

原因:future表示終將發生的事情,而確定某件事情會發生的唯一方式是執行的時間已經安排好,因此只有把某件事情交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。
如:Executor.submit()方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排定時間,並返回一個future

客戶端代碼不能應該改變future的狀態,並發框架在future表示的延遲計算結束后會改變期物的狀態,我們無法控制計算何時結束。

這兩種future都有.done()方法,這個方法不阻塞,返回值是布爾值,指明future鏈接的可調用對象是否已經執行。客戶端代碼通常不會詢問future是否運行結束,而是會等待通知。因此兩個Future類都有.add_done_callback()方法,這個方法只有一個參數,類型是可調用的對象,future運行結束后會調用指定的可調用對象。

.result()方法是在兩個Future類中的作用相同:返回可調用對象的結果,或者重新拋出執行可調用的對象時拋出的異常。但是如果future沒有運行結束,result方法在兩個Futrue類中的行為差別非常大。
對concurrent.futures.Future實例來說,調用.result()方法會阻塞調用方所在的線程,直到有結果可返回,此時,result方法可以接收可選的timeout參數,如果在指定的時間內future沒有運行完畢,會拋出TimeoutError異常。
而asyncio.Future.result方法不支持設定超時時間,在獲取future結果最好使用yield from結構,但是concurrent.futures.Future不能這樣做

不管是asyncio還是concurrent.futures.Future都會有幾個函數是返回future,其他函數則是使用future,在最開始的例子中我們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法調用各個future的result方法,因此我們得到的是各個futrue的結果,而不是future本身

關於future.as_completed函數的使用,這里我們用了兩個循環,一個用於創建並排定future,另外一個用於獲取future的結果

 1 from concurrent import futures
 2 
 3 from flags import save_flag, get_flag, show, main
 4 
 5 
 6 MAX_WORKERS = 20
 7 
 8 
 9 def download_one(cc):
10     image = get_flag(cc)
11     show(cc)
12     save_flag(image, cc.lower()+".gif")
13     return cc
14 
15 
16 def download_many(cc_list):
17     cc_list = cc_list[:5]
18     with futures.ThreadPoolExecutor(max_workers=3) as executor:
19         to_do = []
20         for cc in sorted(cc_list):
21             future = executor.submit(download_one,cc)
22             to_do.append(future)
23             msg = "Secheduled for {}:{}"
24             print(msg.format(cc,future))
25 
26         results = []
27         for future in futures.as_completed(to_do):
28             res = future.result()
29             msg = "{}result:{!r}"
30             print(msg.format(future,res))
31             results.append(res)
32 
33     return len(results)
34 
35 
36 if __name__ == '__main__':
37     main(download_many)

結果如下:

注意:Python代碼是無法控制GIL,標准庫中所有執行阻塞型IO操作的函數,在等待操作系統返回結果時都會釋放GIL.運行其他線程執行,也正是因為這樣,Python線程可以在IO密集型應用中發揮作用

以上都是concurrent.futures啟動線程,下面通過它啟動進程

concurrent.futures啟動進程

concurrent.futures中的ProcessPoolExecutor類把工作分配給多個Python進程處理,因此,如果需要做CPU密集型處理,使用這個模塊能繞開GIL,利用所有的CPU核心。
其原理是一個ProcessPoolExecutor創建了N個獨立的Python解釋器,N是系統上面可用的CPU核數。
使用方法和ThreadPoolExecutor方法一樣

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM