micropython学习笔记之uasyncio --- 异步 I/O


先解释下几个名词:

异步:异步是什么意思?这不是一个严格的定义,从下面两个方面来理解:

  1. 异步程序可以在等待其最终结果的同时“暂停”并让其他程序同时运行。
  2. 通过上述机制,异步代码有助于并发执行。换句话说,异步代码表现出了并发的特点。

异步IO:一种与语言无关的范例(模型) ,很多编程语言都有这种实现,它是一种单线程,单进程设计:它使用协作多任务处理,尽管在单个进程中使用单个线程,异步 IO 仍具有并发的感觉。

uasyncio 是MicroPython中用来编写 并发 代码的库,使用 async/await 语法。

uasyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

uasyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

新版本的uasyncio V3预装在V1.13开始的每日固件版本中。

MicroPython语言基于cpython3.4。uasyncio库现在支持cpython3.8异步库的一个子集。有非标准的扩展来优化服务,比如毫秒级定时。它的设计重点是高性能。在没有RAM分配的情况下安排运行。

uasyncio库支持以下功能:

  • async def 与 await 语法
  • 等待类(使用__iter__而不是__await__)
  • 异步上下文管理器
  • 异步迭代器
  • uasyncio.sleep(seconds)
  • 超时 (uasyncio.wait_for)
  • 任务取消(Task.cancel)
  • 集合

它支持毫秒级计时,具有以下功能:

  • uasyncio.sleep_ms(time)

它包括以下与CPython兼容的同步原语:

  • Event
  • Lock
  • gather

Future类不被支持,也不支持event_loop的call_soon、call_later、call_at方法。

 

核心概念

uasyncio里面主要有3个需要关注的基本概念

Event Loop

Event Loop(事件循环)可以说是asyncio应用的核心,是中央总控。Event Loop实例提供了注册、取消和执行任务和回调的方法。

把一些异步函数(就是Task任务,下面会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。

Coroutine

Coroutine(协程)本质上是一个函数,特点是在代码块中可以在到达返回值之前暂停其执行,并且可以将控制权间接传递给另一个协程一段时间:

❯ cat coro1.py
import asyncio

async def a():
    print('Suspending a')
    await asyncio.sleep(0)
    print('Resuming a')

async def b():
    print('In b')

async def main():
    await asyncio.gather(a(), b())

if __name__ == '__main__':
    asyncio.run(main())

这里面有几个重要关键点:

  1. 协程要用async def声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。
  2. asyncio.gather用来并发运行任务,在这里表示协同的执行a和b两个协程
  3. 在协程a中,有一句await asyncio.sleep(0),await表示调用协程,asyncio.sleep()非阻塞延时(而time.sleep()则表示任何耗时的阻塞函数调用),它可将 CPU 的控制权交给下一个协程。这儿的asyncio.sleep(0)并不会真的sleep(因为时间为0),但是却可以把控制权交出去。
  4. asyncio.run是Python 3.7新加的接口,要不然你得这么写:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

好了,我们先运行一下看看:

❯ python coro1.py
Suspending a
In b
Resuming a

看到了吧,在并发执行中,协程a被挂起又恢复过。

 

Async IO 的规则

理解了 async,await 的规则,对掌握异步/等待功能非常重要。关键字 async def 可以定义一个协程函数或一个异步生成器函数。关键字 await 将功能控制传回事件循环。比如:

async def g() :
    # Pause here and come back to g()  when f()  is ready
    r = await f() 
    return r

这里的 await 挂起了本次协程的执行。如果 Python 在 g() 范围内遇到 await f() 表达式,那就意味着,“暂停 g() 的执行,直到等待f() 返回结果。同时,让其他协程运行。”。当然也有一些规则要求什么地方可以使用 async/await 关键字,什么地方不能用:

  • 使用 async def 定义的函数是一个协程,它内部可以使用 await,return,yield,也可以都不用。
    • 使用 await 或 return 创建一个coroutine函数。要调用 coroutine 函数,你必须使用 await 关键字。
    • 很少情况下会在 async def 的代码块中使用 yield ,如果用了,会产生一个异步的生成器。
    • 任何 async def 内都不能使用 yield from,会抛出语法错误。
  • 就像不能在函数外面使用 yield 一样,也不能在 async def 外使用 await。会抛出语法错误。

下面是一些例子:

async def f(x) :
    y = await z(x)   # OK - `await` and `return` allowed in coroutines
    return y

async def g(x) :
    yield x  # OK - this is an async generator

uasyncio 异步I/O调度程序

这个模块实现了相应CPython的模块一个子集,如下所述。有关详细信息,请参阅原始CPython文档:asyncio

例子:

 

import uasyncio async def blink(led, period_ms): while True: led.on() await uasyncio.sleep_ms(5) led.off() await uasyncio.sleep_ms(period_ms) async def main(led1, led2): uasyncio.create_task(blink(led1, 700)) uasyncio.create_task(blink(led2, 400)) await uasyncio.sleep_ms(10_000) # Running on a pyboard from pyb import LED uasyncio.run(main(LED(1), LED(2))) # Running on a generic board from machine import Pin uasyncio.run(main(Pin(1), Pin(2)))

 

  

核心功能

uasyncio.create_task(coro)

从给定的协同程序创建一个新任务并安排它运行。

返回相应的Task对象

uasyncio.current_task()

返回当前与之关联正在运行的任务Task对象。

uasyncio.run(coro)

从给定的协同程序创建一个新任务,并运行它直到它完成。

返回coro返回的值

uasyncio.sleep(t)

休眠t秒(可以是浮

这是一协同程序

uasyncio.sleep_ms(t)

休眠t毫秒

这是一个协同程序,也是一个MicroPython扩展。

附加功能

uasyncio.wait_for(awaitable,timeout)

等待awaitable完成,但如果花费的时间超过timeout秒数,将取消它。如果awaitable不是一个任务,则将从中创建任务。

如果发生超时,则取消任务并引发asyncio.TimeoutError:这应该被调用者捕获。‎任务接收到的asyncio.CancelledError可能被忽略或使用try...excepttry...finally运行清理代码捕获。‎

返回awaitable返回值

这是一个协同程序

uasyncio.wait_for_ms(awaitable,timeout)

类似wait_for但是超时是以毫秒为单位的整数。

这是一个协同程序,也是一个MicroPython扩展。

uasyncio.gather(*awaitables,return_exceptions=False)

并发运行awaitables序列中的可等待对象。任何awaitabls不属于任务的将升级为任务。

如果return_exceptionsFalse(默认),所引发的首个异常会立即传播给等待gather()的任务。awaitables序列中的其他可等待对象不会被取消并将继续运行。

如果return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。

返回所有可等待对象返回值的列表

这是一个协同程序

任务

class uasyncio.Task

此对象将协同程序包装到正在运行的任务中。任务可以等待使用await task,它将等待任务完成并返回任务的返回值。

任务不应该直接创建,而是使用create_task创建它们

Task.cancel()

通过注入CancelledError来取消任务。任务可能会忽略此异常。清理代码可以通过捕获它来运行,也可以通过try ... finally运行。

Event

class uasyncio.Event

创建可用于同步任务的新事件。事件以清除状态开始。

Event.is_set()

如果设置了事件返回True,否则返回False

Event.set()

设置事件。任何等待事件的任务都将被安排运行。

注意:这必须从任务中调用。从IRQ、调度程序回调或其他线程调用它是不安全的。ThreadSafeFlag

Event.clear()

清除事件

Event.wait()

等待事件设置。如果事件已设置,则它会立即返回。

这是一个协同程序

ThreadSafeFlag

class uasyncio.ThreadSafeFlag

创建一个新标志,该标志可用于将任务与异步循环外运行的代码(如其它线程、中断回调调度)同步。标志以清除状态开始。

ThreadSafeFlag.set()

设置标志。如果有一个任务正在等待事件,它将被调度运行。

ThreadSafeFlag.wait()

等待标志被设置。如果该标志已设置,则它将立即返回。

一次只能由单个任务等待标志。

这是一个协同程序

Lock

class uasyncio.Lock

创建可用于协调任务的新锁。锁在解锁状态下启动。

除了下面的方法外,锁还可以用于async with声明

Lock.locked()

如果锁已锁定返回True否则返回False

Lock.acquire()

等待锁处于解锁状态,然后以原子方式将其锁定。一次只能有一个任务获得锁。

这是一个协同程序

Lock.release()

释放锁。如果有任务正在等待锁,那么队列中的下一个任务将被调度运行,并且锁保持锁定状态。否则,没有任务在等待锁解锁。

TCP流连接

uasyncio.open_connection(host,port)

打开给定hostportTCP连接这个host地址将使用socket.getaddrinfo解析,为阻塞调用。

返回一对流对象reader流和writer流。如果无法解析主机或无法建立连接,将引发特定于套接字的OSError

这是一个协同程序

uasyncio.start_server(callback,host,port,backlog=5)

启动给定hostport上的TCP服务当一个新的客户端连接被建立时,回调函数 callback 会被调用。该函数会接收到一对参数 (reader,writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

callback 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task 被调度。

返回Server对象

这是一个协同程序

class uasyncio.Stream

这表示TCP流连接。为了最大限度地减少代码,这个类同时实现了一个读取器和一个编写器,以及这个类的两者别名StreamReader StreamWriter。

Stream.get_extra_info(v)

获取有关流的额外信息,由v给出,v的有效值是:peername 。

Stream.close()

关闭流

Stream.wait_closed()

等待流关闭

这是一个协同程序

Stream.read(n=-1)

读取到 n 个字节并返回它们。如果没有提供 n 或 -1,则读取所有字节,直到 EOF。如果在读取任何字节之前遇到 EOF,则返回的值将是一个空字节对象。

这是一个协同程序

Stream.readinto(buf)

将最多 n 个字节读入 buf,n 等于 buf 的长度。

返回读入缓冲区的字节数。

这是一个协同程序,也是一个 MicroPython 扩展。

Stream.readexactly(n)

正确读取 n 个字节并将它们作为字节对象返回。

如果流在读取 n 个字节之前结束,则引发 EOFError 异常。

这是一个协同程序

Stream.readline()

读一行并返回

这是一个协同程序

Stream.write(buf)

累积的buf到输出缓冲区。仅当调用 ‎‎Stream.drain‎‎ 时,才会刷新数据。建议在调用此函数后立即调用Stream.drain

Stream.drain()

将所有已缓冲的输出数据输出(写入)到流中。

这是一个协同程序

class uasyncio.Server

这表示从‎‎start_server‎‎返回的服务器类。它可以在async with语句中用于在退出时关闭服务器。

Server.close()

关闭服务器

Server.wait_closed()

等待服务器关闭

这是一个协同程序

 

Event Loop事件循环

uasyncio.get_event_loop()

返回用于调度和运行任务的事件循环。Loop

uasyncio.new_event_loop()

重置事件循环并返回它。

注意:由于MicroPython只有一个事件循环,所以这个函数只是重置循环的状态,它不会创建新的事件循环。

class uasyncio.Loop

这表示调度和运行任务的对象。不能被创建,使用get_event_loop替代。

Loop.create_task(coro)

从给定的coro协程创建一个任务并返回新的Task对象

Loop.run_forever()

运行事件循环直到stop()调用。

Loop.run_until_complete(awaitable)

运行给定的awaitable直到它完成。如果awaitable不是任务就不会被提升

Loop.stop()

停止事件循环

Loop.close()

关闭事件循环

当这个函数被调用的时候,循环必须处于非运行状态。pending状态的回调将被丢弃。

此方法清除所有的队列并立即关闭执行器,不会等待执行器完成。 

Loop.set_exception_handler(handler)

handler设置为新的事件循环异常处理程序。

如果handlerNone,将设置默认的异常处理程序。在其他情况下,handler必须是一个可调用对象且签名匹配(loop,context),其中loop是对活动事件循环的引用,而context是一个包含异常详情的字典对象(请查看call_exception_handler()文档来获取关于上下文的更多信息)。

Loop.get_exception_handler()

返回当前异常处理程序如果没有设置异常处理程序则返回None

Loop.default_exception_handler(context)

默认异常处理程序。

此方法会在发生异常且未设置异常处理程序时被调用。此方法也可以由想要具有不同于默认处理程序的行为的自定义异常处理程序来调用。

context 参数和 call_exception_handler() 中的同名参数完全相同。

Loop.call_exception_handler(context)

调用当前事件循环的异常处理程序。参数context是个包含下列键的字典对象'message','exception','future'

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM