協程
協程(coroutines)是通過async/await定義函數或方法,是使用asyncio進行異步編程的首選途徑。如下,是一個協程的例子:
1 |
import asyncio |
上例中的 main 方法就是我們定義的協程 。代碼的功能很簡單:

我們在交互環境(Python3.7)下執行以上代碼,看看效果:
1 |
>>> import asyncio |
需要注意的是:如果像執行普通代碼一樣直接調用main(),只會返回一個coroutine對象,main()方法內的代碼不會執行:
1 |
|
實際上,asyncio提供了三種執行協程的機制:
- 使用
asyncio.run()執行協程。一般用於執行最頂層的入口函數,如main()。 await一個協程。一般用於在一個協程中調用另一協程。 如下是一個示例:
1 |
|
執行耗時 3秒
- 用
asyncio.create_task()方法將Coroutine(協程)封裝為Task(任務)。一般用於實現異步並發操作。 需要注意的是,只有在當前線程存在事件循環的時候才能創建任務(Task)。
我們修改以上的例程,並發執行 兩個say_after協程。
1 |
async def main(): |
執行asyncio.run(main()),結果如下:
1 |
started at 17:01:34 |
耗時2秒
“可等待”對象(Awaitables)
如果一個對象能夠被用在await表達式中,那么我們稱這個對象是可等待對象(awaitable object)。很多asyncio API都被設計成了可等待的。
主要有三類可等待對象:
- 協程
coroutine - 任務
Task - 未來對象
Future。
Coroutine(協程)
Python的協程是可等待的(awaitable),因此能夠被其他協程用在await表達式中。
1 |
import asyncio |
重要:在這篇文章中,術語coroutine或協程指代兩個關系緊密的概念:
協程函數(coroutine function):由async def定義的函數;協程對象(coroutine object):調用協程函數返回的對象。
asyncio也支持傳統的基於生成器的協程。
Task(任務)
Task用來 並發的 調度協程。
當一個協程通過類似 asyncio.create_task() 的函數被封裝進一個 Task時,這個協程 會很快被自動調度執行:
1 |
import asyncio |
Future(未來對象)
Future 是一種特殊的 底層 可等待對象,代表一個異步操作的最終結果。
當一個Future對象被await的時候,表示當前的協程會持續等待,直到 Future對象所指向的異步操作執行完畢。
在asyncio中,Future對象能使基於回調的代碼被用於asyn/await表達式中。
一般情況下,在應用層編程中,沒有必要 創建Future對象。
有時候,有些Future對象會被一些庫和asyncio API暴露出來,我們可以await它們:
1 |
async def main(): |
底層函數返回Future對象的一個例子是:loop.run_in_executor
執行asyncio程序
1 |
asyncio.run(coro, * , debug=False) |
這個函數運行coro參數指定的 協程,負責 管理asyncio事件循環 , 終止異步生成器。
在同一個線程中,當已經有asyncio事件循環在執行時,不能調用此函數。
如果debug=True,事件循環將運行在 調試模式。
此函數總是創建一個新的事件循環,並在最后關閉它。建議將它用作asyncio程序的主入口,並且只調用一次。
Python3.7新增
重要:這個函數是在Python3.7被臨時添加到asyncio中的。
創建Task
1 |
asyncio.create_task(coro) |
將coro參數指定的協程(coroutine)封裝到一個Task中,並調度執行。返回值是一個Task對象。
任務在由get_running_loop()返回的事件循環(loop)中執行。如果當前線程中沒有正在運行的事件循環,將會引發RuntimeError異常:
1 |
import asyncio |
因為當前線程中沒有正運行的事件循環,所以引發異常:
1 |
Traceback (most recent call last): |
對以上代碼稍作修改,創建main()方法,在其中創建Task對象,然后在主程序中利用asyncio.run()創建事件循環:
1 |
import asyncio |
執行結果如下:
1 |
<_WindowsSelectorEventLoop running=True closed=False debug=False> |
此函數已經被引入到Python3.7。在Python早期版本中,可以使用底層函數asyncio.ensure_future()代替。
1 |
async def coro(): |
Python3.7新增
Sleeping
coroutine asyncio.sleep(delay,result=None,* ,loop=None)
阻塞delay秒,例如delay=3,則阻塞3秒。
如果指定了result參數的值,則在協程結束時,將該值返回給調用者。sleep()通常只暫停當前task,並不影響其他task的執行。
不建議使用loop參數,因為Python計划在3.10版本中移除它。
以下是一個協程的例子,功能是在5秒鍾內,每秒顯示一次當前的日期:
1 |
import asyncio |
執行結果大致如下:
1 |
2018-11-20 11:27:15.961830 |
並發執行Tasks
awaitable asyncio.gather(* aws, loop=None, return_exceptions=False)
並發執行aws參數指定的 可等待(awaitable)對象序列。
如果 aws 序列中的某個 awaitable 對象 是一個 協程,則自動將這個 協程 封裝為 Task對象進行處理。例如:
1 |
import asyncio |
如果所有的awaitable對象都執行完畢,則返回 awaitable對象執行結果的聚合列表。返回值的順序於aws參數的順序一致。
簡單修改以上代碼:
1 |
import asyncio |
如果return_execptions參數為False(默認值即為False),引發的第一個異常會立即傳播給等待gather()的任務,即調用await asyncio.gather()對象。序列中其他awaitable對象的執行不會受影響。例如:
1 |
import asyncio |
執行結果:
1 |
2/1=2.0 |
如果return_exceptions參數為True,異常會和正常結果一樣,被聚合到結果列表中返回。
對以上代碼稍作修改,將return_exceptions設為True:
1 |
import asyncio |
執行結果如下:
1 |
2/1=2.0 |
如果gather()被取消,則提交的所有awaitable對象(尚未執行完成的)都會被取消。例如:
1 |
import asyncio |
執行結果:
1 |
5/1=5.0 #除已執行的之外,其他的任務全部被取消 |
如果aws中某些Task或Future被取消,gather()調用不會被取消,被取消的Task或Future會以引發CancelledError的方式被處理。這樣可以避免個別awaitable對象的取消操作影響其他awaitable對象的執行。
例如:
1 |
import asyncio |
預期執行結果如下:
1 |
5/1=5.0 |
避免取消
awaitable asyncio.shield(aw, * , loop=None)
防止awaitable對象被取消(cancelled)執行。
如果aw參數是一個協程(coroutines),該對象會被自動封裝為Task對象進行處理。
通常,代碼:
#code 1
res = await shield(something())
同代碼:
#code 2
res = await something()
是等價的。
特殊情況是,如果包含以上代碼的協程被 取消,code 1與code 2的執行效果就完全不同了:
code 1中,運行於something()中的任務 不會被取消。code 2中,運行於something()中的任務 會被取消。
在code 1中,從something()的視角看,取消操作並沒有發生。然而,事實上它的調用者確實被取消了,所以await shield(something())仍然會引發一個CancelledError異常。
1 |
import asyncio |
執行結果:
1 |
Start time:10:38:48 |
如果something()以其他的方式被取消,比如從自身內部取消,那么shield()也會被取消。
如果希望完全忽略取消操作(不推薦這么做),則可以將shield()與try/except結合起來使用:
1 |
try: |
超時(Timeouts)
1 |
coroutine asyncio.wait_for(aw,timeout,*,loop=None) |
在timeout時間之內,等待aw參數指定的awaitable對象執行完畢。
如果aw是一個協程,則會被自動作為Task處理。timeout可以是None也可以是一個float或int類型的數字,表示需要等待的秒數。如果timeout是None,則永不超時,一直阻塞到aw執行完畢。
如果達到timeout時間,將會取消待執行的任務,引發asyncio.TimeoutError.
如果想避免任務被取消,可以將其封裝在shield()中。
程序會等待到任務確實被取消掉,所以等待的總時間會比timeout略大。
如果await_for()被取消,aw也會被取消。loop參數將在Python3.10中刪除,所以不推薦使用。
示例:
1 |
async def eternity(): |
Python3.7新特性:當aw因為超時被取消,wait_for()等到aw確實被取消之后返回異常。在以前的版本中,wait_for會立即返回異常。
等待原語(Waiting Primitives)
wait()
1 |
coroutine asyncio.wait(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED) |
並發執行aws中的awaitable對象,一直阻塞到return_when指定的情況出現。
如果aws中的某些對象是協程(coroutine),則自動轉換為Task對象進行處理。直接將coroutine對象傳遞給wait()會導致令人迷惑的執行結果,所以不建議這么做。
返回值是兩個Task/Future集合:(done,pending)。
用法示例:
1 |
done,pending = await asyncio.wait(aws) |
loop參數將在Python3.10中刪除,所以不建議使用。timeout參數可以是一個int或float類型的值,可以控制最大等待時間。
需要注意的是,wait()不會引發asyncio.TimeoutError錯誤。返回前沒有被執行的Future和Task會被簡單的放入pending集合。return_when決定函數返回的時機。它只能被設置為以下常量:
| Constant | Description |
|---|---|
| FIRST_COMPLETED | The function will return when any future finishes or is cancelled. |
| FIRST_EXCEPTION | The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. |
| ALL_COMPLETED | The function will return when all futures finish or are cancelled. |
與wait_for()不同,wait()不會再超時的時候取消任務。
注意:
因為wait()會自動將協程轉換為Task對象進行處理,然后返回這些隱式創建的Task到(done,pending)集合,所以以下代碼不會如預期的那樣執行。
1 |
async def foo(): |
上面的代碼可以做如下修正:
1 |
async def foo(): |
所以,正如上文所講,不建議將coroutine對象直接傳遞給wait()。
as_completed()
1 |
asyncio.as_completed(aws,*,loop=None,timeout=None) |
並發執行aws中的awaitable對象。返回一個Future對象迭代器。每次迭代時返回的Future對象代表待執行的awaitable對象集合里最早出現的結果。注意:迭代器返回的順序與aws列表的順序無關,只與結果出現的早晚有關。
如果超時之前還有Future對象未完成,則引發asyncio.TimeoutError異常。
用法示例:
1 |
for f in as_completed(aws): |
以下為一個完整的例子:
1 |
import asyncio |
執行結果如下:
1 |
Start at: 17:19:11 |
從其他線程調度執行(Scheduling From Other Threads)
1 |
asyncio.run_coroutine_threadsafe(coro,loop) |
向loop指定的事件循環提交一個由coro指定協程。線程安全。
返回一個concurrent.futures.Future對象,等待另一個線程返回結果。
這個函數用於從當前線程向運行事件循環的線程提交協程(coroutine)。
例如:
1 |
# Create a coroutine |
如果協程出現異常,返回的Future會收到通知。返回的Future也可以被用作取消事件循環中的任務:
1 |
try: |
可以參考並發與多線程章節。
與其他asyncio函數不同,該函數需要 顯式 傳遞loop參數。
新增於Python 3.5.1
自查(Introspection)
current_task()
1 |
asyncio.current_task(loop=None) |
返回事件循環中正在運行的Task實例,如果沒有Task在執行,則返回None。
如果loop為None,則使用get_running_loop()獲取當前事件循環。
新增於Python3.7
all_tasks()
1 |
asyncio.all_tasks(loop=None) |
返回事件循環中尚未運行結束的Task對象集合。
如果loop為None,則,使用get_running_loop()獲取當前事件循環。
新增於Python3.7
Task對象
1 |
class asyncio.Task(coro,*,loop=None) |
類似與Future對象,用於執行Python協程。非線程安全。Tasks用於在事件循環中執行協程。如果協程等待一個Future,那么Task會暫停協程的執行,直到Future執行完成。當Future完成時,協程的執行會恢復。
事件循環的 協作調度 模式:一個事件循環同一時間只執行一個Task。當這個Task等待某個Future返回時,事件循環執行其他的Task、回調或IO操作。
可以通過高層函數asyncio.create_task()創建Task,或者通過底層函數loop.create_task()和ensure_future()創建Task。但是不建議直接實例化Task對象。
如果想要取消一個Task的執行,可以使用cancel()方法。調用cancel()會引起Task對象向被封裝的協程拋出CancelledError異常。當取消行為發生時,如果協程正在等待某個Future對象執行,該Future對象將被取消。
cancelled()方法用於檢查某個Task是否已被取消。如果Task封裝的協程沒有阻止CancelledError異常,且Task確實被取消了,則該方法返回True。
asyncio.Task繼承了Future類中除Future.set_result()和Future.set_exception()以外的所有方法。
Task對象支持contextvars模塊:當一個Task被創建的時候,它會復制當前的上下文,然后在復制的上下文副本中執行協程。
Python3.7中的變更:添加了對contextvars模塊的支持。
cancel()
申請取消任務。
將在下一個事件循環周期中將CancelledError異常拋給封裝在Task中的協程。
收到CancelledError異常后,協程有機會處理異常,甚至以try ...except CancelledError ...finally來拒絕請求。因此,與Future.cancel()不同,Task.cancel()不能保證Task一定被取消掉。當然,拒絕取消請求這種操作並不常見,而且很不提倡。
以下例子可以說明協程如何攔截取消請求:
1 |
import asyncio |
cancelled()
如果Task已經被取消,則返回True。
當取消請求通過cancel()被提交,且Task封裝的協程傳播了拋給它的CancelledError異常,則此Task被取消。
done()
如果Task已完成,則返回True。Task完成有三種情況:
- 封裝的協程已返回
- 封裝的協程已拋出異常
Task被取消
result()
返回Task的執行結果。
如果Task已經完成,則返回Task封裝的協程的執行結果(如果Task封裝的協程引發異常,則重新引發該異常)。
如果Task已經取消,則該方法引發CancelledError異常。
如果Task的結果還不可用,該方法引發InvalidStateError異常。
exception()
返回Task的異常。
如果封裝的協程引發了異常,則返回此異常。如果封裝的協程執行正常,則返回None。
如果Task已被取消,則引發CancelledError異常。
如果Task尚未完成,則引發InvalidStateError異常。
add_done_callback()
添加一個回調函數,在Task完成后執行。
這個方法只應用在基於回調的底層編程中。
具體細節可以參考Future.remove_done_callback()
get_stack(* ,limit=None)
返回此Task的堆棧幀列表。
- 如果封裝的協程未完成,此方法返回它暫停位置的堆棧。
- 如果封裝的協程已經完成或已被取消,此方法返回一個空的列表。
- 如果封裝的協程因異常而結束,此方法返回異常回溯列表。
幀的順序總是 由舊到新。
暫停中的協程只返回一個堆棧幀。
可選參數limit用於限定返回幀的最大數目。默認情況下,所有有效的幀都會返回。
在返回堆棧和返回異常回溯時,列表的順序會有所不同:
- 最新的堆棧幀會被返回
- 最老的回溯幀會被返回(這和異常回溯模塊的機制有關)
print_stack(* ,limit=None,file=None)
打印Task的棧幀或異常回溯。
此方法用於輸出由get_stack()取回的幀列表,輸出形式類似於回溯(traceback)模塊limit參數會直接傳遞給get_stack()。file參數指定輸出的I/O流,默認為sys.stderr。
classmethod all_tasks(loop=None)
返回一個事件循環上所有任務的集合。
默認情況下,當前事件循環上所有的任務都會被返回。如果loop參數為’None’,則通過get_event_loop()方法獲取當前事件循環。
此方法將在Python3.9中被移除,所以不建議使用。可以使用asyncio.all_tasks()代替。
calssmethod current_task(loop=None)
返回當前正在運行的Task或None。
如果loop參數為’None’,則通過get_event_loop()方法獲取當前事件循環。
此方法將在Python3.9中被移除,所以不建議使用。可以使用asyncio.current_task()代替。
基於生成器的協程(Generator-based Coroutines)
提示:對基於生成器的協程的支持將在Python3.10中移除,不建議使用。
基於生成器的協程是早期的異步實現方式,出現在async/await語法之前,使用yield from表達式等待Future或其他協程。
基於生成器的協程應該用`@asyncio.coroutine`來修飾,盡管這不是強制的。
@asyncio.coroutine
基於生成器的協程的修飾器。
這個修飾器能使傳統的基於生成器的協程與async/await語法兼容:
1 |
|
此修飾器將在Python3.10中被移除,所以不建議再使用。
此修飾器不能用於async def的協程中。
asyncio.iscoroutine(obj)
如果obj對象是一個coroutine對象,則返回True。
此方法與inspect.iscoroutine()不同,因為它對基於生成器的協程也返回True。
asyncio.iscoroutinefunction(func)
如果func是一個coroutine方法,則返回True。
此方法inspect.iscoroutinefunction()不同,因為它對用@coroutine修飾的基於生成器的協程也返回True。
