異步asyncio
asyncio是一個使用async / await語法編寫並發代碼的庫。
asyncio用作多個Python異步框架的基礎,這些框架提供高性能的網絡和Web服務器,數據庫連接庫,分布式任務隊列等。
asyncio通常非常適合IO綁定和高級 結構化網絡代碼。
asyncio提供了一組高級 API:
- 同時運行Python協同程序並完全控制它們的執行;
- 執行網絡IO和IPC ;
- 控制子過程 ;
- 通過隊列分配任務;
- 同步並發代碼;
此外,還有一些用於庫和框架開發人員的低級 API :
- 創建和管理事件循環,提供異步API
networking,運行subprocesses,處理等;OS signals - 使用傳輸實現有效的協議 ;
- 使用async / await語法橋接基於回調的庫和代碼。
Conroutines
使用async / await語法聲明的協同程序是編寫asyncio應用程序的首選方法。例如,以下代碼片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
# hello
# world
上面代碼等同於下面(不推薦使用基於生成器的協同程序的支持,並計划在Python 3.10中刪除。)
import asyncio
@asyncio.coroutine
def main():
print('hello')
yield from asyncio.sleep(1)
print('world')
asyncio.run(main())
asyncio實際等同於下面的工作(參數為An asyncio.Future, a coroutine or an awaitable is required)
import asyncio
@asyncio.coroutine
def main():
print('hello')
yield from asyncio.sleep(1)
print('world')
# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())
# hello
# world
1 This function runs the passed coroutine, taking care of 2 managing the asyncio event loop and finalizing asynchronous 3 generators. 4 5 This function cannot be called when another asyncio event loop is 6 running in the same thread. 7 8 If debug is True, the event loop will be run in debug mode. 9 10 This function always creates a new event loop and closes it at the end. 11 It should be used as a main entry point for asyncio programs, and should 12 ideally only be called once.
實際運行協程asyncio提供了三種主要機制:
1、The asyncio.run()函數來運行頂層入口點“main()”函數(見上面的例子)
2、Awaiting on a coroutine 以下代碼片段將在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” :
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
# started at 11:54:48
# hello
# world
# finished at 11:54:51
3、asyncio.create_task()與asyncio同時運行協同程序的功能Tasks;讓我們修改上面的例子並同時運行兩個say_after協同程序 :
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24
稍微改變一下形式,可以理解的更多
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await asyncio.sleep(3)
# await task1
# await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44
Awaitables
我們說如果一個對象可以在表達式中使用,那么它就是一個等待對象await。許多asyncio API旨在接受等待。
有三種主要類型的等待對象:coroutines, Tasks, and Futures.
Coroutines
Python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
# 42
重要
在本文檔中,術語“coroutine”可用於兩個密切相關的概念:
- 一個協程功能:一個功能;
async def - 一個協程對象:通過調用協同程序函數返回的對象 。
Tasks
任務用於調度協同程序並發。
當一個協程被包裝到一個Task中時,會像asyncio.create_task()一樣 conroutine自動安排很快運行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Futures
A Future是一個特殊的低級別等待對象,它表示異步操作的最終結果。
當等待 Future對象時,它意味着協程將等到Future在其他地方解析。
需要asyncio中的未來對象以允許基於回調的代碼與async / await一起使用。
通常,不需要在應用程序級代碼中創建Future對象。
可以等待有時通過庫和一些asyncio API公開的未來對象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
返回Future對象的低級函數的一個很好的例子是loop.run_in_executor()。
Asyncio方法
1、運行asyncio程序
asyncio.run(coro,*,debug = False )
此函數運行傳遞的協同程序,負責管理asyncio事件循環並最終確定異步生成器。
當另一個asyncio事件循環在同一個線程中運行時,無法調用此函數。
如果是debugTrue,則事件循環將以調試模式運行。
此函數始終創建一個新的事件循環並在結束時將其關閉。它應該用作asyncio程序的主要入口點,理想情況下應該只調用一次。
版本3.7中的新功能:重要:此功能已臨時添加到Python 3.7中的asyncio中。
2、創建任務
asyncio.create_task(coro)
將coro coroutine包裝成a Task 並安排執行。返回Task對象。
任務在返回的循環中執行,如果當前線程中沒有運行循環get_running_loop(), RuntimeError則引發該任務。
Python 3.7中添加了此功能。在Python 3.7之前,asyncio.ensure_future()可以使用低級函數:
async def coro():
...
# In Python 3.7+
task = asyncio.create_task(coro())
...
# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
3、sleeping
coroutine asyncio.sleep(delay, result=None, *, loop=None)
阻止 delay seconds.。
如果提供了result ,則在協程完成時將其返回給調用者。
leep() 始終掛起當前任務,允許其他任務運行。
該loop 參數已被棄用,並定於去除在Python 3.10。
協程示例每秒顯示當前日期5秒:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
4、同時運行任務
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
同時在aws 序列中運行 awaitable objects
如果在aws中任何awaitable 是協程,它將自動安排為任務
如果所有等待成功完成,則結果是返回值的匯總列表。結果值的順序對應於aws中的等待順序
如果return_exceptions是False(默認),則第一個引發的異常會立即傳播到等待的任務gather()。
如果return_exceptions是True,異常的處理方式一樣成功的結果,並在結果列表匯總。
如果gather()被取消,所有提交的awaitables(尚未完成)也被取消。
如果aws序列中的Task or Future被取消,則將其視為已引發CancelledError- 在這種情況下不會取消gather() 呼叫。這是為了防止取消一個提交的Tasks/Futures 以導致其他任務/期貨被取消。
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
獲取返回結果,異常情況
import asyncio
async def factorial(name, number):
print(name)
if name == 'A':
return name
elif name == 'B':
raise SyntaxError(name)
await asyncio.sleep(number)
async def main():
# Schedule three calls *concurrently*:
result = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
return_exceptions=True
)
print(result)
asyncio.run(main())
# A
# B
# C
# ['A', SyntaxError('B'), None]
版本3.7中已更改:如果取消了聚集本身,則無論return_exceptions如何,都會傳播取消。
5、Shielding From Cancellation
awaitable asyncio.shield(aw, *, loop=None)
Protect an awaitable object from being cancelled.
If aw is a coroutine it is automatically scheduled as a Task.
The statement:
res = await shield(something())
等同於
res = await something()
除非取消包含它的協程,否則something()不會取消運行的任務。從觀點來看something(),取消沒有發生。雖然它的來電者仍然被取消,所以“等待”表達仍然提出了一個CancelledError。
如果something()通過其他方式取消(即從內部取消)也會取消shield()。
如果希望完全忽略取消(不推薦),則該shield()函數應與try / except子句結合使用,如下所示:
try:
res = await shield(something())
except CancelledError:
res = None
6、超時
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
Wait for the aw awaitable to complete with a timeout.
If aw is a coroutine it is automatically scheduled as a Task.
timeout可以是None或等待的float或int秒數。如果超時是None,將等到完成
如果發生超時,它將取消任務並加注 asyncio.TimeoutError。
要避免該任務cancellation,請將其包裝shield()。
該函數將一直等到將來實際取消,因此總等待時間可能會超過超時。
如果等待被取消,則未來的aw也會被取消。
該循環參數已被棄用,並定於去除在Python 3.10。
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# Expected output:
#
# timeout!
改變在3.7版本:當AW被取消,由於超時,wait_for等待AW被取消。以前,它asyncio.TimeoutError立即提出 。
7、超時原語
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時運行aws中的等待對象 並阻塞,直到return_when指定的條件為止。
如果在aws中任何等待的是協程,它將自動安排為任務。wait()直接傳遞協同程序對象 已被棄用,因為它會導致 混亂的行為。
返回兩組任務/期貨:。(done, pending)
用法:
done, pending = await asyncio.wait(aws)
該循環參數已被棄用,並定於去除在Python 3.10。
timeout(浮點數或整數),如果指定,可用於控制返回前等待的最大秒數。
請注意,此功能不會引發asyncio.TimeoutError。超時發生時未完成的期貨或任務僅在第二組中返回。
return_when表示此函數何時返回。它必須是以下常量之一:
| 不變 | 描述 |
|---|---|
FIRST_COMPLETED |
當任何未來完成或取消時,該函數將返回。 |
FIRST_EXCEPTION |
當任何未來通過引發異常完成時,函數將返回。如果沒有未來引發異常則等同於 ALL_COMPLETED。 |
ALL_COMPLETED |
所有期貨結束或取消時,該功能將返回。 |
不像wait_for(),wait()當發生超時不會取消期貨。
注意 wait()將協同程序自動調度為任務,然后在 集合中返回隱式創建的任務對象。因此,以下代碼將無法按預期方式工作:(done, pending)
async def foo():
return 42
coro = foo()
done, pending = await asyncio.wait({coro})
if coro in done:
# This branch will never be run!
以下是如何修復上述代碼段:
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})
if task in done:
# Everything will work as expected now.
wait()不推薦直接傳遞協程對象。
8、 Scheduling From Other Threads
asyncio.run_coroutine_threadsafe(coro, loop)
將協程提交給給定的事件循環。線程安全的。
返回a concurrent.futures.Future以等待另一個OS線程的結果。
此函數旨在從與運行事件循環的OS線程不同的OS線程調用。例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在協程中引發異常,則會通知返回的Future。它還可以用於取消事件循環中的任務:
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print(f'The coroutine raised an exception: {exc!r}')
else:
print(f'The coroutine returned: {result!r}')
請參閱 文檔的並發和多線程部分。
與其他asyncio函數不同,此函數需要 顯式傳遞循環參數。
3.5.1版中的新功能。
9、自省
asyncio.current_task(loop = None )
返回當前正在運行的Task實例,或者None沒有正在運行的任務。
如果loop是None get_running_loop()用來獲取loop。
版本3.7中的新功能。
asyncio.all_tasks(loop = None )
返回Task循環運行的一組尚未完成的對象。
如果loop是None,get_running_loop()則用於獲取當前循環。
版本3.7中的新功能。
任務對象
class asyncio.Task(coro,*,loop = None )
A Future-like object that runs a Python coroutine. Not thread-safe.
任務用於在事件循環中運行協同程序。如果一個協程在Future上等待,則Task暫停執行協程並等待Future的完成。當Future 完成后,包裝協程的執行將恢復。
事件循環使用協作調度:事件循環一次運行一個任務。當一個Task等待完成Future時,事件循環運行其他任務,回調或執行IO操作。
使用高級asyncio.create_task()功能創建任務,或低級別loop.create_task()或 ensure_future()功能。不鼓勵手動實例化任務。
要取消正在運行的任務,請使用該cancel()方法。調用它將導致Task將CancelledError異常拋出到包裝的協同程序中。如果在取消期間協程正在等待Future對象,則Future對象將被取消。
cancelled()可用於檢查任務是否被取消。True如果包裝的協程沒有抑制CancelledError異常並且實際上被取消,則該方法返回。
asyncio.Task繼承自Future其所有API,除了Future.set_result()和 Future.set_exception()。
任務支持該contextvars模塊。創建任務時,它會復制當前上下文,然后在復制的上下文中運行其協程。
版本3.7中已更改:添加了對contextvars模塊的支持。
cancel()
請求取消任務。
這會安排CancelledError在事件循環的下一個循環中將異常拋入包裝的協程。
協程則有機會通過抑制異常與清理,甚至拒絕請求try... ... ... ... ... ... 塊。因此,不同於,不保證任務將被取消,盡管完全抑制取消並不常見,並且積極地不鼓勵。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()
以下示例說明了協同程序如何攔截取消請求:
async def cancel_me():
print('cancel_me(): before sleep')
try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())
# Wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
-
cancelled() -
True如果任務被 取消,則返回。 -
請求
取消時取消任務,
cancel()並且包裝的協同程序將CancelledError異常傳播 到其中。
-
done() -
True如果任務 完成則返回。 - 一個任務 完成時,包裹協程要么返回的值,引發異常,或者任務被取消。
-
result() - 返回任務的結果。
- 如果任務 完成,則返回包裝協程的結果(或者如果協程引發異常,則重新引發該異常。)
-
如果已
取消任務,則此方法會引發
CancelledError異常。 -
如果Task的結果尚不可用,則此方法會引發
InvalidStateError異常。
-
exception() - 返回Task的例外。
-
如果包裝的協同程序引發異常,則返回異常。如果包裝的協程正常返回,則此方法返回
None。 -
如果已
取消任務,則此方法會引發
CancelledError異常。 -
如果尚未
完成任務,則此方法會引發
InvalidStateError異常。
-
add_done_callback(回調,*,上下文=無) - 添加要在任務 完成時運行的回調。
- 此方法僅應在基於低級回調的代碼中使用。
-
有關
Future.add_done_callback()詳細信息,請參閱文檔。
-
remove_done_callback(回調) - 從回調列表中刪除 回調。
- 此方法僅應在基於低級回調的代碼中使用。
-
有關
Future.remove_done_callback()詳細信息,請參閱文檔。
-
get_stack(*,limit = None ) - 返回此任務的堆棧幀列表。
- 如果未完成包裝的協同程序,則會返回掛起它的堆棧。如果協程已成功完成或被取消,則返回一個空列表。如果協程被異常終止,則返回回溯幀列表。
- 幀始終從最舊到最新排序。
- 對於掛起的協程,只返回一個堆棧幀。
- 可選的 limit參數設置要返回的最大幀數; 默認情況下,返回所有可用的幀。返回列表的排序取決於是返回堆棧還是返回:返回堆棧的最新幀,但返回最舊的回溯幀。(這與回溯模塊的行為相匹配。)
-
print_stack(*,limit = None,file = None ) - 打印此任務的堆棧或回溯。
-
這會為檢索到的幀生成類似於回溯模塊的輸出
get_stack()。 -
該
極限參數傳遞給
get_stack()直接。 -
的
文件參數是其中輸出被寫入的I / O流; 默認輸出寫入
sys.stderr。
-
classmethod
all_tasks(loop = None ) - 返回一組事件循環的所有任務。
-
默認情況下,返回當前事件循環的所有任務。如果是
loop
None,則該get_event_loop()函數用於獲取當前循環。 -
不推薦使用此方法,將在Python 3.9中刪除。請改用此
asyncio.all_tasks()功能。
-
classmethod
current_task(loop = None ) -
返回當前正在運行的任務或
None。 -
如果是
loop
None,則該get_event_loop()函數用於獲取當前循環。 -
不推薦使用此方法,將在Python 3.9中刪除。請改用此
asyncio.current_task()功能。
其他
1、async for 運用
import asyncio
class AsyncIter:
def __init__(self, items):
self.items = items
async def __aiter__(self):
for item in self.items:
await asyncio.sleep(1)
yield item
async def print_iter(things):
async for item in things:
print(item)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
things = AsyncIter([1, 2, 3])
loop.run_until_complete(print_iter(things))
loop.close()
資料
Python異步IO實現全過程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ
Python異步IO實現全過程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA
Python異步IO實現全過程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ
