micropython學習筆記之uasyncio --- 異步 I/O


先解釋下幾個名詞:

異步:異步是什么意思?這不是一個嚴格的定義,從下面兩個方面來理解:

  1. 異步程序可以在等待其最終結果的同時“暫停”並讓其他程序同時運行。
  2. 通過上述機制,異步代碼有助於並發執行。換句話說,異步代碼表現出了並發的特點。

異步IO:一種與語言無關的范例(模型) ,很多編程語言都有這種實現,它是一種單線程,單進程設計:它使用協作多任務處理,盡管在單個進程中使用單個線程,異步 IO 仍具有並發的感覺。

uasyncio 是MicroPython中用來編寫 並發 代碼的庫,使用 async/await 語法。

uasyncio 被用作多個提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫連接庫,分布式任務隊列等等。

uasyncio 往往是構建 IO 密集型和高層級 結構化 網絡代碼的最佳選擇。

新版本的uasyncio V3預裝在V1.13開始的每日固件版本中。

MicroPython語言基於cpython3.4。uasyncio庫現在支持cpython3.8異步庫的一個子集。有非標准的擴展來優化服務,比如毫秒級定時。它的設計重點是高性能。在沒有RAM分配的情況下安排運行。

uasyncio庫支持以下功能:

  • async def 與 await 語法
  • 等待類(使用__iter__而不是__await__)
  • 異步上下文管理器
  • 異步迭代器
  • uasyncio.sleep(seconds)
  • 超時 (uasyncio.wait_for)
  • 任務取消(Task.cancel)
  • 集合

它支持毫秒級計時,具有以下功能:

  • uasyncio.sleep_ms(time)

它包括以下與CPython兼容的同步原語:

  • Event
  • Lock
  • gather

Future類不被支持,也不支持event_loop的call_soon、call_later、call_at方法。

 

核心概念

uasyncio里面主要有3個需要關注的基本概念

Event Loop

Event Loop(事件循環)可以說是asyncio應用的核心,是中央總控。Event Loop實例提供了注冊、取消和執行任務和回調的方法。

把一些異步函數(就是Task任務,下面會說到)注冊到這個事件循環上,事件循環會循環執行這些函數(但同時只能執行一個),當執行到某個函數時,如果它正在等待I/O返回,事件循環會暫停它的執行去執行其他的函數;當某個函數完成I/O后會恢復,下次循環到它的時候繼續執行。因此,這些異步函數可以協同(Cooperative)運行:這就是事件循環的目標。

Coroutine

Coroutine(協程)本質上是一個函數,特點是在代碼塊中可以在到達返回值之前暫停其執行,並且可以將控制權間接傳遞給另一個協程一段時間:

❯ cat coro1.py
import asyncio

async def a():
    print('Suspending a')
    await asyncio.sleep(0)
    print('Resuming a')

async def b():
    print('In b')

async def main():
    await asyncio.gather(a(), b())

if __name__ == '__main__':
    asyncio.run(main())

這里面有幾個重要關鍵點:

  1. 協程要用async def聲明,Python 3.5時的裝飾器寫法已經過時,我就不列出來了。
  2. asyncio.gather用來並發運行任務,在這里表示協同的執行a和b兩個協程
  3. 在協程a中,有一句await asyncio.sleep(0),await表示調用協程,asyncio.sleep()非阻塞延時(而time.sleep()則表示任何耗時的阻塞函數調用),它可將 CPU 的控制權交給下一個協程。這兒的asyncio.sleep(0)並不會真的sleep(因為時間為0),但是卻可以把控制權交出去。
  4. asyncio.run是Python 3.7新加的接口,要不然你得這么寫:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

好了,我們先運行一下看看:

❯ python coro1.py
Suspending a
In b
Resuming a

看到了吧,在並發執行中,協程a被掛起又恢復過。

 

Async IO 的規則

理解了 async,await 的規則,對掌握異步/等待功能非常重要。關鍵字 async def 可以定義一個協程函數或一個異步生成器函數。關鍵字 await 將功能控制傳回事件循環。比如:

async def g() :
    # Pause here and come back to g()  when f()  is ready
    r = await f() 
    return r

這里的 await 掛起了本次協程的執行。如果 Python 在 g() 范圍內遇到 await f() 表達式,那就意味着,“暫停 g() 的執行,直到等待f() 返回結果。同時,讓其他協程運行。”。當然也有一些規則要求什么地方可以使用 async/await 關鍵字,什么地方不能用:

  • 使用 async def 定義的函數是一個協程,它內部可以使用 await,return,yield,也可以都不用。
    • 使用 await 或 return 創建一個coroutine函數。要調用 coroutine 函數,你必須使用 await 關鍵字。
    • 很少情況下會在 async def 的代碼塊中使用 yield ,如果用了,會產生一個異步的生成器。
    • 任何 async def 內都不能使用 yield from,會拋出語法錯誤。
  • 就像不能在函數外面使用 yield 一樣,也不能在 async def 外使用 await。會拋出語法錯誤。

