tornado異步原理
tornado有四類異步事件:立即事件,定時器異步事件,io異步事件,Future異步事件。
tornado 的ioloop管理所有的異步事件,並在適當的時機調用異步事件的回掉函數。
四類異步事件均在ioloop的start函數中調度。
立即事件:
場景:當前函數執行完后,下次ioloop調度時直接調度某函數
用法:ioloop.add_callback(callback, *args, **kwargs)
原理:立即事件全部存放在ioloop._callbacks中,IOLoop每次循環都會調用這些立即事件的回調函數
def start(self): while True: ncallbacks = len(self._callbacks) #self._callbacks用於存放所有的立即事件 due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) #循環調用所有的立即事件的回調函數 for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if self._callbacks: #如果在上面調用回調函數的過程中,又添加了新的立即事件,則將等待IO事件的時間設置為0,以便及時調用新的立即事件 poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events)
定時器異步事件:
場景:用戶希望在某一段時間后執行某函數
用法:ioloop.call_at(when, callback, *args, **kwargs), ioloop.call_later(delay, callback, *args, **kwargs)
原理:定時器事件存放在ioloop._timeouts中,IOLoop每次循環開始都會找出所有已經超時的定時器,並調用對應的回調函數
def start(self): while True: ncallbacks = len(self._callbacks) due_timeouts = [] #用於存放超時的事件 if self._timeouts: #self._timeouts用於存放所有定時器事件 now = self.time() while self._timeouts: if self._timeouts[0].callback is None: #如果定時器事件沒有回掉函數,則說明已經取消,直接丟棄 heapq.heappop(self._timeouts) #heapq是一個數據結構,它保證heapq[0]永遠是最小的一個元素 self._cancellations -= 1 elif self._timeouts[0].deadline <= now: #如果定時器已經超時,則取出並添加至due_timeouts中 due_timeouts.append(heapq.heappop(self._timeouts)) else: #因為heapq的特性,如果執行到這一步,說明剩下事件都沒有超時,退出循環 break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) #循環調用所有已超時定時器事件的回調函數 if self._callbacks: poll_timeout = 0.0 elif self._timeouts: #根據最小定時器事件的時間設置等待IO事件的時間 poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events)
IO異步事件:
場景:等待某個文件描述符的某個事件,如TCPserver等待socket的READ事件
用法:ioloop.add_handler(fd, callback, events)
原理:所有的文件描述符全部存放在ioloop._impl中,windows平台下_impl是tornado.platform.select.SelectIOLoop對象
在linux平台下_impl是tornado.platform.epoll.EPollIOLoop對象,作用都是同時監聽多個文件描述符
def start(self): while True: ncallbacks = len(self._callbacks) due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if self._callbacks: poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) #監聽所有文件描述符 self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) #循環調用所有文件描述符對應的回調函數
Future異步事件:
場景:等待某個異步事件結束后執行回掉函數
用法:ioloop.add_future(future, callback), future.add_done_callback(callback)
原理:異步事件結束后調用Future.set_result(),當執行set_result時將future所有的回掉函數添加為ioloop的立即事件
class Future(object): def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True if self._callbacks: from tornado.ioloop import IOLoop loop = IOLoop.current() for cb in self._callbacks: loop.add_callback(cb, self) #將所有的回掉函數設置為ioloop的立即事件 self._callbacks = None