Python Tornado框架(ioloop對象分析)


網上都說nginx和lighthttpd是高性能web服務器,而tornado也是著名的高抗負載應用,它們間有什么相似處呢?上節提到的ioloop對象是如何循環的呢?往下看。

首先關於TCP服務器的開發上節已經提過,很明顯那個三段式的示例是個效率很低的(因為只有一個連接被端開新連接才能被接受)。要想開發高性能的服務器,就得在這accept上下功夫。

首先,新連接的到來一般是經典的三次握手,只有當服務器收到一個SYN時才說明有一個新連接(還沒建立),這時監聽fd是可讀的可以調用accept,此前服務器可以干點別的,這就是SELECT/POLL/EPOLL的思路。而只有三次握手成功后,accept才會返回,此時監聽fd是讀完成狀態,似乎服務器在此之前可以轉身去干別的,等到讀完成再調用accept就不會有延遲了,這就是AIO的思路,不過在*nix平台上好像支持不是很廣。。。再有,accept得到的新fd,不一定是可讀的(客戶端請求還沒到達),所以可以等新fd可讀時在read()(可能會有一點延遲),也可以用AIO等讀完后再read就不會延遲了。同樣類似,對於write,close也有類似的事件。

總的思路就是,在我們關心的fd上注冊關心的多個事件,事件發生了就啟動回調,沒發生就看點別的。這是單線程的,多線程的復雜一點,但差不多。nginx和lightttpd以及tornado都是類似的方式,只不過是多進程和多線程或單線程的區別而已。為簡便,我們只分析tornado單線程的情況。

關於ioloop.py的代碼,主要有兩個要點。一個是configurable機制,一個就是epoll循環。先看epoll循環吧。IOLoop 類的start是循環所在,但它必須被子類覆蓋實現,因此它的start在PollIOLoop里。略過循環外部的多線程上下文環境的保存與恢復,單看循環:

while True:
poll_timeout = 3600.0

# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
	callbacks = self._callbacks
	self._callbacks = []
for callback in callbacks:
	self._run_callback(callback)

if self._timeouts:
	now = self.time()
	while self._timeouts:
		if self._timeouts[0].callback is None:
			# the timeout was cancelled
			heapq.heappop(self._timeouts)
		elif self._timeouts[0].deadline <= now:
			timeout = heapq.heappop(self._timeouts)
			self._run_callback(timeout.callback)
		else:
			seconds = self._timeouts[0].deadline - now
			poll_timeout = min(seconds, poll_timeout)
			break

if self._callbacks:
	# If any callbacks or timeouts called add_callback,
	# we don't want to wait in poll() before we run them.
	poll_timeout = 0.0

if not self._running:
	break

if self._blocking_signal_threshold is not None:
	# clear alarm so it doesn't fire while poll is waiting for
	# events.
	signal.setitimer(signal.ITIMER_REAL, 0, 0)

try:
	event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
	# Depending on python version and IOLoop implementation,
	# different exception types may be thrown and there are
	# two ways EINTR might be signaled:
	# * e.errno == errno.EINTR
	# * e.args is like (errno.EINTR, 'Interrupted system call')
	if (getattr(e, 'errno', None) == errno.EINTR or
		(isinstance(getattr(e, 'args', None), tuple) and
		 len(e.args) == 2 and e.args[0] == errno.EINTR)):
		continue
	else:
		raise

if self._blocking_signal_threshold is not None:
	signal.setitimer(signal.ITIMER_REAL,
					 self._blocking_signal_threshold, 0)

# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
while self._events:
	fd, events = self._events.popitem()
	try:
		self._handlers[fd](fd, events)
	except (OSError, IOError) as e:
		if e.args[0] == errno.EPIPE:
			# Happens when the client closes the connection
			pass
		else:
			app_log.error("Exception in I/O handler for fd %s",
						  fd, exc_info=True)
	except Exception:
		app_log.error("Exception in I/O handler for fd %s",
					  fd, exc_info=True)

首先是設定超時時間。然后在互斥鎖下取出上次循環遺留下的回調列表(在add_callback添加對象),把這次列表置空,然后依次執行列表里的回調。這里的_run_callback就沒什么好分析的了。緊接着是檢查上次循環遺留的超時列表,如果列表里的項目有回調而且過了截止時間,那肯定超時了,就執行對應的超時回調。然后檢查是否又有了事件回調(因為很多回調函數里可能會再添加回調),如果是,則不在poll循環里等待,如注釋所述。接下來最關鍵的一句是event_pairs = self._impl.poll(poll_timeout),這句里的_impl是epoll,在platform/epoll.py里定義,總之就是一個等待函數,當有事件(超時也算)發生就返回。然后把事件集保存下來,對於每個事件,self._handlers[fd](fd, events)根據fd找到回調,並把fd和事件做參數回傳。如果fd是監聽的fd,那么這個回調handler就是accept_handler函數,詳見上節代碼。如果是新fd可讀,一般就是_on_headers 或者 _on_requet_body了,詳見前幾節。我好像沒看到可寫時的回調?以上,就是循環的流程了。可能還是看的糊里糊塗的,因為很多對象怎么來的都不清楚,configurable也還沒有看。看完下面的分析,應該就可以了。

Configurable類在util.py里被定義。類里有一段注釋,已經很明確的說明了它的設計意圖和用法。它是可配置接口的父類,可配置接口對外提供一致的接口標識,但它的子類實現可以在運行時進行configure。一般在跨平台時由於子類實現有多種選擇,這時候就可以使用可配置接口,例如select和epoll。首先注意 Configurable 的兩個函數: configurable_base 和 configurable_default, 兩函數都需要被子類(即可配置接口類)覆蓋重寫。其中,base函數一般返回接口類自身,default返回接口的默認子類實現,除非接口指定了__impl_class。IOLoop及其子類實現都沒有初始化函數也沒有構造函數,其構造函數繼承於Configurable,如下:

def __new__(cls, **kwargs):
	base = cls.configurable_base()
	args = {}
	if cls is base:
		impl = cls.configured_class()
		if base.__impl_kwargs:
			args.update(base.__impl_kwargs)
	else:
		impl = cls
	args.update(kwargs)
	instance = super(Configurable, cls).__new__(impl)
	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
	# singleton magic.  If we get rid of that we can switch to __init__
	# here too.
	instance.initialize(**args)
	return instance

 

當子類對象被構造時,子類__new__被調用,因此參數里的cls指的是Configurabel的子類(可配置接口類,如IOLoop)。先是得到base,查看IOLoop的代碼發現它返回的是自身類。由於base和cls是一樣的,所以調用configured_class()得到接口的子類實現,其實就是調用base(現在是IOLoop)的configurable_default,總之就是返回了一個子類實現(epoll/kqueue/select之一),順便把__impl_kwargs合並到args里。接着把kwargs並到args里。然后調用Configurable的父類(Object)的__new__方法,生成了一個impl的對象,緊接着把args當參數調用該對象的initialize(繼承自PollIOloop,其initialize下段進行分析),返回該對象。

所以,當構造IOLoop對象時,實際得到的是EPollIOLoop或其它相似子類。另外,Configurable 還提供configure方法來給接口指定實現子類和參數。可以看的出來,Configurable類主要提供構造方法,相當於對象工廠根據配置來生產對象,同時開放configure接口以供配置。而子類按照約定調整配置即可得到不同對象,代碼得到了復用。

解決了構造,來看看IOLoop的instance方法。先檢查類是否有成員_instance,一開始肯定沒有,於是就構造了一個IOLoop對象(即EPollIOLoop對象)。以后如果再調用instance,得到的則是已有的對象,這樣就確保了ioloop在全局是單例。再看epoll循環時注意到self._impl,Configurable 和 IOLoop 里都沒有, 這是在哪兒定義的呢? 為什么IOLoop的start跑到PollIOLoop里,應該是EPollIOLoop才對啊。 對,應該看出來了,EPollIOLoop 就是PollIOLoop的子類,所以方法被繼承了是很常見的哈。

從上一段的構造流程里可以看到,EPollIOLoop對象的initialize方法被調用了,看其代碼發現它調用了其父類(PollIOLoop)的它方法, 並指定了impl=select.epoll(), 然后在父類的方法里就把它保存了下來,所以self._impl.poll就等效於select.epoll().poll().PollIOLoop里還有一些注冊,修改,刪除監聽事件的方法,其實就是對self._impl的封裝調用。就如上節的 add_accept_handler 就是調用ioloop的add_handler方法把監聽fd和accept_handler方法進行關聯。

IOLoop基本是個事件循環,因此它總是被其它模塊所調用。而且為了足夠通用,基本上對回調沒多大限制,一個可執行對象即可。事件分發就到此結束了,和IO事件密切相關的另一個部分是IOStream,看看它是如何讀寫的。

IOLoop instance()方法的講解

Tornado 的源碼寫得有點難懂,需要你理解好 socket、epoll 這樣的東西才能充分理解。需要深入到 Tornado 的源碼,ioloop.py 這個文件很關鍵。

接下來,我們繼續讀 ioloop.py 這個文件。

IOLoop 是基於 epoll 實現的底層網絡I/O的核心調度模塊,用於處理 socket 相關的連接、響應、異步讀寫等網絡事件。每個 Tornado 進程都會初始化一個全局唯一的 IOLoop 實例,在 IOLoop 中通過靜態方法 instance() 進行封裝,獲取 IOLoop 實例直接調用此方法即可。

@staticmethod
def instance():
	"""Returns a global `IOLoop` instance.

	Most applications have a single, global `IOLoop` running on the
	main thread.  Use this method to get this instance from
	another thread.  To get the current thread's `IOLoop`, use `current()`.
	"""
	if not hasattr(IOLoop, "_instance"):
		with IOLoop._instance_lock:
			if not hasattr(IOLoop, "_instance"):
				# New instance after double check
				IOLoop._instance = IOLoop()
	return IOLoop._instance

 

Tornado 服務器啟動時會創建監聽 socket,並將 socket 的 file descriptor 注冊到 IOLoop 實例中,IOLoop 添加對 socket 的IOLoop.READ 事件監聽並傳入回調處理函數。當某個 socket 通過 accept 接受連接請求后調用注冊的回調函數進行讀寫。接下來主要分析IOLoop 對 epoll 的封裝和 I/O 調度具體實現。

epoll是Linux內核中實現的一種可擴展的I/O事件通知機制,是對POISX系統中 select 和 poll 的替代,具有更高的性能和擴展性,FreeBSD中類似的實現是kqueue。Tornado中基於Python C擴展實現的的epoll模塊(或kqueue)對epoll(kqueue)的使用進行了封裝,使得IOLoop對象可以通過相應的事件處理機制對I/O進行調度。具體可以參考前面小節的 預備知識:我讀過的對epoll最好的講解 。

IOLoop模塊對網絡事件類型的封裝與epoll一致,分為READ / WRITE / ERROR三類,具體在源碼里呈現為:

# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

 回到前面章節的 開始用Tornado:從Hello World開始 里面的示例,

http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()

 

前兩句是啟動服務器,啟動服務器之后,還需要啟動 IOLoop 的實例,這樣可以啟動事件循環機制,配合非阻塞的 HTTP Server 工作。更多關於 IOLoop的與Http服務器的細節,在 Tornado對Web請求與響應的處理機制 這里有介紹到。

這就是 IOLoop 的 instance() 方法的一些細節,接下來我們再看看 start() 的細節。

 IOLoop start()里的核心調度

IOLoop的初始化

初始化過程中選擇 epoll 的實現方式,Linux 平台為 epoll,BSD 平台為 kqueue,其他平台如果安裝有C模塊擴展的 epoll 則使用 tornado對 epoll 的封裝,否則退化為 select。

def __init__(self, impl=None):
    self._impl = impl or _poll()
    #省略部分代碼
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = stack_context.wrap(handler)
    self._impl.register(fd, events | self.ERROR)

 

在 IOLoop 初始化的過程中創建了一個 Waker 對象,將 Waker 對象 fd 的讀端注冊到事件循環中並設定相應的回調函數(這樣做的好處是當事件循環阻塞而沒有響應描述符出現,需要在最大 timeout 時間之前返回,就可以向這個管道發送一個字符)。

Waker 的使用:一種是在其他線程向 IOLoop 添加 callback 時使用,喚醒 IOLoop 同時會將控制權轉移給 IOLoop 線程並完成特定請求。喚醒的方法向管道中寫入一個字符'x'。另外,在 IOLoop的stop 函數中會調用self._waker.wake(),通過向管道寫入'x'停止事件循環。

add_handler 函數使用了stack_context 提供的 wrap 方法。wrap 返回了一個可以直接調用的對象並且保存了傳入之前的堆棧信息,在執行時可以恢復,這樣就保證了函數的異步調用時具有正確的運行環境。

IOLoop的start方法

IOLoop 的核心調度集中在 start() 方法中,IOLoop 實例對象調用 start 后開始 epoll 事件循環機制,該方法會一直運行直到 IOLoop 對象調用 stop 函數、當前所有事件循環完成。start 方法中主要分三個部分:一個部分是對超時的相關處理;一部分是 epoll 事件通知阻塞、接收;一部分是對 epoll 返回I/O事件的處理。

  • 為防止 IO event starvation,將回調函數延遲到下一輪事件循環中執行。
  • 超時的處理 heapq 維護一個最小堆,記錄每個回調函數的超時時間(deadline)。每次取出 deadline 最早的回調函數,如果callback標志位為 True 並且已經超時,通過 _run_callback 調用函數;如果沒有超時需要重新設定 poll_timeout 的值。
  • 通過 self._impl.poll(poll_timeout) 進行事件阻塞,當有事件通知或超時時 poll 返回特定的 event_pairs。
  • epoll 返回通知事件后將新事件加入待處理隊列,將就緒事件逐個彈出,通過stack_context.wrap(handler)保存的可執行對象調用事件處理。
while True:
    poll_timeout = 3600.0

    with self._callback_lock:
        callbacks = self._callbacks
        self._callbacks = []
    for callback in callbacks:
        self._run_callback(callback)

    # 超時處理
    if self._timeouts:
        now = time.time()
        while self._timeouts:
            if self._timeouts[0].callback is None:
                # the timeout was cancelled
                heapq.heappop(self._timeouts)
            elif self._timeouts[0].deadline <= now:
                timeout = heapq.heappop(self._timeouts)
                self._run_callback(timeout.callback)
            else:
                seconds = self._timeouts[0].deadline - now
                poll_timeout = min(seconds, poll_timeout)
                break

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    # epoll阻塞,當有事件通知或超時返回event_pairs
    try:
        event_pairs = self._impl.poll(poll_timeout)
    except Exception, e:
        # 異常處理,省略

    # 對epoll返回event_pairs事件的處理
    self._events.update(event_pairs)
    while self._events:
        fd, events = self._events.popitem()
        try:
            self._handlers[fd](fd, events)
        except Exception e:
            # 異常處理,省略

 

3.0后的一些改動

Tornado3.0以后 IOLoop 模塊的一些改動。

IOLoop 成為 util.Configurable 的子類,IOLoop 中絕大多數成員方法都作為抽象接口,具體實現由派生類 PollIOLoop 完成。IOLoop 實現了 Configurable 中的 configurable_base 和 configurable_default 這兩個抽象接口,用於初始化過程中獲取類類型和類的實現方法(即 IOLoop 中 poller 的實現方式)。

在 Tornado3.0+ 中針對不同平台,單獨出 poller 相應的實現,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均繼承於 PollIOLoop。下邊的代碼是 configurable_default 方法根據平台選擇相應的 epoll 實現。初始化 IOLoop 的過程中會自動根據平台選擇合適的 poller 的實現方法。

@classmethod
def configurable_default(cls):
	if hasattr(select, "epoll"):
		from tornado.platform.epoll import EPollIOLoop
		return EPollIOLoop
	if hasattr(select, "kqueue"):
		# Python 2.6+ on BSD or Mac
		from tornado.platform.kqueue import KQueueIOLoop
		return KQueueIOLoop
	from tornado.platform.select import SelectIOLoop
	return SelectIOLoop

 

IOLoop與Configurable類

IOLoop 是 tornado 的核心。程序中主函數通常調用 tornado.ioloop.IOLoop.instance().start() 來啟動IOLoop,但是看了一下 IOLoop 的實現,start 方法是這樣的:

def start(self):
	"""Starts the I/O loop.

	The loop will run until one of the callbacks calls `stop()`, which
	will make the loop stop after the current event iteration completes.
	"""
	raise NotImplementedError()

 

也就是說 IOLoop 是個抽象的基類,具體工作是由它的子類負責的。由於是 Linux 平台,所以應該用 Epoll,對應的類是 PollIOLoop。PollIOLoop 的 start 方法開始了事件循環。

問題來了,tornado.ioloop.IOLoop.instance() 是怎么返回 PollIOLoop 實例的呢?剛開始有點想不明白,后來看了一下 IOLoop 的代碼就豁然開朗了。

IOLoop 繼承自 Configurable,后者位於 tornado/util.py。

A configurable interface is an (abstract) class whose constructor acts as a factory function for one of its implementation subclasses. The implementation subclass as well as optional keyword arguments to its initializer can be set globally at runtime with configure.

Configurable 類實現了一個工廠方法,也就是設計模式中的“工廠模式”,看一下__new__函數的實現:

def __new__(cls, **kwargs):
	base = cls.configurable_base()
	args = {}
	if cls is base:
		impl = cls.configured_class()
		if base.__impl_kwargs:
			args.update(base.__impl_kwargs)
	else:
		impl = cls
	args.update(kwargs)
	instance = super(Configurable, cls).__new__(impl)
	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
	# singleton magic.  If we get rid of that we can switch to __init__
	# here too.
	instance.initialize(**args)
	return instance

 當創建一個Configurable類的實例的時候,其實創建的是configurable_class()返回的類的實例。

@classmethod
def configured_class(cls):
	"""Returns the currently configured class."""
	base = cls.configurable_base()
	if cls.__impl_class is None:
		base.__impl_class = cls.configurable_default()
	return base.__impl_class

 最后,就是返回的configurable_default()。此函數在IOLoop中的實現如下:

@classmethod
def configurable_default(cls):
	if hasattr(select, "epoll"):
		from tornado.platform.epoll import EPollIOLoop
		return EPollIOLoop
	if hasattr(select, "kqueue"):
		# Python 2.6+ on BSD or Mac
		from tornado.platform.kqueue import KQueueIOLoop
		return KQueueIOLoop
	from tornado.platform.select import SelectIOLoop
	return SelectIOLoop

 EPollIOLoop 是 PollIOLoop 的子類。至此,這個流程就理清楚了。

 

對socket封裝的IOStream機制概覽

IOStream對socket讀寫進行了封裝,分別提供讀、寫緩沖區實現對socket的異步讀寫。當socket被accept之后HTTPServer的_handle_connection會被回調並初始化IOStream對象,進一步通過IOStream提供的功能接口完成socket的讀寫。文章接下來將關注IOStream實現讀寫的細節。

 

IOStream的初始化

IOStream初始化過程中主要完成以下操作:

  1. 綁定對應的socket
  2. 綁定ioloop
  3. 創建讀緩沖區_read_buffer,一個python deque容器
  4. 創建寫緩沖區_write_buffer,同樣也是一個python deque容器

IOStream提供的主要功能接口

主要的讀寫接口包括以下四個:

class IOStream(object):
	def read_until(self, delimiter, callback): 
	def read_bytes(self, num_bytes, callback, streaming_callback=None): 
	def read_until_regex(self, regex, callback): 
	def read_until_close(self, callback, streaming_callback=None): 
	def write(self, data, callback=None):

 

  • read_until和read_bytes是最常用的讀接口,它們工作的過程都是先注冊讀事件結束時調用的回調函數,然后調用_try_inline_read方法。_try_inline_read首先嘗試_read_from_buffer,即從上一次的讀緩沖區中取數據,如果有數據直接調用 self._run_callback(callback, self._consume(data_length)) 執行回調函數,_consume消耗掉了_read_buffer中的數據;否則即_read_buffer之前沒有未讀數據,先通過_read_to_buffer將數據從socket讀入_read_buffer,然后再執行_read_from_buffer操作。read_until和read_bytes的區別在於_read_from_buffer過程中截取數據的方法不同,read_until讀取到delimiter終止,而read_bytes則讀取num_bytes個字節終止。執行過程如下圖所示

 

  • read_until_regex相當於delimiter為某一正則表達式的read_until。
  • read_until_close主要用於IOStream流關閉前后的讀取:如果調用read_until_close時stream已經關閉,那么將會_consume掉_read_buffer中的所有數據;否則_read_until_close標志位設為True,注冊_streaming_callback回調函數,調用_add_io_state添加io_loop.READ狀態。
  • write首先將data按照數據塊大小WRITE_BUFFER_CHUNK_SIZE分塊寫入write_buffer,然后調用handle_write向socket發送數據。

其他內部功能接口

  • def _handle_events(self, fd, events): 通常為IOLoop對象add_handler方法傳入的回調函數,由IOLoop的事件機制來進行調度。
  • def _add_io_state(self, state): 為IOLoop對象的handler注冊IOLoop.READ或IOLoop.WRITE狀態,handler為IOStream對象的_handle_events方法。
  • def _consume(self, loc): 合並讀緩沖區loc個字節,從讀緩沖區刪除並返回這些數據。

Tornado的多進程管理分析

Tornado的多進程管理我們可以參看process.py這個文件。

在編寫多進程的時候我們一般都用python自帶的multiprocessing,使用方法和threading基本一致,只需要繼承里面的Process類以后就可以編寫多進程程序了,這次我們看看tornado是如何實現他的multiprocessing,可以說實現的功能不多,但是更加簡單高效。

我們只看fork_process里面的代碼:

global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    logging.info("Starting %d processes", num_processes)
    children = {}

 這一段很簡單,就是在沒有傳入進程數的時候使用默認的cpu個數作為將要生成的進程個數。

def start_child(i):
	pid = os.fork()
	if pid == 0:
		# child process
		_reseed_random()
		global _task_id
		_task_id = i
		return i
	else:
		children[pid] = i
		return None

 

這是一個內函數,作用就是生成子進程。fork是個很有意思的方法,他會同時返回兩種狀態,為什么呢?其實fork相當於在原有的一條路(父進程)旁邊又修了一條路(子進程)。如果這條路修成功了,那么在原有的路上(父進程)你就看到旁邊來了另外一條路(子進程),所以也就是返回新生成的那條路的名字(子進程的pid),但是在另外一條路上(子進程),你看到的是自己本身修建成功了,也就返回自己的狀態碼(返回結果是0)。

所以if pid==0表示這時候cpu已經切換到子進程了,相當於我們在新生成的這條路上面做事(返回任務id);else表示又跑到原來的路上做事了,在這里我們記錄下新生成的子進程,這時候children[pid]=i里面的pid就是新生成的子進程的pid,而 i 就是剛才在子進程里面我們返回的任務id(其實就是用來代碼子進程的id號)。

for i in range(num_processes):
	id = start_child(i)
	if id is not None:
		return id

 if id is not None表示如果我們在剛剛生成的那個子進程的上下文里面,那么就什么都不干,直接返回子進程的任務id就好了,啥都別想了,也別再折騰。如果還在父進程的上下文的話那么就繼續生成子進程。

num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError, e:
            if e.errno == errno.EINTR:
                continue
            raise
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            logging.warning("child %d (pid %d) killed by signal %d, restarting",
                            id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            logging.warning("child %d (pid %d) exited with status %d, restarting",
                            id, pid, os.WEXITSTATUS(status))
        else:
            logging.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            return new_id

 

剩下的這段代碼都是在父進程里面做的事情(因為之前在子進程的上下文的時候已經返回了,當然子進程並沒有結束)。

pid, status = os.wait()的意思是等待任意子進程退出或者結束,這時候我們就把它從我們的children表里面去除掉,然后通過status判斷子進程退出的原因。

如果子進程是因為接收到kill信號或者拋出exception了,那么我們就重新啟動一個子進程,用的當然還是剛剛退出的那個子進程的任務號。如果子進程是自己把事情做完了才退出的,那么就算了,等待別的子進程退出吧。

我們看到在重新啟動子進程的時候又使用了

if new_id is not None:
    return new_id

 主要就是退出子進程的空間,只在父進程上面做剩下的事情,不然剛才父進程的那些代碼在子進程里面也會同樣的運行,就會形成無限循環了,我沒試過,不如你試試?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM