深入tornado中的ioLoop


本文所剖析的tornado源碼版本為4.4.2

ioloop是tornado的關鍵,是他的最底層。

ioloop就是對I/O多路復用的封裝,它實現了一個單例,將這個單例保存在IOLoop._instance中

ioloop實現了Reactor模型,將所有要處理的I/O事件注冊到一個中心I/O多路復用器上,同時主線程/進程阻塞在多路復用器上;一旦有I/O事件到來或是准備就緒(文件描述符或socket可讀、寫),多路復用器返回並將事先注冊的相應I/O事件分發到對應的處理器中。

另外,ioloop還被用來集中運行回調函數以及集中處理定時任務

一 准備知識:

  1 首先我們要了解Reactor模型

  2 其次,我們要了解I/O多路復用,由於本文假設系統為Linux,所以要了解epoll以及Python中的select模塊

  3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這

二  創建IOLoop實例

來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類

class IOLoop(Configurable):
    ......

這里就要結合Configurable類進行講解:

def __new__(cls, *args, **kwargs)
        '''
        解析出impl對象    
            1 cls是直屬配置子類時,impl就是該直屬配置子類的'執行類對象'
            2 cls是從屬配置子類時,impl就是該從屬配置子類自身
        然后實例化一個impl實例對象
        運行其initialize方法,並傳入合並后的參數
        返回該impl實例對象
        '''
        base = cls.configurable_base()   
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        instance.initialize(*args, **init_kwargs)
        return instance
Configurable中的__new__方法

1 首先實例化一個該直屬配置子類的'執行類對象',也就是調用該類的configurable_default方法並返回賦值給impl:

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):     # 因為我們假設我們的系統為Linux,且支持epoll,所以這里為True
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop 
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

2 也就是impl是EPollIOLoop類對象,然后實例化該對象,運行其initialize方法

class EPollIOLoop(PollIOLoop):  # 該類只有這么短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入

  來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())干了些啥:

class PollIOLoop(IOLoop):  # 從屬配置子類

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)                # 調用IOLoop的initialize方法
        self._impl = impl                               # self._impl = select.epoll()
        if hasattr(self._impl, 'fileno'):               # 文件描述符的close_on_exec屬性
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}                             # 文件描述符對應的fileno()作為key,(文件描述符對象,處理函數)作為value
        self._events = {}                               # 用來存儲epoll_obj.poll()返回的事件,也就是哪個fd發生了什么事件{(fd1, event1), (fd2, event2)……}
        self._callbacks = []
        self._callback_lock = threading.Lock()          # 添加線程鎖
        self._timeouts = []                             # 存儲定時任務
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None                       # 獲得當前線程標識符
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

  首先調用了IOLoop.initialize(self,**kwargs)方法:

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()
@staticmethod
def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self

    我們可以看到IOLoop.initialize()主要是對線程做了一些支持和操作。

3 返回該實例

三 剖析PollIOLoop

1 處理I/O事件以及其對應handler的相關屬性以及方法

    使用self._handlers用來存儲fd與handler的對應關系,文件描述符對應的fileno()作為key,元組(文件描述符對象,處理函數)作為value

  self._events 用來存儲epoll_obj.poll()返回的事件,也就是哪個fd發生了什么事件{(fd1, event1), (fd2, event2)……}

    add_handler方法用來添加handler

  update_handle方法用來更新handler

    remove_handler方法用來移除handler

    def add_handler(self, fd, handler, events):
        # 向epoll中注冊事件 , 並在self._handlers[fd]中為該文件描述符添加相應處理函數
        fd, obj = self.split_fd(fd)   # fd.fileno(),fd
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

2 處理回調函數的相關屬性以及方法

  self._callbacks用來存儲回調函數

  add_callback方法用來直接添加回調函數

  add_future方法用來間接的添加回調函數,future對象詳解在這

    def add_callback(self, callback, *args, **kwargs):
        # 因為Python的GIL的限制,導致Python線程並不算高效。加上tornado實現了多進程 + 協程的模式,所以我們略過源碼中的部分線程相關的一些操作
        if self._closing:
            return
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
    def add_future(self, future, callback):
        # 為future對象添加經過包裝后的回調函數,該回調函數會在future對象被set_done后添加至_callbacks中
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

3 處理定時任務的相關屬性以及方法

  self._timeouts用來存儲定時任務

  self.add_timeout用來添加定時任務(self.call_later   self.call_at都是間接調用了該方法)

def add_timeout(self, deadline, callback, *args, **kwargs):
        """
            ``deadline``可能是一個數字,表示相對於當前時間的時間(與“IOLoop.time”通常為“time.time”相同的大小),或者是datetime.timedelta對象。 
            自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,因為它不需要timedelta對象。

        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                callback, *args, **kwargs)
        else:
            raise TypeError("Unsupported deadline %r" % deadline)

4 啟動io多路復用器

  啟動也一般就意味着開始循環,那么循環什么呢?

    1 運行回調函數

    2 運行時間已到的定時任務

    3 當某個文件描述法發生事件時,運行該事件對應的handler

  使用start方法啟動ioloop,看一下其簡化版(去除線程相關,以及一些相對不重要的細節):

def start(self):
        try:
            while True:    
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 將時間已到的定時任務放置到due_timeouts中,過程省略
                for callback in callbacks:          # 執行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 執行定時任務
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)       
                callbacks = callback = due_timeouts = timeout = None    # 釋放內存
                # 根據情況設置poll_timeout的值,過程省略
                if not self._running:    # 終止ioloop運行時,在執行完了callback后結束循環
                    break
try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: # 系統調用被信號處理函數中斷,進行下一次循環 continue else: raise self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 獲取一個fd以及對應事件 try: fd_obj, handler_func = self._handlers[fd] # 獲取該fd對應的事件處理函數 handler_func(fd_obj, events) # 運行該事件處理函數 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 當客戶端關閉連接時會產生EPIPE錯誤 pass # 其他異常處理已經省略 fd_obj = handler_func = None # 釋放內存空間
def start(self):
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()     # 獲得當前線程標識符
        self._running = True
        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
            # 需要Python2.6及以上版本,類UNIX系統,set_wake_up_fd存在。在windows系統上運行會崩潰
            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                    # Already set, restore previous value.  This is a little racy,
                    # but there's no clean get_wakeup_fd and in real use the
                    # IOLoop is just started once at the beginning.
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:
                # Non-main thread, or the previous value of wakeup_fd
                # is no longer valid.
                old_wakeup_fd = None

        try:
            while True:
                # 防止多線程模型時產生臟數據
                with self._callback_lock:    
                    callbacks = self._callbacks
                    self._callbacks = []

                due_timeouts = []
                if self._timeouts:         # 將時間已到的定時任務放置到due_timeouts中
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512 and
                            self._cancellations > (len(self._timeouts) >> 1)):
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:        # 執行callbacks
                    self._run_callback(callback)
                for timeout in due_timeouts:    # 執行timeout_callback
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                # 釋放內存
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks: # 如果在執行callbacks 或者 timeouts的過程中,他們執行了add_callbacks ,那么這時:self._callbacks就非空了,
                    # 為了盡快的執行其中的callbacks,我們需要將poll_timeout 設置為0,這樣我們就不需要等待fd事件發生,盡快運行callbacks了
                    poll_timeout = 0.0
                elif self._timeouts:
                    # If there are any timeouts, schedule the first one.
                    # Use self.time() instead of 'now' to account for time
                    # spent running callbacks.
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    # 如果沒有回調函數也沒有定時任務,我們就使用默認值
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:    # 終止ioloop運行時,在執行完了callback后結束循環
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # http://blog.csdn.net/benkaoya/article/details/17262053 解釋EINTR是什么。系統調用被信號處理函數中斷,進行下一次循環
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # 從一組待處理的fds中一次彈出一個fd並運行其處理程序。 
                # 由於該處理程序可能會對其他文件描述符執行操作,因此可能會重新調用此IOLoop來修改self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()             # 獲取一個fd以及對應事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 獲取該fd對應的事件處理函數
                        handler_func(fd_obj, events)                # 運行該事件處理函數
                    except (OSError, IOError) as e:         
                        if errno_from_exception(e) == errno.EPIPE:
                            # 當客戶端關閉連接時會產生EPIPE錯誤
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                # 釋放內存空間
                fd_obj = handler_func = None                        

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False 
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)
start完整版

5 關閉io多路復用器

def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:    # 該參數若為True,則表示會關閉所有文件描述符
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close() 
        self._callbacks = None
        self._timeouts = None

 四 參考 

  https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/
  https://www.zhihu.com/question/20021164
  http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752
  http://blog.csdn.net/benkaoya/article/details/17262053


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM