python的並發模塊concurrent


 

Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持,他屬於上層的封裝,對於用戶來說,不用在考慮那么多東西了。

官方參考資料:https://pythonhosted.org/futures/

1.Executor

Exectuor是基礎模塊,這是一個抽象類,其子類分為ThreadPoolExecutor和ProcessPoolExecutor,分別被用來創建線程池和進程池。

提供的方法如下:

Executor.submit(fn, *args, **kwargs)

fn:為需要異步執行的函數
args,kwargs:為給函數傳遞的參數
就來看看官網的這個例子:

?
1
2
3
with ThreadPoolExecutor(max_workers = 1 ) as executor:
     future = executor.submit( pow , 323 , 1235 )
     print (future.result())

  

我們使用submit方法來往線程池中加入一個task(pow函數),submit返回一個Future對象。其中future.result()的result方法的作用是拿到調用返回的結果。如果沒有執行完畢就會去等待。這里我們使用with操作符,使得當任務執行完成之后,自動執行shutdown函數,而無需編寫相關釋放代碼。
關於更多future的具體方法說明看后面的future部分解釋。

Executor.map(fn, *args, **kwargs)

map(func, *iterables, timeout=None)
此map函數和python自帶的map函數功能類似,只不過concurrent模塊的map函數從迭代器獲得參數后異步執行。並且,每一個異步操作,能用timeout參數來設置超時時間,timeout的值可以是int或float型,如果操作timeout的話,會raisesTimeoutError。如果timeout參數不指定的話,則不設置超時間。

func:為需要異步執行的函數
iterables:可以是一個能迭代的對象.
timeout:設置每次異步操作的超時時間

?
1
2
3
4
5
6
7
8
9
from concurrent.futures import ThreadPoolExecutor
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
         req = requests.get(url, timeout = 60 )
         print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
executor. map (load_url,URLS)
print ( '主線程結束' )

  

submit函數和map函數,根據需要,選一個使用即可。

Executor.shutdown(wait=True)

此函數用於釋放異步執行操作后的系統資源。Executor實現了enter__和__exit使得其對象可以使用with操作符。
在這里可以使用with上下文關鍵字代替,如上面第一個submit的例子。

2.Future對象

submit函數返回future對象,future提供了跟蹤任務執行狀態的方法,Future實例可以被Executor.submit()方法創建。除了測試之外不應該直接創建。

cancel():嘗試去取消調用。如果調用當前正在執行,不能被取消。這個方法將返回False,否則調用將會被取消,方法將返回True

cancelled():如果調用被成功取消返回True

running():如果當前正在被執行不能被取消返回True

done():如果調用被成功取消或者完成running返回True

result(Timeout = None):拿到調用返回的結果。如果沒有執行完畢就會去等待

exception(timeout=None):捕獲程序執行過程中的異常

add_done_callback(fn):將fn綁定到future對象上。當future對象被取消或完成運行時,fn函數將會被調用

3.wait方法

 wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置為ALL_COMPLETED。

  如果采用默認的ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python 
# encoding: utf-8 
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
     req = requests.get(url, timeout = 60 )
     print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
f_list = []
for url in URLS:
     future = executor.submit(load_url,url)
     f_list.append(future)
print (wait(f_list))
print ( '主線程結束' )

  

如果采用FIRST_COMPLETED參數,程序並不會等到線程池里面所有的任務都完成。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
     req = requests.get(url, timeout = 60 )
     print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
f_list = []
for url in URLS:
     future = executor.submit(load_url,url)
     f_list.append(future)
print (wait(f_list,return_when = 'FIRST_COMPLETED' ))
print ( '主線程結束' )

  

關於模塊的基本使用就是上面的這些。后續會做一些拓展或者案例。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM