【異步】:asyncio


異步asyncio

asyncio是一個使用async / await語法編寫並發代碼的庫

asyncio用作多個Python異步框架的基礎,這些框架提供高性能的網絡和Web服務器,數據庫連接庫,分布式任務隊列等。

asyncio通常非常適合IO綁定和高級 結構化網絡代碼。

 

asyncio提供了一組高級 API:

此外,還有一些用於庫和框架開發人員的低級 API 

 

Conroutines

使用async / await語法聲明的協同程序是編寫asyncio應用程序的首選方法。例如,以下代碼片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

asyncio.run(main())

# hello
# world

上面代碼等同於下面(不推薦使用基於生成器的協同程序的支持,並計划在Python 3.10中刪除。

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

asyncio.run(main())

asyncio實際等同於下面的工作(參數為An asyncio.Future, a coroutine or an awaitable is required)

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())

# hello
# world
 1     This function runs the passed coroutine, taking care of
 2     managing the asyncio event loop and finalizing asynchronous
 3     generators.
 4 
 5     This function cannot be called when another asyncio event loop is
 6     running in the same thread.
 7 
 8     If debug is True, the event loop will be run in debug mode.
 9 
10     This function always creates a new event loop and closes it at the end.
11     It should be used as a main entry point for asyncio programs, and should
12     ideally only be called once.
asyncio.run功能介紹

實際運行協程asyncio提供了三種主要機制:

1、The asyncio.run()函數來運行頂層入口點“main()”函數(見上面的例子)

2、Awaiting on a coroutine 以下代碼片段將在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” 

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 11:54:48
# hello
# world
# finished at 11:54:51

3、asyncio.create_task()與asyncio同時運行協同程序功能Tasks讓我們修改上面的例子並同時運行兩個say_after協同程序 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)

    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24

 稍微改變一下形式,可以理解的更多

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)

    # await task1
    # await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44

  

 Awaitables

我們說如果一個對象可以在表達式中使用,那么它就是一個等待對象await許多asyncio API旨在接受等待。

有三種主要類型的等待對象:coroutinesTasks, and Futures.

Coroutines

Python coroutines are awaitables and therefore can be awaited from other coroutines:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

# 42

重要

在本文檔中,術語“coroutine”可用於兩個密切相關的概念:

  • 一個協程功能:一個功能;async def
  • 一個協程對象:通過調用協同程序函數返回的對象 

Tasks

任務用於調度協同程序並發。

當一個協程被包裝到一個Task中時,會像asyncio.create_task()一樣  conroutine自動安排很快運行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future是一個特殊的低級別等待對象,它表示異步操作最終結果

等待 Future對象時,它意味着協程將等到Future在其他地方解析。

需要asyncio中的未來對象以允許基於回調的代碼與async / await一起使用。

通常,不需要在應用程序級代碼中創建Future對象。

可以等待有時通過庫和一些asyncio API公開的未來對象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回Future對象的低級函數的一個很好的例子是loop.run_in_executor()

 

Asyncio方法

1、運行asyncio程序 

 asyncio.runcoro*debug = False 

  此函數運行傳遞的協同程序,負責管理asyncio事件循環並最終確定異步生成器

  當另一個asyncio事件循環在同一個線程中運行時,無法調用此函數。

  如果debugTrue,則事件循環將以調試模式運行。

  此函數始終創建一個新的事件循環並在結束時將其關閉。它應該用作asyncio程序的主要入口點,理想情況下應該只調用一次。

  版本3.7中的新功能重要:此功能已臨時添加到Python 3.7中的asyncio中

 

2、創建任務 

asyncio.create_task(coro)

  將coro coroutine包裝成a Task 並安排執行。返回Task對象。

  任務在返回的循環中執行,如果當前線程中沒有運行循環get_running_loop(), RuntimeError則引發該任務

  Python 3.7中添加了此功能在Python 3.7之前,asyncio.ensure_future()可以使用低級函數:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

 

3、sleeping 

coroutine asyncio.sleep(delayresult=None*loop=None)

  阻止 delay seconds.

  如果提供了result ,則在協程完成時將其返回給調用者。

  leep() 始終掛起當前任務,允許其他任務運行。

  該loop 參數已被棄用,並定於去除在Python 3.10。

  協程示例每秒顯示當前日期5秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

  

4、同時運行任務

awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

  同時aws 序列中運行 awaitable objects

  如果在aws中任何awaitable 是協程,它將自動安排為任務

  如果所有等待成功完成,則結果是返回值的匯總列表。結果值的順序對應於aws中的等待順序

  如果return_exceptionsFalse(默認),則第一個引發的異常會立即傳播到等待的任務gather()

  如果return_exceptionsTrue,異常的處理方式一樣成功的結果,並在結果列表匯總。

  如果gather()取消,所有提交的awaitables(尚未完成)也被取消

  如果aws序列中的Task or Future取消,則將其視為已引發CancelledError在這種情況下不會取消gather() 呼叫這是為了防止取消一個提交的Tasks/Futures 以導致其他任務/期貨被取消。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

 獲取返回結果,異常情況

import asyncio

async def factorial(name, number):

    print(name)
    if name == 'A':
        return name
    elif name == 'B':
        raise SyntaxError(name)
    await asyncio.sleep(number)


async def main():
    # Schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )
    print(result)

asyncio.run(main())

# A
# B
# C
# ['A', SyntaxError('B'), None]

版本3.7中已更改:如果取消聚集本身,則無論return_exceptions如何,都會傳播取消

 

5、Shielding From Cancellation

awaitable asyncio.shield(aw*loop=None)

  Protect an awaitable object from being cancelled.

  If aw is a coroutine it is automatically scheduled as a Task.

  The statement:

res = await shield(something())

  等同於

res = await something()

  除非取消包含它的協程,否則something()不會取消運行的任務從觀點來看something(),取消沒有發生。雖然它的來電者仍然被取消,所以“等待”表達仍然提出了一個CancelledError

  如果something()通過其他方式取消(即從內部取消)也會取消shield()

  如果希望完全忽略取消(不推薦),則該shield()函數應與try / except子句結合使用,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

 

6、超時

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

  Wait for the aw awaitable to complete with a timeout.

  If aw is a coroutine it is automatically scheduled as a Task.

  timeout可以是None或等待的float或int秒數。如果超時None,將等到完成

  如果發生超時,它將取消任務並加注 asyncio.TimeoutError

  要避免該任務cancellation,請將其包裝shield()

  該函數將一直等到將來實際取消,因此總等待時間可能會超過超時

  如果等待被取消,則未來的aw也會被取消。

  該循環參數已被棄用,並定於去除在Python 3.10。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

改變在3.7版本:AW被取消,由於超時,wait_for等待AW被取消。以前,它asyncio.TimeoutError立即提出 

 

7、超時原語

coroutine asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

  同時運行aws中的等待對象 並阻塞,直到return_when指定的條件為止

  如果在aws中任何等待的是協程,它將自動安排為任務。wait()直接傳遞協同程序對象 已被棄用,因為它會導致 混亂的行為

  返回兩組任務/期貨:(done, pending)

  用法:

done, pending = await asyncio.wait(aws)

  該循環參數已被棄用,並定於去除在Python 3.10。

  timeout(浮點數或整數),如果指定,可用於控制返回前等待的最大秒數。

  請注意,此功能不會引發asyncio.TimeoutError超時發生時未完成的期貨或任務僅在第二組中返回。

  return_when表示此函數何時返回。它必須是以下常量之一:

不變 描述
FIRST_COMPLETED 當任何未來完成或取消時,該函數將返回。
FIRST_EXCEPTION 當任何未來通過引發異常完成時,函數將返回。如果沒有未來引發異常則等同於 ALL_COMPLETED
ALL_COMPLETED 所有期貨結束或取消時,該功能將返回。

  不像wait_for()wait()當發生超時不會取消期貨。   

  注意 wait()將協同程序自動調度為任務,然后在 集合中返回隱式創建的任務對象。因此,以下代碼將無法按預期方式工作:(done, pending)

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

  以下是如何修復上述代碼段:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

  wait()不推薦直接傳遞協程對象。

 

8、 Scheduling From Other Threads

asyncio.run_coroutine_threadsafe(coroloop)

  將協程提交給給定的事件循環。線程安全的。

  返回a concurrent.futures.Future以等待另一個OS線程的結果。

  此函數旨在從與運行事件循環的OS線程不同的OS線程調用。例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

  如果在協程中引發異常,則會通知返回的Future。它還可以用於取消事件循環中的任務:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

  請參閱 文檔並發和多線程部分。

  與其他asyncio函數不同,此函數需要 顯式傳遞循環參數。

  3.5.1版中的新功能。

 

9、自省

asyncio.current_taskloop = None 

  返回當前正在運行的Task實例,或者None沒有正在運行的任務。

  如果loopNone get_running_loop()用來獲取loop。

  版本3.7中的新功能。

asyncio.all_tasksloop = None 

  返回Task循環運行的一組尚未完成的對象。

  如果loopNoneget_running_loop()則用於獲取當前循環。

  版本3.7中的新功能。

 

任務對象

class asyncio.Task(coro,*,loop = None 

Future-like object that runs a Python coroutine. Not thread-safe.

任務用於在事件循環中運行協同程序。如果一個協程在Future上等待,則Task暫停執行協程並等待Future的完成。當Future 完成后,包裝協程的執行將恢復。

事件循環使用協作調度:事件循環一次運行一個任務。當一個Task等待完成Future時,事件循環運行其他任務,回調或執行IO操作。

使用高級asyncio.create_task()功能創建任務,或低級別loop.create_task()或 ensure_future()功能。不鼓勵手動實例化任務。

要取消正在運行的任務,請使用該cancel()方法。調用它將導致Task將CancelledError異常拋出到包裝的協同程序中。如果在取消期間協程正在等待Future對象,則Future對象將被取消。

cancelled()可用於檢查任務是否被取消。True如果包裝的協程沒有抑制CancelledError異常並且實際上被取消,則該方法返回

asyncio.Task繼承自Future其所有API,除了Future.set_result()和 Future.set_exception()

任務支持該contextvars模塊。創建任務時,它會復制當前上下文,然后在復制的上下文中運行其協程。

版本3.7中已更改:添加了對contextvars模塊的支持

cancel

  請求取消任務。

  這會安排CancelledError在事件循環的下一個循環中將異常拋入包裝的協程。

  協程則有機會通過抑制異常與清理,甚至拒絕請求try... ... ... ... ... ... 塊。因此,不同於不保證任務將被取消,盡管完全抑制取消並不常見,並且積極地不鼓勵。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()

  以下示例說明了協同程序如何攔截取消請求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled
   True如果任務被 取消,則返回。
  請求 取消時取消任務,  cancel()並且包裝的協同程序將 CancelledError異常傳播 到其中。
done
   True如果任務 完成則返回。
  一個任務 完成時,包裹協程要么返回的值,引發異常,或者任務被取消。
result
  返回任務的結果。
  如果任務 完成,則返回包裝協程的結果(或者如果協程引發異常,則重新引發該異常。)
  如果已 取消任務,則此方法會引發 CancelledError異常。
  如果Task的結果尚不可用,則此方法會引發 InvalidStateError異常。
exception
  返回Task的例外。
  如果包裝的協同程序引發異常,則返回異常。如果包裝的協程正常返回,則此方法返回 None
  如果已 取消任務,則此方法會引發  CancelledError異常。
  如果尚未 完成任務,則此方法會引發  InvalidStateError異常。
add_done_callback回調*上下文=無
  添加要在任務 完成時運行的回調。
  此方法僅應在基於低級回調的代碼中使用。 
  有關 Future.add_done_callback() 詳細信息,請參閱文檔。
remove_done_callback回調
  從回調列表中刪除 回調
  此方法僅應在基於低級回調的代碼中使用。
  有關 Future.remove_done_callback() 詳細信息,請參閱文檔。
get_stack*limit = None 
  返回此任務的堆棧幀列表。
  如果未完成包裝的協同程序,則會返回掛起它的堆棧。如果協程已成功完成或被取消,則返回一個空列表。如果協程被異常終止,則返回回溯幀列表。
  幀始終從最舊到最新排序。
  對於掛起的協程,只返回一個堆棧幀。
  可選的 limit參數設置要返回的最大幀數; 默認情況下,返回所有可用的幀。返回列表的排序取決於是返回堆棧還是返回:返回堆棧的最新幀,但返回最舊的回溯幀。(這與回溯模塊的行為相匹配。)
print_stack*limit = Nonefile = None 
  打印此任務的堆棧或回溯。
  這會為檢索到的幀生成類似於回溯模塊的輸出 get_stack()
  該 極限參數傳遞給 get_stack()直接。
  的 文件參數是其中輸出被寫入的I / O流; 默認輸出寫入 sys.stderr
classmethod all_tasks(loop = None 
  返回一組事件循環的所有任務。
  默認情況下,返回當前事件循環的所有任務。如果是 loop None,則該 get_event_loop()函數用於獲取當前循環。
   不推薦使用此方法,將在Python 3.9中刪除。請改用此 asyncio.all_tasks()功能。
classmethod current_task(loop = None 
  返回當前正在運行的任務或 None
  如果是 loop None,則該 get_event_loop()函數用於獲取當前循環。
   不推薦使用此方法,將在Python 3.9中刪除。請改用此 asyncio.current_task()功能。

 其他

1、async for 運用

import asyncio


class AsyncIter:
    def __init__(self, items):
        self.items = items

    async def __aiter__(self):
        for item in self.items:
            await asyncio.sleep(1)
            yield item


async def print_iter(things):
    async for item in things:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    things = AsyncIter([1, 2, 3])
    loop.run_until_complete(print_iter(things))
    loop.close()

  

 

 

資料

Python異步IO實現全過程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ

Python異步IO實現全過程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA

Python異步IO實現全過程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM