[原]tornado源碼分析系列(二)[網絡層 IOLoop類]


引言:上一章起了個頭,講了tornado的源碼結構和IOLoop的簡單Demo,這一章就IOLoop類的方法來看看IOLoop提供了哪些功能。

看看IOLoop的類組織結構

	|---IOLoop
			---__init__(self, impl=None)
			---instance(cls)
			---initialized(cls)
			---add_handler(self, fd, handler, events)
			---update_handler(self, fd, events)
			---remove_handler(self, fd)
			---set_blocking_signal_threshold(self, seconds, action)
			---set_blocking_log_threshold(self, seconds)
			---log_stack(self, signal, frame)
			---start(self)
			---stop(self)
			---running(self)
			---add_timeout(self, deadline, callback)
			---remove_timeout(self, timeout)
			---add_callback(self, callback)
			---_wake(self)
			---_run_callback(self, callback)
			---handle_callback_exception(self, callback)
			---_read_waker(self, fd, events)
			---_set_nonblocking(self, fd)
			---_set_close_exec(self, fd)
	---|

從上一章的Demo里面可以看到最重要的對外提供的方法有

0.instance() @classmethod

1.add_handler(...)

2.start()

類似於傳統的事件驅動方式,這里的使用方式也很簡單

 

從IOLoop類中看起:

先是自己定義了幾個EPOLL的宏,就是EPOLL的事件類型

#epoll 的事件類型,類似於這里的宏定義
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)

# Our events map exactly to the epoll events
#將這幾個事件類型重定義一番
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

常用的就是三種,READ,WRITE,ERROR

#ioloop的構造函數
    def __init__(self, impl=None):
        #選擇異步事件循環監聽方式,默認是epoll,后面的_impl都是指的是epoll
        self._impl = impl or _poll()
        #自省,查看 self._impl 中是否有 fileno
        #如果有,就關閉起exec性質
        if hasattr(self._impl, 'fileno'):
            self._set_close_exec(self._impl.fileno())
        # _set_close_exec 是一個類方法,下面有定義
  	# 當 FD_CLOEXEC 設置了以后,exec() 函數執行的時候會自動關閉描述符
"""     def _set_close_exec(self, fd):
            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)   """
        #handlers 是一個函數集字典
        self._handlers = {}
        self._events = {}
        #回調函數使用的是列表
        self._callbacks = []
        #用來記錄鏈接超時
        self._timeouts = []
        self._running = False
        self._stopped = False
        self._blocking_signal_threshold = None

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        #判斷是否是 NT 操作系統
        if os.name != 'nt':
            #創建一個管道 ,返回的為讀寫兩端的文件描述符
            r, w = os.pipe()
            #設置為非阻塞
            self._set_nonblocking(r)
            self._set_nonblocking(w)
           
            self._set_close_exec(r)
            self._set_close_exec(w)
            #分別以讀方式和寫方式打開管道
            self._waker_reader = os.fdopen(r, "rb", 0)
            self._waker_writer = os.fdopen(w, "wb", 0)
        else:
            #如若不是 NT 系統,改用win32 支持的管道類型
            self._waker_reader = self._waker_writer = win32_support.Pipe()
            r = self._waker_writer.reader_fd
        #將 管道的  read端與 函數 _read_waker  關聯,事件類型為 READ
        #這里也是IO 多路復用的一種機制,將管道的描述符也添加進多路復用的IO 管理
        self.add_handler(r, self._read_waker, self.READ)

注意最后的幾點,將管道描述符的讀端也加入事件循環檢查,並設置相應的回調函數,這樣做的好處是以便事件循環阻塞而沒有相應描述符出現,需要在最大timeout時間之前返回,就可以向這個管道發送一個字符,用來終止阻塞在監聽階段的事件循環監聽函數。

看看waker是這樣定義的:

    def _wake(self):
        try:
            self._waker_writer.write("x")
        except IOError:
            pass

需要喚醒阻塞中的事件循環監聽函數的時候,只需要向管道寫入一個字符,就可以提前結束循環

instance就是簡單的返回一個實例:

    def instance(cls):
        """Returns a global IOLoop instance.

        Most single-threaded applications have a single, global IOLoop.
        Use this method instead of passing around IOLoop instances
        throughout your code.

        A common pattern for classes that depend on IOLoops is to use
        a default argument to enable programs with multiple IOLoops
        but not require the argument for simpler applications:

            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.instance()
        """
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

instance()是一個靜態方法,代表此IOLoop是一個單實例方法,一個進程只有一個

在add_handler()里面

    #將文件描述符發生相應的事件時的回調函數對應
    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = stack_context.wrap(handler)
        #在 epoll 中注冊對應事件
        #epoll_ctl
        self._impl.register(fd, events | self.ERROR)
    #更新相應的事件類型

可以看到,使用字典的方式,每一個fd就對應一個handler,下次事件循環返回的時候按照返回后的fd列表,依次調用相應的callback

|------

在tornado中,函數是通過stack_context.wrap()包裝過,可以用來記錄上下文

如果需要調用被包裝過的函數,需要調用方法

_run_callback(self, callback)  

這個函數將包裝過的callback作為參數出入,然后執行函數

    def _run_callback(self, callback):
        try:
            callback()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handle_callback_exception(callback)

當函數執行發生異常時,可以記錄下函數執行狀態  

-------|

_impl.register就是被封裝過的epoll的epoll_ctl,參數是EPOLL_CTL_ADD

見同一個文件下的_EPoll類

class _EPoll(object):
    """An epoll-based event loop using our C module for Python 2.5 systems"""
    _EPOLL_CTL_ADD = 1
    _EPOLL_CTL_DEL = 2
    _EPOLL_CTL_MOD = 3

    def __init__(self):
        self._epoll_fd = epoll.epoll_create()

    def fileno(self):
        return self._epoll_fd

    def register(self, fd, events):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)

    def modify(self, fd, events):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)

    def unregister(self, fd):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)

    def poll(self, timeout):
        return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))

  

總結:這一章講了IOLoop中的幾個重要函數,后面依次會有分析其他方法,還有其中一些細節值得平常注意的。  

  

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM