先來看個例子,自己實現的模擬耗時操作
例1
import types
import select
import time
import socket
import functools
class Future:
    """Minimal awaitable placeholder for a result that arrives later.

    Awaiting a Future yields the Future itself to whoever drives the
    coroutine.  When ``set_result`` is called, every registered callback is
    handed to the owning loop's ``_ready`` queue so it runs on a later loop
    iteration.
    """

    def __init__(self, *, loop=None):
        self._result = None
        self._callbacks = []
        # Loop whose ``_ready`` queue receives the callbacks.
        self._loop = loop

    def set_result(self, result):
        """Store *result* and schedule all registered callbacks on the loop.

        Raises:
            RuntimeError: if callbacks are registered but the Future was
                created without a loop to schedule them on.
        """
        if self._callbacks and self._loop is None:
            raise RuntimeError('Future has callbacks but no loop to run them')
        self._result = result
        # Detach the callbacks first so each one is scheduled exactly once.
        callbacks, self._callbacks = self._callbacks, []
        if callbacks:
            # Use the loop this Future was created with, not a module-level
            # global that only exists when the file runs as a script.
            self._loop._ready.extend(callbacks)

    def add_callback(self, callback):
        """Register a zero-argument callable to run after ``set_result``."""
        self._callbacks.append(callback)

    def __iter__(self):
        """Suspend the awaiting coroutine by yielding this Future once."""
        print('enter Future ...')
        print('foo 掛起在yield處 ')
        # Execution pauses here until the driver resumes the coroutine.
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        return 'future'

    # ``await future`` uses the same generator protocol as iteration.
    __await__ = __iter__
class Task:
    """Drive a coroutine one step at a time on behalf of a loop.

    Args:
        cor: the coroutine (or generator) object to drive.
        loop: object providing a ``_ready`` queue and a ``close()`` method.
    """

    def __init__(self, cor, *, loop=None):
        self.cor = cor
        self._loop = loop

    def _step(self):
        """Resume the coroutine until its next suspension point.

        Raises:
            Exception: whatever the coroutine raised; the loop is closed
                first so it does not keep waiting for a dead task.
        """
        try:
            result = self.cor.send(None)
        except StopIteration:
            # 1. The coroutine returned: nothing left to run, stop the loop.
            self._loop.close()
        except Exception:
            # 2. The coroutine failed: stop the loop and surface the error
            # instead of swallowing it and waiting forever.
            self._loop.close()
            raise
        else:
            if result is None:
                # 3. Bare ``yield``: give up control for one loop iteration
                # and continue on the next one.
                self._loop._ready.append(self._step)
            elif isinstance(result, Future):
                # 4. Suspended on a Future: resume once it has a result.
                result.add_callback(self._wakeup)

    def _wakeup(self):
        """Callback run when the awaited Future completes; resumes the coroutine."""
        self._step()
class Loop:
    """Tiny event loop: a ready queue plus timers, paced by ``select``.

    ``_ready`` holds zero-argument callables to run on the next iteration.
    ``_scheduled`` holds ``(deadline, callback, args)`` timer entries.
    """

    def __init__(self):
        self._stop = False
        self._ready = []
        self._scheduled = []
        self._time = time.time
        # select() needs at least one descriptor on some platforms.  A
        # socket pair that nobody writes to never becomes readable, so
        # select() simply blocks for the requested timeout.
        # NOTE(review): a never-connected TCP socket is, I believe, reported
        # readable immediately on Linux - confirm on the target platform.
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._select = functools.partial(select.select, [self._ssock], [], [])

    def create_task(self, cor):
        """Wrap *cor* in a Task and queue its first step; return the Task."""
        task = Task(cor, loop=self)
        self._ready.append(task._step)
        return task

    def call_later(self, delay, callback, *args):
        """Schedule ``callback(*args)`` to run after *delay* seconds.

        The deadline is stored with the timer entry rather than on the
        callback object, so builtin/bound methods can be scheduled and the
        same callback can be scheduled several times.
        """
        when = self._time() + max(0, delay)
        self._scheduled.append((when, callback, args))

    def run_until_complete(self, task):
        """Run loop iterations until ``close()`` is called."""
        assert isinstance(task, Task)
        while not self._stop:
            self._run_once()

    def _run_once(self):
        """Run one iteration: wait, promote due timers, run ready callbacks."""
        # Recompute the timeout every iteration:
        #   0    -> work is already queued, do not block
        #   n    -> block until the earliest timer deadline
        #   None -> nothing queued, block until an IO event arrives
        if self._ready:
            timeout = 0
        elif self._scheduled:
            earliest = min(entry[0] for entry in self._scheduled)
            timeout = max(0, earliest - self._time())
        else:
            timeout = None
        # select(timeout) controls how long the loop blocks.
        self._select(timeout)

        # Move every timer whose deadline has passed to the ready queue,
        # earliest deadline first (stable for equal deadlines).
        now = self._time()
        due = [entry for entry in self._scheduled if entry[0] <= now]
        if due:
            self._scheduled = [
                entry for entry in self._scheduled if entry[0] > now
            ]
            due.sort(key=lambda entry: entry[0])
            self._ready.extend(
                functools.partial(callback, *args)
                for _, callback, args in due
            )

        # Run only what was queued before this point, in FIFO order;
        # callbacks queued while running wait for the next iteration.
        ready, self._ready = self._ready, []
        for step in ready:
            step()

    def close(self):
        """Ask the loop to stop and release the wake-up sockets."""
        self._stop = True
        self._ssock.close()
        self._csock.close()
@types.coroutine
def _sleep():
    """Suspend once with a bare ``yield`` (yields ``None``)."""
    yield


# Home-made sleep coroutine.
async def sleep(s, result=None, *, loop=None):
    """Suspend the calling coroutine for *s* seconds, then return *result*.

    Args:
        s: delay in seconds; ``s <= 0`` only yields control once.
        result: value returned to the caller after the delay.
        loop: loop used to schedule the wake-up.  When omitted, the
            module-level ``loop`` variable is used, as before.

    Raises:
        RuntimeError: if no loop is passed and no module-level ``loop``
            exists.
    """
    if s <= 0:
        await _sleep()
        return result

    if loop is None:
        # Backward-compatible fallback to the script's global loop, with a
        # clear error instead of a NameError when it does not exist.
        loop = globals().get('loop')
        if loop is None:
            raise RuntimeError(
                'sleep() needs a loop: pass loop=... or define a '
                'module-level "loop"')

    future = Future(loop=loop)
    # Register the delayed callback that will complete the future.
    loop.call_later(s, callback, future)
    await future
    return result
# Delayed callback.
def callback(future):
    """Complete *future* with ``None``; meant to run once the delay is over."""
    outcome = None
    future.set_result(outcome)
async def foo():
    """Demo coroutine: print a timestamp, sleep 3 seconds, print another."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    print(f'enter foo at {time.strftime(stamp_format)}')
    await sleep(3)
    print(f'exit foo at {time.strftime(stamp_format)}')
if __name__ == '__main__':
    # ``loop`` must stay a module-level name: other code in this file looks
    # it up as a global.
    loop = Loop()
    f = foo()
    task = loop.create_task(f)
    loop.run_until_complete(task)
執行結果:
enter foo at 2019-07-08 21:09:43
enter Future ...
foo 掛起在yield處
foo 恢復執行
exit Future ...
exit foo at 2019-07-08 21:09:46
在上一篇文章通過Loop, Task, Future三個類基本上實現了對協程的調度,在此基礎上做了一些修改實現了對協程中耗時操作的模擬。
首先我們分析一下async def foo協程中的await sleep(3),這里其實會進入到sleep中 await future這里,再進入到future對象的__await__方法中的yield self,foo協程此時被掛起,上一篇文章中我們分析知道,最終foo還是被這個future對象給分成了part1和part2兩部分邏輯。
- foo print('enter foo at ...')
- sleep
- future print('enter Future ...') # 以上是第一次f.send(None)執行的邏輯,命名為part1
- future yield self ---------------------------------------------------------------
- print('exit Future ...') #以下是第二次f.send(None)執行的邏輯,命名為part2
- sleep
- foo print('exit foo at ...')
part1 在 loop 循環的開始就執行了,返回一個 future 對象,把 part2 注冊到 future 中,然后掛起了,下半部分 part2 在什么時候執行呢?因為在 sleep 中我們通過注冊了一個3秒之后執行的回調函數 callback 到 loop 對象中,loop 對象在執行完 part1 后,會在下一輪的循環中執行 callback 回調函數,由於 loop._scheduled 不為空,timeout 被賦值成3,因此 select(3) 阻塞3秒后就繼續往下執行。也就是說 callback 函數的執行時機就是在 select(3) 阻塞3秒后執行,callback 回調函數中又會調用 future.set_result() ,在 set_result 中會把 part2 注冊到 loop 中,所以最終又在 loop 的下一輪循環中調用 part2 的邏輯,回到上次 foo 掛起的地方,繼續 foo 的流程,直到協程退出。
所謂的模擬耗時3秒,其實就是在執行完 part1 后通過 select 函數阻塞3秒,然后再執行 part2,這樣就實現了等待3秒的效果。
要實現這個sleep協程的耗時模擬,主要是有2個關鍵點:
- 1.通過 select(timeout) 的 timeout 來控制 select 函數的阻塞時間。
  - timeout=None:一直阻塞,直到有真實的IO事件到來,如socket的可讀可寫事件
  - timeout=0:無論此時是否有IO事件到來,都立馬返回
  - timeout=n:阻塞n秒,在這n秒內,只要有IO事件到來,就立馬返回,否則阻塞n秒才返回
- 2.當延遲時間到來時,通過 callback 函數中調用 future.set_result() 方法,來驅動 part2 的執行。
了解到這里之后,我們再來看一下 asyncio 的源碼
Loop類
class BaseEventLoop(events.AbstractEventLoop):
    # Abridged excerpt of asyncio's event loop; ``...`` marks elided code.
    ...

    def __init__(self):
        ...
        # Double-ended queue of Handle objects that are ready to run (in
        # this walkthrough each Handle wraps task._step).
        self._ready = collections.deque()
        # List kept in heap order (min-heap via heapq) of Handle objects
        # for delayed callbacks.
        self._scheduled = []
        ...

    def create_task(self, coro):
        """Schedule a coroutine object.
        Return a task object.
        """
        self._check_closed()
        # Use tasks.Task directly when no task factory is set.
        if self._task_factory is None:
            # Create the task object.
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        # Return the task object.
        return task

    def call_soon(self, callback, *args):
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        # Key step: in this walkthrough callback is task._step and args are
        # its arguments.
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        # 1. The handle wraps the callback (here task._step) and its args.
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # 2. Key step: append the handle to self._ready.
        self._ready.append(handle)
        return handle

    def run_until_complete(self, future):
        ...
        # In this walkthrough future is the task object; the next two
        # statements make sure we end up with a Future instance.
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
        # Register _run_until_complete_cb as a done callback on the task.
        # It is defined elsewhere in asyncio; per the article it is what
        # finally makes the loop stop.
        future.add_done_callback(_run_until_complete_cb)
        try:
            # Start the (potentially infinite) loop.
            self.run_forever()
        except:
            ...
            raise
        finally:
            ...
        # Done: return the coroutine's result.
        return future.result()

    def run_forever(self):
        ...
        try:
            events._set_running_loop(self)
            while True:
                # After every iteration, check _stopping to decide whether
                # the loop should end.
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

    def _run_once(self):
        # self._scheduled is a min-heap of delayed callbacks ordered by
        # deadline; due handles are popped from the top into self._ready.
        # self._ready is a deque: every callback registered with the loop
        # ends up there and is taken out and executed in order.
        # 1. Drop cancelled timers from self._scheduled.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
        # 2. Choose the timeout; it stays None when nothing is ready and
        # nothing is scheduled.
        timeout = None
        # Anything ready (or a pending stop) means: do not block.
        if self._ready or self._stopping:
            timeout = 0
        # Otherwise, if there are delayed callbacks ...
        elif self._scheduled:
            # Compute the desired timeout.
            # Wait until the deadline of the handle on top of the heap,
            # i.e. the delayed callback that is due first.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        if self._debug and timeout != 0:
            ...
        # 3. The else branch is the key code path.
        else:
            # timeout=None --> block until an IO event occurs, then return
            #                  the event list.
            # timeout=0    --> do not block: return the event list, or an
            #                  empty list when there are no events.
            # timeout=2    --> block for up to 2s; return as soon as an IO
            #                  event occurs, otherwise return an empty list
            #                  after 2s.
            event_list = self._selector.select(timeout)
        # Handles real IO events; sleep only simulates IO latency and
        # involves no IO events.
        self._process_events(event_list)
        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        # 4. Move due handles from the top of the heap into _ready.
        while self._scheduled:
            handle = self._scheduled[0]
            # With several delayed callbacks, this check keeps handles
            # whose time has not come yet in the heap, so later deadlines
            # get their own timeout in later _run_once calls.
            if handle._when >= end_time:
                break
            # Pop the earliest handle from the heap and queue it in _ready.
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
        # 5. Run the handles that are in self._ready right now.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                # In this walkthrough handle._run() runs task._step(),
                # i.e. coro.send(None).
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
Task類
class Task(futures.Future):
    # Abridged excerpt of asyncio's Task; ``...`` marks elided code.
    ...
    def _step(self, exc=None):
        """
        Run the wrapped coroutine until its next yield; conceptually this is
        the part of the coroutine's code that comes before the yield.
        """
        ...
        try:
            if exc is None:
                # 1. Key step: resume the coroutine.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # 2. A finished coroutine raises StopIteration.
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                # Store the coroutine's return value (exc.value) as the
                # task's result. Per the article, this runs the task's
                # callbacks, including the one registered by
                # loop.run_until_complete that ends the loop.
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel() # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        # 3. No exception from coro.send(None): the coroutine is suspended
        # at a yield.
        else:
            # 4. Check whether result has an _asyncio_future_blocking
            # attribute.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    # 4.1. result is a future whose blocking flag was set
                    # to True.
                    else:
                        result._asyncio_future_blocking = False
                        # Register _wakeup on the future; per the article it
                        # is invoked once the future's set_result() has been
                        # called.
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            # 5. result is None: register task._step with the loop via
            # call_soon so it is called back in a later _run_once.
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            # -------- the code below can be ignored for now --------
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None # Needed to break cycles when an exception occurs.
    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Key step: the previous _step() stopped where the coroutine
            # hit its yield. Calling _step() again runs
            # ``result = coro.send(None)`` once more, which resumes the
            # coroutine right after that yield.
            self._step()
        self = None # Needed to break cycles when an exception occurs.
Future類
class Future:
    # Abridged excerpt of asyncio's Future; ``...`` marks elided code.
    ...
    def add_done_callback(self, fn, *, context=None):
        # Future no longer pending: schedule fn on the loop right away.
        # Otherwise remember (fn, context) for later.
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))
    def set_result(self, result):
        # A result may only be set while the future is still pending.
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        # Hand the registered callbacks over for execution (helper is
        # defined elsewhere in asyncio).
        self.__schedule_callbacks()
    def __iter__(self):
        # While the future is not done, self.done() is False, so mark the
        # future as blocking and suspend.
        if not self.done():
            self._asyncio_future_blocking = True
            # Hand the Future object itself out to the caller.
            yield self # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result() # May raise too.
    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression
sleep協程
# Delayed callback: calls fut.set_result unless the future was cancelled.
def _set_result_unless_cancelled(fut, result):
    if fut.cancelled():
        return
    # This is the key step: per the article it drives the coroutine to
    # resume from where it was last suspended.
    fut.set_result(result)
@types.coroutine
def __sleep0():
    # Suspend exactly once with a bare yield (no value is yielded).
    yield
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    # Non-positive delay: just suspend once and return.
    if delay <= 0:
        await __sleep0()
        return result
    if loop is None:
        loop = events.get_event_loop()
    # Create a future object.
    future = loop.create_future()
    # Register a delayed callback with the loop; after `delay` seconds it
    # sets the future's result.
    h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        # Always cancel the timer handle on the way out.
        h.cancel()
關鍵地方我都寫了注釋,如果能耐着性子細心看下來,你會發現例1中的實現,就是模仿asyncio中的這幾個類去實現的。
asyncio的sleep中的延遲回調函數是_set_result_unless_cancelled與我寫的callback對應,關鍵都是要回調future.set_result方法,這樣才能驅動協程從上次掛起的地方開始繼續執行。
對於使用asyncio.sleep的例子
import asyncio
async def cor():
    """Demo coroutine: announce entry, sleep 2 seconds, announce exit."""
    print('enter cor ...')
    pause = asyncio.sleep(2)
    await pause
    print('exit cor ...')
    outcome = 'cor'
    return outcome
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is current and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    task = loop.create_task(cor())
    rst = loop.run_until_complete(task)
    print(rst)
finally:
    # Release the loop's resources even if the coroutine fails.
    loop.close()
await asyncio.sleep(2) 這句代碼同樣是把cor協程分為如下兩個部分:
- cor print('enter cor ...')
- sleep
- future print('enter Future ...') # 以上是第一次cor.send(None)執行的邏輯,命名為part1
- future yield self ---------------------------------------------------------------
- future print('exit Future ...') # 以下是第二次cor.send(None)執行的邏輯,命名為part2
- sleep
- cor print('exit cor ...')
總之,只要有要耗時的地方,就必須要有一個 future 用來 await future,然后協程就被分成了part1和part2,part1和part2就被分別封裝到了task._step和task._wakeup中,然后在loop循環中先調用part1,再通過select函數阻塞n秒之后,再執行part2,最后,協程執行完畢。