一,前言
本文將會講述Python 3.5之后出現的async/await的使用方法,我從上看到一篇不錯的博客,自己對其進行了梳理。該文章原地址https://www.cnblogs.com/dhcn/p/9032461.html
二,Python常見的函數形式
2.1 普通函數
def fun(): return 1 if __name__ == '__main__': fun()
普通函數,沒有什么特別的,直接函數名加括號調用即可。
2.2 生成器函數
def generator_fun(): yield 1 if __name__ == '__main__': print(generator_fun()) #<generator object generator_fun at 0x00000000005E6E08> print(generator_fun().send(None)) for i in generator_fun(): print(i)
與普通函數不同的是,生成器函數返回的是一個生成器對象。我們可以通過send()方法或者是for循環的方法來對其進行取值。
2.3 (異步)協程函數
async def async_fun(): return 1 if __name__ == '__main__': print(async_fun()) # <coroutine object fun at 0x0000000002156E08> print(async_fun().send(None)) # StopIteration: 1 try: async_fun().send(None) except StopIteration as e: print(e.value)
異步函數的調用返回的是一個協程對象,若改對象通過send()進行調用,會報一個StopIteration錯,若要取值,就需要捕獲該異常,e.value的形式進行獲取。
在協程函數中,可以通過await語法來掛起自身的協程,並等待另一個協程完成直到返回結果:
async def async_function(): return 1 async def await_coroutine(): result = await async_function() print(result) run(await_coroutine()) # 1
2.4 異步生成器
async def async_fun(): async for i in generator_async_fun(): print(i) async def generator_async_fun(): yield 1 if __name__ == '__main__': async_fun().send(None)
異步生成器的調用比較特殊,它需要依賴別的異步函數進行調用。
三,await的使用
await語法可以掛起自聲協程,等待另一個協程完成直到返回結果。但是有一些地方需要注意:
3.1 await注意事項
await語法只能出現在通過async修飾的函數中,否則會報SyntaxError錯誤。
而且await后面的對象需要是一個Awaitable,或者實現了相關的協議。
3.2 關於await的源代碼解析
查看Awaitable抽象類的代碼,表明了只要一個類實現了__await__方法,那么通過它構造出來的實例就是一個Awaitable:
class Awaitable(metaclass=ABCMeta): __slots__ = () @abstractmethod def __await__(self): yield @classmethod def __subclasshook__(cls, C): if cls is Awaitable: return _check_methods(C, "__await__") return NotImplemented
而且可以看到,Coroutine類也繼承了Awaitable,而且實現了send,throw和close方法。所以await一個調用異步函數返回的協程對象是合法的。
class Coroutine(Awaitable): __slots__ = () @abstractmethod def send(self, value): ... @abstractmethod def throw(self, typ, val=None, tb=None): ... def close(self): ... @classmethod def __subclasshook__(cls, C): if cls is Coroutine: return _check_methods(C, '__await__', 'send', 'throw', 'close') return NotImplemented
3.3 關於異步生成器的實例
案例:假如我要到一家超市去購買土豆,而超市貨架上的土豆數量是有限的:
class Potato: @classmethod def make(cls, num, *args, **kws): potatos = [] for i in range(num): potatos.append(cls.__new__(cls, *args, **kws)) return potatos all_potatos = Potato.make(5)
現在我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:
def take_potatos(num): count = 0 while True: if len(all_potatos) == 0: sleep(.1) else: potato = all_potatos.pop() yield potato count += 1 if count == num: break def buy_potatos(): bucket = [] for p in take_potatos(50): bucket.append(p)
對應到代碼中,就是迭代一個生成器的模型,顯然,當貨架上的土豆不夠的時候,這時只能夠死等,而且在上面例子中等多長時間都不會有結果(因為一切都是同步的),也許可以用多進程和多線程解決,而在現實生活中,更應該像是這樣的:
async def take_potatos(num): count = 0 while True: if len(all_potatos) == 0: await ask_for_potato() potato = all_potatos.pop() yield potato count += 1 if count == num: break
當貨架上的土豆沒有了之后,我可以詢問超市請求需要更多的土豆,這時候需要等待一段時間直到生產者完成生產的過程:
async def ask_for_potato(): await asyncio.sleep(random.random()) all_potatos.extend(Potato.make(random.randint(1, 10)))
當生產者完成和返回之后,這是便能從await掛起的地方繼續往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程:
async def buy_potatos(): bucket = [] async for p in take_potatos(50): bucket.append(p) print(f'Got potato {id(p)}...')
async for語法表示我們要后面迭代的是一個異步生成器。
def main(): import asyncio loop = asyncio.get_event_loop() res = loop.run_until_complete(buy_potatos()) loop.close()
用asyncio運行這段代碼,結果是這樣的:
Got potato 4338641384... Got potato 4338641160... Got potato 4338614736... Got potato 4338614680... Got potato 4338614568... Got potato 4344861864...
既然是異步的,在請求之后不一定要死等,而是可以做其他事情。比如除了土豆,我還想買番茄,這時只需要在事件循環中再添加一個過程:
def main(): import asyncio loop = asyncio.get_event_loop() res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()])) loop.close()
再來運行這段代碼:
Got potato 4423119312... Got tomato 4423119368... Got potato 4429291024... Got potato 4421640768... Got tomato 4429331704... Got tomato 4429331760... Got tomato 4423119368...
3.4 AsyncGenerator的定義
它需要實現__aiter__和__anext__兩個核心方法,以及asend,athrow,aclose方法。
class AsyncGenerator(AsyncIterator): __slots__ = () async def __anext__(self): ... @abstractmethod async def asend(self, value): ... @abstractmethod async def athrow(self, typ, val=None, tb=None): ... async def aclose(self): ... @classmethod def __subclasshook__(cls, C): if cls is AsyncGenerator: return _check_methods(C, '__aiter__', '__anext__', 'asend', 'athrow', 'aclose') return NotImplemented
異步生成器是在3.6之后才有的特性,同樣的還有異步推導表達式,因此在上面的例子中,也可以寫成這樣:
bucket = [p async for p in take_potatos(50)]
類似的,還有await表達式:
result = [await fun() for fun in funcs if await condition()]
四,async的其他用法
4.1 async修飾類普通方法
除了函數之外,類實例的普通方法也能用async語法修飾:
class ThreeTwoOne: async def begin(self): print(3) await asyncio.sleep(1) print(2) await asyncio.sleep(1) print(1) await asyncio.sleep(1) return async def game(): t = ThreeTwoOne() await t.begin() print('start')
實例方法的調用同樣是返回一個coroutine:
function = ThreeTwoOne.begin method = function.__get__(ThreeTwoOne, ThreeTwoOne()) import inspect assert inspect.isfunction(function) assert inspect.ismethod(method) assert inspect.iscoroutine(method())
4.2 async 修飾類方法
class ThreeTwoOne: @classmethod async def begin(cls): print(3) await asyncio.sleep(1) print(2) await asyncio.sleep(1) print(1) await asyncio.sleep(1) return async def game(): await ThreeTwoOne.begin() print('start')
4.3 async的上下文管理器應用
根據PEP 492中,async也可以應用到上下文管理器中,__aenter__和__aexit__需要返回一個Awaitable:
class GameContext: async def __aenter__(self): print('game loading...') await asyncio.sleep(1) async def __aexit__(self, exc_type, exc, tb): print('game exit...') await asyncio.sleep(1) async def game(): async with GameContext(): print('game start...') await asyncio.sleep(2)
在3.7版本,contextlib中會新增一個asynccontextmanager裝飾器來包裝一個實現異步協議的上下文管理器:
from contextlib import asynccontextmanager @asynccontextmanager async def get_connection(): conn = await acquire_db_connection() try: yield finally: await release_db_connection(conn)
五,await和yield from
Python3.3的yield from語法可以把生成器的操作委托給另一個生成器,生成器的調用方可以直接與子生成器進行通信:
def sub_gen(): yield 1 yield 2 yield 3 def gen(): return (yield from sub_gen()) def main(): for val in gen(): print(val) # 1 # 2 # 3
利用這一特性,使用yield from能夠編寫出類似協程效果的函數調用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from語法來創建協程:
# https://docs.python.org/3.4/library/asyncio-task.html import asyncio @asyncio.coroutine def compute(x, y): print("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0) return x + y @asyncio.coroutine def print_sum(x, y): result = yield from compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
然而,用yield from容易在表示協程和生成器中混淆,沒有良好的語義性,所以在Python 3.5推出了更新的async/await表達式來作為協程的語法。因此類似以下的調用是等價的:
async with lock: ... with (yield from lock): ... ###################### def main(): return (yield from coro()) def main(): return (await coro())
那么,怎么把生成器包裝為一個協程對象呢?這時候可以用到types包中的coroutine裝飾器(如果使用asyncio做驅動的話,那么也可以使用asyncio的coroutine裝飾器),@types.coroutine裝飾器會將一個生成器函數包裝為協程對象:
import asyncio import types @types.coroutine def compute(x, y): print("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
盡管兩個函數分別使用了新舊語法,但他們都是協程對象,也分別稱作native coroutine以及generator-based coroutine,因此不用擔心語法問題。下面觀察一個asyncio中Future的例子:
import asyncio future = asyncio.Future() async def coro1(): await asyncio.sleep(1) future.set_result('data') async def coro2(): print(await future) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro1(), coro2() ])) loop.close()
兩個協程在在事件循環中,協程coro1在執行第一句后掛起自身切到asyncio.sleep,而協程coro2一直等待future的結果,讓出事件循環,計時器結束后coro1執行了第二句設置了future的值,被掛起的coro2恢復執行,打印出future的結果'data'。
future可以被await證明了future對象是一個Awaitable,進入Future類的源碼可以看到有一段代碼顯示了future實現了__await__協議:
class Future: ... def __iter__(self): if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" return self.result() # May raise too. if compat.PY35: __await__ = __iter__ # make compatible with 'await' expression
當執行await future這行代碼時,future中的這段代碼就會被執行,首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
當future執行set_result方法時,會觸發以下的代碼,設置結果,標記future已經完成:
def set_result(self, result): ... if self._state != _PENDING: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED self._schedule_callbacks()
最后future會調度自身的回調函數,觸發Task._step()告知Task驅動future從之前掛起的點恢復執行,不難看出,future會執行下面的代碼:
class Future: ... def __iter__(self): ... assert self.done(), "yield from wasn't used with future" return self.result() # May raise too.
最終返回結果給調用方。