本文所剖析的tornado源碼版本為4.4.2
ioloop是tornado的關鍵,是他的最底層。
ioloop就是對I/O多路復用的封裝,它實現了一個單例,將這個單例保存在IOLoop._instance中
ioloop實現了Reactor模型,將所有要處理的I/O事件注冊到一個中心I/O多路復用器上,同時主線程/進程阻塞在多路復用器上;一旦有I/O事件到來或是准備就緒(文件描述符或socket可讀、寫),多路復用器返回並將事先注冊的相應I/O事件分發到對應的處理器中。
另外,ioloop還被用來集中運行回調函數以及集中處理定時任務。
一 准備知識:
1 首先我們要了解Reactor模型
2 其次,我們要了解I/O多路復用,由於本文假設系統為Linux,所以要了解epoll以及Python中的select模塊
3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這。
二 創建IOLoop實例
來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類
class IOLoop(Configurable): ......
這里就要結合Configurable類進行講解:

def __new__(cls, *args, **kwargs) ''' 解析出impl對象 1 cls是直屬配置子類時,impl就是該直屬配置子類的'執行類對象' 2 cls是從屬配置子類時,impl就是該從屬配置子類自身 然后實例化一個impl實例對象 運行其initialize方法,並傳入合並后的參數 返回該impl實例對象 ''' base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) instance.initialize(*args, **init_kwargs) return instance
1 首先實例化一個該直屬配置子類的'執行類對象',也就是調用該類的configurable_default方法並返回賦值給impl:
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): # 因為我們假設我們的系統為Linux,且支持epoll,所以這里為True from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
2 也就是impl是EPollIOLoop類對象,然后實例化該對象,運行其initialize方法
class EPollIOLoop(PollIOLoop): # 該類只有這么短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。 def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入
來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())干了些啥:
class PollIOLoop(IOLoop): # 從屬配置子類 def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) # 調用IOLoop的initialize方法 self._impl = impl # self._impl = select.epoll() if hasattr(self._impl, 'fileno'): # 文件描述符的close_on_exec屬性 set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} # 文件描述符對應的fileno()作為key,(文件描述符對象,處理函數)作為value self._events = {} # 用來存儲epoll_obj.poll()返回的事件,也就是哪個fd發生了什么事件{(fd1, event1), (fd2, event2)……} self._callbacks = [] self._callback_lock = threading.Lock() # 添加線程鎖 self._timeouts = [] # 存儲定時任務 self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None # 獲得當前線程標識符 self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
首先調用了IOLoop.initialize(self,**kwargs)方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current()
@staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self
我們可以看到IOLoop.initialize()主要是對線程做了一些支持和操作。
3 返回該實例
三 剖析PollIOLoop
1 處理I/O事件以及其對應handler的相關屬性以及方法
使用self._handlers用來存儲fd與handler的對應關系,文件描述符對應的fileno()作為key,元組(文件描述符對象,處理函數)作為value
self._events 用來存儲epoll_obj.poll()返回的事件,也就是哪個fd發生了什么事件{(fd1, event1), (fd2, event2)……}
add_handler方法用來添加handler
update_handle方法用來更新handler
remove_handler方法用來移除handler
def add_handler(self, fd, handler, events): # 向epoll中注冊事件 , 並在self._handlers[fd]中為該文件描述符添加相應處理函數 fd, obj = self.split_fd(fd) # fd.fileno(),fd self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
2 處理回調函數的相關屬性以及方法
self._callbacks用來存儲回調函數
add_callback方法用來直接添加回調函數
add_future方法用來間接的添加回調函數,future對象詳解在這
def add_callback(self, callback, *args, **kwargs): # 因為Python的GIL的限制,導致Python線程並不算高效。加上tornado實現了多進程 + 協程的模式,所以我們略過源碼中的部分線程相關的一些操作 if self._closing: return self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs)) def add_future(self, future, callback): # 為future對象添加經過包裝后的回調函數,該回調函數會在future對象被set_done后添加至_callbacks中 assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
3 處理定時任務的相關屬性以及方法
self._timeouts用來存儲定時任務
self.add_timeout用來添加定時任務(self.call_later self.call_at都是間接調用了該方法)
def add_timeout(self, deadline, callback, *args, **kwargs): """ ``deadline``可能是一個數字,表示相對於當前時間的時間(與“IOLoop.time”通常為“time.time”相同的大小),或者是datetime.timedelta對象。 自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,因為它不需要timedelta對象。 """ if isinstance(deadline, numbers.Real): return self.call_at(deadline, callback, *args, **kwargs) elif isinstance(deadline, datetime.timedelta): return self.call_at(self.time() + timedelta_to_seconds(deadline), callback, *args, **kwargs) else: raise TypeError("Unsupported deadline %r" % deadline)
4 啟動io多路復用器
啟動也一般就意味着開始循環,那么循環什么呢?
1 運行回調函數
2 運行時間已到的定時任務
3 當某個文件描述法發生事件時,運行該事件對應的handler
使用start方法啟動ioloop,看一下其簡化版(去除線程相關,以及一些相對不重要的細節):
def start(self): try: while True: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] # 將時間已到的定時任務放置到due_timeouts中,過程省略 for callback in callbacks: # 執行callback self._run_callback(callback) for timeout in due_timeouts: # 執行定時任務 if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None # 釋放內存 # 根據情況設置poll_timeout的值,過程省略 if not self._running: # 終止ioloop運行時,在執行完了callback后結束循環 break
try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: # 系統調用被信號處理函數中斷,進行下一次循環 continue else: raise self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 獲取一個fd以及對應事件 try: fd_obj, handler_func = self._handlers[fd] # 獲取該fd對應的事件處理函數 handler_func(fd_obj, events) # 運行該事件處理函數 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 當客戶端關閉連接時會產生EPIPE錯誤 pass # 其他異常處理已經省略 fd_obj = handler_func = None # 釋放內存空間

def start(self): if self._running: raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 獲得當前線程標識符 self._running = True old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': # 需要Python2.6及以上版本,類UNIX系統,set_wake_up_fd存在。在windows系統上運行會崩潰 try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: # Already set, restore previous value. This is a little racy, # but there's no clean get_wakeup_fd and in real use the # IOLoop is just started once at the beginning. signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: # Non-main thread, or the previous value of wakeup_fd # is no longer valid. old_wakeup_fd = None try: while True: # 防止多線程模型時產生臟數據 with self._callback_lock: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] if self._timeouts: # 將時間已到的定時任務放置到due_timeouts中 now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: # 執行callbacks self._run_callback(callback) for timeout in due_timeouts: # 執行timeout_callback if timeout.callback is not None: self._run_callback(timeout.callback) # 釋放內存 callbacks = callback = due_timeouts = timeout = None if self._callbacks: # 如果在執行callbacks 或者 timeouts的過程中,他們執行了add_callbacks ,那么這時:self._callbacks就非空了, # 為了盡快的執行其中的callbacks,我們需要將poll_timeout 設置為0,這樣我們就不需要等待fd事件發生,盡快運行callbacks了 poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # 如果沒有回調函數也沒有定時任務,我們就使用默認值 poll_timeout = _POLL_TIMEOUT if not self._running: # 終止ioloop運行時,在執行完了callback后結束循環 break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # http://blog.csdn.net/benkaoya/article/details/17262053 解釋EINTR是什么。系統調用被信號處理函數中斷,進行下一次循環 if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # 從一組待處理的fds中一次彈出一個fd並運行其處理程序。 # 由於該處理程序可能會對其他文件描述符執行操作,因此可能會重新調用此IOLoop來修改self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 獲取一個fd以及對應事件 try: fd_obj, handler_func = self._handlers[fd] # 獲取該fd對應的事件處理函數 handler_func(fd_obj, events) # 運行該事件處理函數 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 當客戶端關閉連接時會產生EPIPE錯誤 pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) # 釋放內存空間 fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd)
5 關閉io多路復用器
def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: # 該參數若為True,則表示會關閉所有文件描述符 for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None
四 參考
https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/
https://www.zhihu.com/question/20021164
http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752
http://blog.csdn.net/benkaoya/article/details/17262053