下面是一些例子:

async def f(x) :
    y = await z(x)   # OK - `await` and `return` allowed in coroutines
    return y

async def g(x) :
    yield x  # OK - this is an async generator

uasyncio 異步I/O調度程序

這個模塊實現了相應CPython的模塊一個子集,如下所述。有關詳細信息,請參閱原始CPython文檔:asyncio

例子:

 

import uasyncio async def blink(led, period_ms): while True: led.on() await uasyncio.sleep_ms(5) led.off() await uasyncio.sleep_ms(period_ms) async def main(led1, led2): uasyncio.create_task(blink(led1, 700)) uasyncio.create_task(blink(led2, 400)) await uasyncio.sleep_ms(10_000) # Running on a pyboard from pyb import LED uasyncio.run(main(LED(1), LED(2))) # Running on a generic board from machine import Pin uasyncio.run(main(Pin(1), Pin(2)))

 

  

核心功能

uasyncio.create_task(coro)

從給定的協同程序創建一個新任務並安排它運行。

返回相應的Task對象

uasyncio.current_task()

返回當前與之關聯正在運行的任務Task對象。

uasyncio.run(coro)

從給定的協同程序創建一個新任務,並運行它直到它完成。

返回coro返回的值

uasyncio.sleep(t)

休眠t秒(可以是浮

這是一協同程序

uasyncio.sleep_ms(t)

休眠t毫秒

這是一個協同程序,也是一個MicroPython擴展。

附加功能

uasyncio.wait_for(awaitable,timeout)

等待awaitable完成,但如果花費的時間超過timeout秒數,將取消它。如果awaitable不是一個任務,則將從中創建任務。

如果發生超時,則取消任務並引發asyncio.TimeoutError:這應該被調用者捕獲。‎任務接收到的asyncio.CancelledError可能被忽略或使用try...excepttry...finally運行清理代碼捕獲。‎

返回awaitable返回值

這是一個協同程序

uasyncio.wait_for_ms(awaitable,timeout)

類似wait_for但是超時是以毫秒為單位的整數。

這是一個協同程序,也是一個MicroPython擴展。

uasyncio.gather(*awaitables,return_exceptions=False)

並發運行awaitables序列中的可等待對象。任何awaitabls不屬於任務的將升級為任務。

如果return_exceptionsFalse(默認),所引發的首個異常會立即傳播給等待gather()的任務。awaitables序列中的其他可等待對象不會被取消並將繼續運行。

如果return_exceptionsTrue,異常會和成功的結果一樣處理,並聚合至結果列表。

返回所有可等待對象返回值的列表

這是一個協同程序

任務

class uasyncio.Task

此對象將協同程序包裝到正在運行的任務中。任務可以等待使用await task,它將等待任務完成並返回任務的返回值。

任務不應該直接創建,而是使用create_task創建它們

Task.cancel()

通過注入CancelledError來取消任務。任務可能會忽略此異常。清理代碼可以通過捕獲它來運行,也可以通過try ... finally運行。

Event

class uasyncio.Event

創建可用於同步任務的新事件。事件以清除狀態開始。

Event.is_set()

如果設置了事件返回True,否則返回False

Event.set()

設置事件。任何等待事件的任務都將被安排運行。

注意:這必須從任務中調用。從IRQ、調度程序回調或其他線程調用它是不安全的。ThreadSafeFlag

Event.clear()

清除事件

Event.wait()

等待事件設置。如果事件已設置,則它會立即返回。

這是一個協同程序

ThreadSafeFlag

class uasyncio.ThreadSafeFlag

創建一個新標志,該標志可用於將任務與異步循環外運行的代碼(如其它線程、中斷回調調度)同步。標志以清除狀態開始。

ThreadSafeFlag.set()

設置標志。如果有一個任務正在等待事件,它將被調度運行。

ThreadSafeFlag.wait()

等待標志被設置。如果該標志已設置,則它將立即返回。

一次只能由單個任務等待標志。

這是一個協同程序

Lock

class uasyncio.Lock

創建可用於協調任務的新鎖。鎖在解鎖狀態下啟動。

除了下面的方法外,鎖還可以用於async with聲明

Lock.locked()

如果鎖已鎖定返回True否則返回False

Lock.acquire()

等待鎖處於解鎖狀態,然后以原子方式將其鎖定。一次只能有一個任務獲得鎖。

這是一個協同程序

Lock.release()

釋放鎖。如果有任務正在等待鎖,那么隊列中的下一個任務將被調度運行,並且鎖保持鎖定狀態。否則,沒有任務在等待鎖解鎖。

TCP流連接

uasyncio.open_connection(host,port)

打開給定hostportTCP連接這個host地址將使用socket.getaddrinfo解析,為阻塞調用。

返回一對流對象reader流和writer流。如果無法解析主機或無法建立連接,將引發特定於套接字的OSError

這是一個協同程序

uasyncio.start_server(callback,host,port,backlog=5)

啟動給定hostport上的TCP服務當一個新的客戶端連接被建立時,回調函數 callback 會被調用。該函數會接收到一對參數 (reader,writer) ,reader是類 StreamReader 的實例,而writer是類 StreamWriter 的實例。

callback 即可以是普通的可調用對象也可以是一個 協程函數; 如果它是一個協程函數,它將自動作為 Task 被調度。

返回Server對象

這是一個協同程序

class uasyncio.Stream

這表示TCP流連接。為了最大限度地減少代碼,這個類同時實現了一個讀取器和一個編寫器,以及這個類的兩者別名StreamReader StreamWriter。

Stream.get_extra_info(v)

獲取有關流的額外信息,由v給出,v的有效值是:peername 。

Stream.close()

關閉流

Stream.wait_closed()

等待流關閉

這是一個協同程序

Stream.read(n=-1)

讀取到 n 個字節並返回它們。如果沒有提供 n 或 -1,則讀取所有字節,直到 EOF。如果在讀取任何字節之前遇到 EOF,則返回的值將是一個空字節對象。

這是一個協同程序

Stream.readinto(buf)

將最多 n 個字節讀入 buf,n 等於 buf 的長度。

返回讀入緩沖區的字節數。

這是一個協同程序,也是一個 MicroPython 擴展。

Stream.readexactly(n)

正確讀取 n 個字節並將它們作為字節對象返回。

如果流在讀取 n 個字節之前結束,則引發 EOFError 異常。

這是一個協同程序

Stream.readline()

讀一行並返回

這是一個協同程序

Stream.write(buf)

累積的buf到輸出緩沖區。僅當調用 ‎‎Stream.drain‎‎ 時,才會刷新數據。建議在調用此函數后立即調用Stream.drain

Stream.drain()

將所有已緩沖的輸出數據輸出(寫入)到流中。

這是一個協同程序

class uasyncio.Server

這表示從‎‎start_server‎‎返回的服務器類。它可以在async with語句中用於在退出時關閉服務器。

Server.close()

關閉服務器

Server.wait_closed()

等待服務器關閉

這是一個協同程序

 

Event Loop事件循環

uasyncio.get_event_loop()

返回用於調度和運行任務的事件循環。Loop

uasyncio.new_event_loop()

重置事件循環並返回它。

注意:由於MicroPython只有一個事件循環,所以這個函數只是重置循環的狀態,它不會創建新的事件循環。

class uasyncio.Loop

這表示調度和運行任務的對象。不能被創建,使用get_event_loop替代。

Loop.create_task(coro)

從給定的coro協程創建一個任務並返回新的Task對象

Loop.run_forever()

運行事件循環直到stop()調用。

Loop.run_until_complete(awaitable)

運行給定的awaitable直到它完成。如果awaitable不是任務就不會被提升

Loop.stop()

停止事件循環

Loop.close()

關閉事件循環

當這個函數被調用的時候,循環必須處於非運行狀態。pending狀態的回調將被丟棄。

此方法清除所有的隊列並立即關閉執行器,不會等待執行器完成。 

Loop.set_exception_handler(handler)

handler設置為新的事件循環異常處理程序。

如果handlerNone,將設置默認的異常處理程序。在其他情況下,handler必須是一個可調用對象且簽名匹配(loop,context),其中loop是對活動事件循環的引用,而context是一個包含異常詳情的字典對象(請查看call_exception_handler()文檔來獲取關於上下文的更多信息)。

Loop.get_exception_handler()

返回當前異常處理程序如果沒有設置異常處理程序則返回None

Loop.default_exception_handler(context)

默認異常處理程序。

此方法會在發生異常且未設置異常處理程序時被調用。此方法也可以由想要具有不同於默認處理程序的行為的自定義異常處理程序來調用。

context 參數和 call_exception_handler() 中的同名參數完全相同。

Loop.call_exception_handler(context)

調用當前事件循環的異常處理程序。參數context是個包含下列鍵的字典對象'message','exception','future'

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM