協程
協程(coroutines)是通過async/await
定義函數或方法,是使用asyncio進行異步編程的首選途徑。如下,是一個協程的例子:
1 |
import asyncio |
上例中的 main
方法就是我們定義的協程
。代碼的功能很簡單:
我們在交互環境(Python3.7)下執行以上代碼,看看效果:
1 |
>>> import asyncio |
需要注意的是:如果像執行普通代碼一樣直接調用main()
,只會返回一個coroutine
對象,main()
方法內的代碼不會執行:
1 |
|
實際上,asyncio提供了三種執行協程
的機制:
- 使用
asyncio.run()
執行協程。一般用於執行最頂層的入口函數,如main()
。 await
一個協程
。一般用於在一個協程
中調用另一協程
。 如下是一個示例:
1 |
|
執行耗時 3秒
- 用
asyncio.create_task()
方法將Coroutine(協程)
封裝為Task(任務)
。一般用於實現異步並發操作。 需要注意的是,只有在當前線程存在事件循環的時候才能創建任務(Task)。
我們修改以上的例程,並發執行 兩個say_after
協程。
1 |
async def main(): |
執行asyncio.run(main())
,結果如下:
1 |
started at 17:01:34 |
耗時2秒
“可等待”對象(Awaitables)
如果一個對象能夠被用在await
表達式中,那么我們稱這個對象是可等待對象(awaitable object)
。很多asyncio API
都被設計成了可等待的
。
主要有三類可等待對象:
- 協程
coroutine
- 任務
Task
- 未來對象
Future
。
Coroutine(協程)
Python的協程
是可等待的(awaitable)
,因此能夠被其他協程
用在await
表達式中。
1 |
import asyncio |
重要:在這篇文章中,術語coroutine
或協程
指代兩個關系緊密的概念:
協程函數(coroutine function)
:由async def
定義的函數;協程對象(coroutine object)
:調用協程函數
返回的對象。
asyncio也支持傳統的基於生成器的協程。
Task(任務)
Task
用來 並發的 調度協程。
當一個協程
通過類似 asyncio.create_task()
的函數被封裝進一個 Task
時,這個協程
會很快被自動調度執行:
1 |
import asyncio |
Future(未來對象)
Future
是一種特殊的 底層 可等待對象,代表一個異步操作的最終結果。
當一個Future
對象被await
的時候,表示當前的協程會持續等待,直到 Future
對象所指向的異步操作執行完畢。
在asyncio中,Future
對象能使基於回調的代碼被用於asyn/await
表達式中。
一般情況下,在應用層編程中,沒有必要 創建Future
對象。
有時候,有些Future
對象會被一些庫和asyncio API暴露出來,我們可以await
它們:
1 |
async def main(): |
底層函數返回Future
對象的一個例子是:loop.run_in_executor
執行asyncio程序
1 |
asyncio.run(coro, * , debug=False) |
這個函數運行coro
參數指定的 協程
,負責 管理asyncio事件循環 , 終止異步生成器。
在同一個線程中,當已經有asyncio事件循環在執行時,不能調用此函數。
如果debug=True
,事件循環將運行在 調試模式。
此函數總是創建一個新的事件循環,並在最后關閉它。建議將它用作asyncio程序的主入口,並且只調用一次。
Python3.7新增
重要:這個函數是在Python3.7被臨時添加到asyncio中的。
創建Task
1 |
asyncio.create_task(coro) |
將coro
參數指定的協程(coroutine)
封裝到一個Task
中,並調度執行。返回值是一個Task
對象。
任務在由get_running_loop()
返回的事件循環(loop)中執行。如果當前線程中沒有正在運行的事件循環,將會引發RuntimeError
異常:
1 |
import asyncio |
因為當前線程中沒有正運行的事件循環,所以引發異常:
1 |
Traceback (most recent call last): |
對以上代碼稍作修改,創建main()
方法,在其中創建Task
對象,然后在主程序中利用asyncio.run()
創建事件循環
:
1 |
import asyncio |
執行結果如下:
1 |
<_WindowsSelectorEventLoop running=True closed=False debug=False> |
此函數已經被引入到Python3.7。在Python早期版本中,可以使用底層函數asyncio.ensure_future()
代替。
1 |
async def coro(): |
Python3.7新增
Sleeping
coroutine asyncio.sleep(delay,result=None,* ,loop=None)
阻塞delay
秒,例如delay=3
,則阻塞3秒。
如果指定了result
參數的值
,則在協程結束時,將該值
返回給調用者。sleep()
通常只暫停當前task
,並不影響其他task
的執行。
不建議使用loop
參數,因為Python計划在3.10
版本中移除它。
以下是一個協程的例子,功能是在5秒鍾內,每秒顯示一次當前的日期
:
1 |
import asyncio |
執行結果大致如下:
1 |
2018-11-20 11:27:15.961830 |
並發執行Tasks
awaitable asyncio.gather(* aws, loop=None, return_exceptions=False)
並發執行aws
參數指定的 可等待(awaitable)對象
序列。
如果 aws
序列中的某個 awaitable 對象
是一個 協程
,則自動將這個 協程
封裝為 Task
對象進行處理。例如:
1 |
import asyncio |
如果所有的awaitable
對象都執行完畢,則返回 awaitable對象執行結果的聚合列表。返回值的順序於aws
參數的順序一致。
簡單修改以上代碼:
1 |
import asyncio |
如果return_execptions
參數為False
(默認值即為False
),引發的第一個異常會立即傳播給等待gather()
的任務,即調用await asyncio.gather()
對象。序列中其他awaitable
對象的執行不會受影響。例如:
1 |
import asyncio |
執行結果:
1 |
2/1=2.0 |
如果return_exceptions
參數為True
,異常會和正常結果一樣,被聚合到結果列表中返回。
對以上代碼稍作修改,將return_exceptions
設為True
:
1 |
import asyncio |
執行結果如下:
1 |
2/1=2.0 |
如果gather()
被取消,則提交的所有awaitable
對象(尚未執行完成的)都會被取消。例如:
1 |
import asyncio |
執行結果:
1 |
5/1=5.0 #除已執行的之外,其他的任務全部被取消 |
如果aws
中某些Task
或Future
被取消,gather()
調用不會被取消,被取消的Task
或Future
會以引發CancelledError
的方式被處理。這樣可以避免個別awaitable
對象的取消操作影響其他awaitable
對象的執行。
例如:
1 |
import asyncio |
預期執行結果如下:
1 |
5/1=5.0 |
避免取消
awaitable asyncio.shield(aw, * , loop=None)
防止awaitable
對象被取消(cancelled)執行。
如果aw
參數是一個協程(coroutines)
,該對象會被自動封裝為Task
對象進行處理。
通常,代碼:
#code 1
res = await shield(something())
同代碼:
#code 2
res = await something()
是等價的。
特殊情況是,如果包含以上代碼的協程
被 取消,code 1
與code 2
的執行效果就完全不同了:
code 1
中,運行於something()
中的任務 不會被取消。code 2
中,運行於something()
中的任務 會被取消。
在code 1
中,從something()
的視角看,取消操作並沒有發生。然而,事實上它的調用者確實被取消了,所以await shield(something())
仍然會引發一個CancelledError
異常。
1 |
import asyncio |
執行結果:
1 |
Start time:10:38:48 |
如果something()
以其他的方式被取消,比如從自身內部取消,那么shield()
也會被取消。
如果希望完全忽略取消操作
(不推薦這么做),則可以將shield()
與try/except
結合起來使用:
1 |
try: |
超時(Timeouts)
1 |
coroutine asyncio.wait_for(aw,timeout,*,loop=None) |
在timeout
時間之內,等待aw
參數指定的awaitable
對象執行完畢。
如果aw
是一個協程,則會被自動作為Task
處理。timeout
可以是None
也可以是一個float
或int
類型的數字,表示需要等待的秒數。如果timeout
是None
,則永不超時,一直阻塞到aw
執行完畢。
如果達到timeout
時間,將會取消待執行的任務,引發asyncio.TimeoutError
.
如果想避免任務被取消,可以將其封裝在shield()
中。
程序會等待到任務確實被取消掉,所以等待的總時間會比timeout
略大。
如果await_for()
被取消,aw
也會被取消。loop
參數將在Python3.10中刪除,所以不推薦使用。
示例:
1 |
async def eternity(): |
Python3.7新特性:當aw
因為超時被取消,wait_for()
等到aw
確實被取消之后返回異常。在以前的版本中,wait_for
會立即返回異常。
等待原語(Waiting Primitives)
wait()
1 |
coroutine asyncio.wait(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED) |
並發執行aws
中的awaitable
對象,一直阻塞到return_when
指定的情況出現。
如果aws
中的某些對象是協程(coroutine)
,則自動轉換為Task
對象進行處理。直接將coroutine
對象傳遞給wait()
會導致令人迷惑的執行結果,所以不建議這么做。
返回值是兩個Task/Future
集合:(done,pending
)。
用法示例:
1 |
done,pending = await asyncio.wait(aws) |
loop
參數將在Python3.10中刪除,所以不建議使用。timeout
參數可以是一個int
或float
類型的值,可以控制最大等待時間。
需要注意的是,wait()
不會引發asyncio.TimeoutError
錯誤。返回前沒有被執行的Future
和Task
會被簡單的放入pending
集合。return_when
決定函數返回的時機。它只能被設置為以下常量:
Constant | Description |
---|---|
FIRST_COMPLETED | The function will return when any future finishes or is cancelled. |
FIRST_EXCEPTION | The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. |
ALL_COMPLETED | The function will return when all futures finish or are cancelled. |
與wait_for()
不同,wait()
不會再超時的時候取消任務。
注意:
因為wait()
會自動將協程
轉換為Task對象
進行處理,然后返回這些隱式創建的Task
到(done,pending)集合,所以以下代碼不會如預期的那樣執行。
1 |
async def foo(): |
上面的代碼可以做如下修正:
1 |
async def foo(): |
所以,正如上文所講,不建議將coroutine
對象直接傳遞給wait()
。
as_completed()
1 |
asyncio.as_completed(aws,*,loop=None,timeout=None) |
並發執行aws
中的awaitable
對象。返回一個Future
對象迭代器。每次迭代時返回的Future
對象代表待執行的awaitable
對象集合里最早出現的結果。注意:迭代器返回的順序與aws
列表的順序無關,只與結果出現的早晚有關。
如果超時之前還有Future
對象未完成,則引發asyncio.TimeoutError
異常。
用法示例:
1 |
for f in as_completed(aws): |
以下為一個完整的例子:
1 |
import asyncio |
執行結果如下:
1 |
Start at: 17:19:11 |
從其他線程調度執行(Scheduling From Other Threads)
1 |
asyncio.run_coroutine_threadsafe(coro,loop) |
向loop
指定的事件循環提交一個由coro
指定協程。線程安全。
返回一個concurrent.futures.Future
對象,等待另一個線程返回結果。
這個函數用於從當前線程
向運行事件循環的線程
提交協程(coroutine)
。
例如:
1 |
# Create a coroutine |
如果協程
出現異常,返回的Future
會收到通知。返回的Future
也可以被用作取消事件循環中的任務:
1 |
try: |
可以參考並發與多線程章節。
與其他asyncio函數不同,該函數需要 顯式 傳遞loop
參數。
新增於Python 3.5.1
自查(Introspection)
current_task()
1 |
asyncio.current_task(loop=None) |
返回事件循環中正在運行的Task
實例,如果沒有Task
在執行,則返回None
。
如果loop
為None
,則使用get_running_loop()
獲取當前事件循環。
新增於Python3.7
all_tasks()
1 |
asyncio.all_tasks(loop=None) |
返回事件循環中尚未運行結束的Task
對象集合。
如果loop
為None
,則,使用get_running_loop()
獲取當前事件循環。
新增於Python3.7
Task對象
1 |
class asyncio.Task(coro,*,loop=None) |
類似與Future
對象,用於執行Python協程。非線程安全。Tasks
用於在事件循環
中執行協程
。如果協程
等待一個Future
,那么Task
會暫停協程
的執行,直到Future
執行完成。當Future
完成時,協程
的執行會恢復。
事件循環的 協作調度 模式:一個事件循環同一時間只執行一個Task
。當這個Task
等待某個Future
返回時,事件循環執行其他的Task
、回調
或IO操作
。
可以通過高層函數asyncio.create_task()
創建Task
,或者通過底層函數loop.create_task()
和ensure_future()
創建Task
。但是不建議直接實例化Task
對象。
如果想要取消一個Task
的執行,可以使用cancel()
方法。調用cancel()
會引起Task
對象向被封裝的協程
拋出CancelledError
異常。當取消行為發生時,如果協程
正在等待某個Future
對象執行,該Future
對象將被取消。
cancelled()
方法用於檢查某個Task
是否已被取消。如果Task
封裝的協程
沒有阻止CancelledError
異常,且Task
確實被取消了,則該方法返回True
。
asyncio.Task
繼承了Future
類中除Future.set_result()
和Future.set_exception()
以外的所有方法。
Task
對象支持contextvars
模塊:當一個Task
被創建的時候,它會復制當前的上下文,然后在復制的上下文副本中執行協程。
Python3.7中的變更:添加了對contextvars
模塊的支持。
cancel()
申請取消任務。
將在下一個事件循環周期中將CancelledError
異常拋給封裝在Task
中的協程。
收到CancelledError
異常后,協程
有機會處理異常,甚至以try ...except CancelledError ...finally
來拒絕請求。因此,與Future.cancel()
不同,Task.cancel()
不能保證Task
一定被取消掉。當然,拒絕取消請求這種操作並不常見,而且很不提倡。
以下例子可以說明協程如何攔截取消請求:
1 |
import asyncio |
cancelled()
如果Task
已經被取消,則返回True
。
當取消請求通過cancel()
被提交,且Task
封裝的協程
傳播了拋給它的CancelledError
異常,則此Task
被取消。
done()
如果Task
已完成,則返回True
。Task
完成有三種情況:
- 封裝的協程已返回
- 封裝的協程已拋出異常
Task
被取消
result()
返回Task
的執行結果。
如果Task
已經完成,則返回Task
封裝的協程的執行結果(如果Task
封裝的協程引發異常,則重新引發該異常)。
如果Task
已經取消,則該方法引發CancelledError
異常。
如果Task
的結果還不可用,該方法引發InvalidStateError
異常。
exception()
返回Task
的異常。
如果封裝的協程引發了異常,則返回此異常。如果封裝的協程執行正常,則返回None
。
如果Task
已被取消,則引發CancelledError
異常。
如果Task
尚未完成,則引發InvalidStateError
異常。
add_done_callback()
添加一個回調函數,在Task
完成后執行。
這個方法只應用在基於回調的底層編程中。
具體細節可以參考Future.remove_done_callback()
get_stack(* ,limit=None)
返回此Task
的堆棧幀列表。
- 如果封裝的協程未完成,此方法返回它暫停位置的堆棧。
- 如果封裝的協程已經完成或已被取消,此方法返回一個空的列表。
- 如果封裝的協程因異常而結束,此方法返回異常回溯列表。
幀的順序總是 由舊到新。
暫停中的協程只返回一個堆棧幀。
可選參數limit
用於限定返回幀的最大數目。默認情況下,所有有效的幀都會返回。
在返回堆棧和返回異常回溯時,列表的順序會有所不同:
- 最新的堆棧幀會被返回
- 最老的回溯幀會被返回(這和異常回溯模塊的機制有關)
print_stack(* ,limit=None,file=None)
打印Task
的棧幀或異常回溯。
此方法用於輸出由get_stack()
取回的幀列表,輸出形式類似於回溯(traceback)模塊
limit
參數會直接傳遞給get_stack()
。file
參數指定輸出的I/O流,默認為sys.stderr
。
classmethod all_tasks(loop=None)
返回一個事件循環上所有任務的集合。
默認情況下,當前事件循環上所有的任務都會被返回。如果loop
參數為’None’,則通過get_event_loop()
方法獲取當前事件循環。
此方法將在Python3.9中被移除,所以不建議使用。可以使用asyncio.all_tasks()
代替。
calssmethod current_task(loop=None)
返回當前正在運行的Task
或None
。
如果loop
參數為’None’,則通過get_event_loop()
方法獲取當前事件循環。
此方法將在Python3.9中被移除,所以不建議使用。可以使用asyncio.current_task()
代替。
基於生成器的協程(Generator-based Coroutines)
提示:對基於生成器的協程的支持將在Python3.10中移除,不建議使用。
基於生成器的協程是早期的異步實現方式,出現在async/await
語法之前,使用yield from
表達式等待Future
或其他協程。
基於生成器的協程應該用`@asyncio.coroutine`來修飾,盡管這不是強制的。
@asyncio.coroutine
基於生成器的協程的修飾器。
這個修飾器能使傳統的基於生成器的協程
與async/await
語法兼容:
1 |
|
此修飾器將在Python3.10中被移除,所以不建議再使用。
此修飾器不能用於async def
的協程中。
asyncio.iscoroutine(obj)
如果obj
對象是一個coroutine
對象,則返回True
。
此方法與inspect.iscoroutine()
不同,因為它對基於生成器的協程也返回True
。
asyncio.iscoroutinefunction(func)
如果func
是一個coroutine
方法,則返回True
。
此方法inspect.iscoroutinefunction()
不同,因為它對用@coroutine
修飾的基於生成器的協程也返回True
。