Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持,他屬於上層的封裝,對於用戶來說,不用在考慮那么多東西了。
官方參考資料:https://pythonhosted.org/futures/
1.Executor
Exectuor是基礎模塊,這是一個抽象類,其子類分為ThreadPoolExecutor和ProcessPoolExecutor,分別被用來創建線程池和進程池。
提供的方法如下:
Executor.submit(fn, *args, **kwargs)
fn:為需要異步執行的函數
args,kwargs:為給函數傳遞的參數
就來看看官網的這個例子:
1
2
3
|
with ThreadPoolExecutor(max_workers
=
1
) as executor:
future
=
executor.submit(
pow
,
323
,
1235
)
print
(future.result())
|
我們使用submit方法來往線程池中加入一個task(pow函數),submit返回一個Future對象。其中future.result()的result方法的作用是拿到調用返回的結果。如果沒有執行完畢就會去等待。這里我們使用with操作符,使得當任務執行完成之后,自動執行shutdown函數,而無需編寫相關釋放代碼。
關於更多future的具體方法說明看后面的future部分解釋。
Executor.map(fn, *args, **kwargs)
map(func, *iterables, timeout=None)
此map函數和python自帶的map函數功能類似,只不過concurrent模塊的map函數從迭代器獲得參數后異步執行。並且,每一個異步操作,能用timeout參數來設置超時時間,timeout的值可以是int或float型,如果操作timeout的話,會raisesTimeoutError。如果timeout參數不指定的話,則不設置超時間。
func:為需要異步執行的函數
iterables:可以是一個能迭代的對象.
timeout:設置每次異步操作的超時時間
1
2
3
4
5
6
7
8
9
|
from
concurrent.futures
import
ThreadPoolExecutor
import
requests
URLS
=
[
'http://www.163.com'
,
'https://www.baidu.com/'
,
'https://github.com/'
]
def
load_url(url):
req
=
requests.get(url, timeout
=
60
)
print
(
'%r page is %d bytes'
%
(url,
len
(req.content)))
executor
=
ThreadPoolExecutor(max_workers
=
3
)
executor.
map
(load_url,URLS)
print
(
'主線程結束'
)
|
submit函數和map函數,根據需要,選一個使用即可。
Executor.shutdown(wait=True)
此函數用於釋放異步執行操作后的系統資源。Executor實現了enter__和__exit使得其對象可以使用with操作符。
在這里可以使用with上下文關鍵字代替,如上面第一個submit的例子。
2.Future對象
submit函數返回future對象,future提供了跟蹤任務執行狀態的方法,Future實例可以被Executor.submit()方法創建。除了測試之外不應該直接創建。
cancel():嘗試去取消調用。如果調用當前正在執行,不能被取消。這個方法將返回False,否則調用將會被取消,方法將返回True
cancelled():如果調用被成功取消返回True
running():如果當前正在被執行不能被取消返回True
done():如果調用被成功取消或者完成running返回True
result(Timeout = None):拿到調用返回的結果。如果沒有執行完畢就會去等待
exception(timeout=None):捕獲程序執行過程中的異常
add_done_callback(fn):將fn綁定到future對象上。當future對象被取消或完成運行時,fn函數將會被調用
3.wait方法
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置為ALL_COMPLETED。
如果采用默認的ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/env python
# encoding: utf-8
from
concurrent.futures
import
ThreadPoolExecutor,wait,as_completed
import
requests
URLS
=
[
'http://www.163.com'
,
'https://www.baidu.com/'
,
'https://github.com/'
]
def
load_url(url):
req
=
requests.get(url, timeout
=
60
)
print
(
'%r page is %d bytes'
%
(url,
len
(req.content)))
executor
=
ThreadPoolExecutor(max_workers
=
3
)
f_list
=
[]
for
url
in
URLS:
future
=
executor.submit(load_url,url)
f_list.append(future)
print
(wait(f_list))
print
(
'主線程結束'
)
|
如果采用FIRST_COMPLETED參數,程序並不會等到線程池里面所有的任務都完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from
concurrent.futures
import
ThreadPoolExecutor,wait,as_completed
import
requests
URLS
=
[
'http://www.163.com'
,
'https://www.baidu.com/'
,
'https://github.com/'
]
def
load_url(url):
req
=
requests.get(url, timeout
=
60
)
print
(
'%r page is %d bytes'
%
(url,
len
(req.content)))
executor
=
ThreadPoolExecutor(max_workers
=
3
)
f_list
=
[]
for
url
in
URLS:
future
=
executor.submit(load_url,url)
f_list.append(future)
print
(wait(f_list,return_when
=
'FIRST_COMPLETED'
))
print
(
'主線程結束'
)
|
關於模塊的基本使用就是上面的這些。后續會做一些拓展或者案例。