引言:由於都是在工作當中抽出時間看源代碼,所以更新速度比較慢,但是還是希望通過對好的源碼的分析和探討,大家相互學習,發現不好的地方共同討論。
上次講了IOLoop中的幾個重要的方法,inistance() 和 add_handler() .. 今天看看Demo中一個最重要的方法,start(),順帶用stop()收尾
def start(self): """Starts the I/O loop. The loop will run until one of the I/O handlers calls stop(), which will make the loop stop after the current event iteration completes. """ if self._stopped: self._stopped = False return self._running = True while True: # Never use an infinite timeout here - it can stall epoll poll_timeout = 0.2 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. callbacks = self._callbacks self._callbacks = [] #先運行注冊了的回調函數 for callback in callbacks: self._run_callback(callback) if self._callbacks: poll_timeout = 0.0 #檢查超時事件 #方法是,在timeout這個bisect的排序的列表,每次取出頭部最小的一個 #將deadline與當前時間比較,如果 <= 當前時間,就認為超時,然后調用相應的超時處理的回調函數 #這里不好理解的是deadline <= 當前時間 , 如果說deadline 大於當前時間,就代表還沒有到 #超時條件 #循環檢查,直到超時事件處理完成 #值得一說的是在libevent中是使用了最小堆每次取出當前的最小deadline #由於最小堆的特性,每次從頭取出的都是最小的 #Nginx的網絡模塊是用的紅黑樹來做,原理也是一樣的 if self._timeouts: now = time.time() while self._timeouts and self._timeouts[0].deadline <= now: timeout = self._timeouts.pop(0) self._run_callback(timeout.callback) #處理完了超時時間之后,需要將epoll最大阻塞時間改為小於當前最小超時時間的絕對值 #不然可能在epoll返回后,本來不屬於超時事件的事件被超時 if self._timeouts: milliseconds = self._timeouts[0].deadline - now poll_timeout = min(milliseconds, poll_timeout) #判斷“反應堆”是否結束 #結束有兩個方式,一個是設置_running 標志位,第二個就是往寫管道寫入"x" if not self._running: break #從注釋中可以看出,每次進入epoll等待事件之前都需要把sigalrm清空,以免在 #epoll阻塞期間收到信號,在epoll完成后重新設置 if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) #進入epoll循環 try: event_pairs = self._impl.poll(poll_timeout) except Exception, e: #在 epoll和 select 阻塞過程當中,經常會收到系統或者其他方式發過來的信號,這 #時候系統的 errno 會被設置為 EINTR ,如果將遇到這樣的情況,直接重啟epoll就可以 #如果不是這樣的錯誤,則看做是致命錯誤 # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if (getattr(e, 'errno', None) == errno.EINTR or (isinstance(getattr(e, 'args', None), tuple) and len(e.args) == 2 and e.args[0] == errno.EINTR)): continue else: raise #將被阻塞的sigalarm 還原 , 第二個參數是最大阻塞閾值 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events #將新的事件加入到待處理隊列中,現代非阻塞的網絡庫都使用的是這種方式 self._events.update(event_pairs) #作者在寫這段代碼的過程當中不是使用的簡單的順序遍歷這個隊列,而使用的方式是 #將就緒事件逐個彈出,以防止在處理過程當中就緒事件發生改變 while self._events: fd, events = self._events.popitem() #在處理過程當中,常常會遇到客戶端異常終止的情況 #一般情況下如果讀取錯誤,服務端會產生一個 sigpipe信號 #這時候需要忽略這個信號 #這里我有一個疑問就是為什么在add_handler 的時候 handler是經過 context.wrap包裝過的 #而在這里是直接調用,按道理應該是通過_running_callback調用,不過這里顯然處理了異常情況了 try: self._handlers[fd](fd, events) except (KeyboardInterrupt, SystemExit): raise except (OSError, IOError), e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) except: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) # reset the stopped flag so another start/stop pair can be issued self._stopped = False #將定時事件清空 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0)
這段代碼中值得注意的部分就是在幾個方面:
1.超時事件的處理,timeout是一個排序后的列表,每次都是取得最前面最小的一個
2.在開始epoll循環的過程當中,設置阻塞sigalarm
3.在處理事件過程當中忽略sigpipe信號
4.在處理就緒事件過程當中,是通過每次pop一個來處理,而不是一次遍歷
stop()函數
def stop(self): """Stop the loop after the current event loop iteration is complete. If the event loop is not currently running, the next call to start() will return immediately. To use asynchronous methods from otherwise-synchronous code (such as unit tests), you can start and stop the event loop like this: ioloop = IOLoop() async_method(ioloop=ioloop, callback=ioloop.stop) ioloop.start() ioloop.start() will return after async_method has run its callback, whether that callback was invoked before or after ioloop.start. """ self._running = False self._stopped = True self._wake()
簡單的設置標志位后,向管道發送"x"停止事件循環
總結:IOLoop差不多就是這些內容,利用python簡單和高可讀性,看網絡模塊的實現會讓我們更加的專注於
實現,而不是繁瑣的基礎代碼的使用過程。
后面將看看IOStream類,是建立在IOLoop的一個上層封裝,實現了基本的buffer事件