A Deep Dive into Coroutines in Tornado


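Tornado combines a single process (multiple processes are also possible), coroutines, and I/O multiplexing. This avoids the CPU time that the C10K problem otherwise wastes on context switches between large numbers of threads (or processes).

I/O multiplexing in Tornado was covered in an earlier post, so it is not explained in detail here.

Let's look at Tornado's coroutine module, tornado.gen:

tornado.gen is built on generators and makes asynchronous code simpler to write.

First, the idea behind tornado.gen.coroutine:

  As we know, a yield statement in a generator suspends the function, and the send() method resumes it.

  Tornado places asynchronous operations after yield; when such an operation completes, Tornado send()s its result into the generator to resume the function.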
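The pause/resume mechanism described above can be seen with a plain generator, without Tornado. This is a minimal sketch; the function name `worker` and the strings it yields are made up for illustration.

```python
def worker():
    # Execution pauses at each yield; the value passed to send()
    # becomes the value of that yield expression.
    first = yield "need first value"
    second = yield "need second value"
    yield first + second


gen = worker()
request_1 = next(gen)       # run up to the first yield
request_2 = gen.send(10)    # resume; first = 10, run to the second yield
total = gen.send(32)        # resume; second = 32, run to the third yield
print(request_1, request_2, total)
```

Each `send()` both delivers a value to the suspended function and returns whatever the function yields next, which is the channel a coroutine scheduler needs.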

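The official Tornado documentation contains this sentence:

Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

In other words:

  Most asynchronous operations in Tornado return a Future object.

  Yielding a Future object returns the result of that asynchronous operation. That is, given ret = yield some_future_obj, once the asynchronous operation behind some_future_obj completes, its result is automatically assigned to ret.

So what exactly is a Future object?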
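To make `ret = yield some_future_obj` concrete, here is a hand-wired sketch. It assumes the standard library's `concurrent.futures.Future` as a stand-in for Tornado's Future (it offers the same `add_done_callback`, `set_result` and `result` methods), and the generator `task` is hypothetical. In Tornado this wiring is done for you.

```python
from concurrent.futures import Future  # stdlib Future, standing in for Tornado's


def task(some_future_obj):
    ret = yield some_future_obj      # pause here until the result is sent in
    yield "got " + ret


outputs = []
some_future_obj = Future()
gen = task(some_future_obj)
waiting_on = next(gen)               # the generator hands back the Future it waits on

# When the Future completes, send its result into the paused generator.
waiting_on.add_done_callback(lambda f: outputs.append(gen.send(f.result())))
some_future_obj.set_result("hello")
print(outputs)
```

The generator never asks for the result itself; completing the Future triggers the callback, and the callback is what resumes the generator.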

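I  The Future object

First, the Future object:

A Future can be summarized as a placeholder for an asynchronous operation. It is a somewhat special placeholder, because:

  1 The placeholder is an object.

  2 The object has a number of attributes, including _result and _callbacks, which store the result of the asynchronous operation and its callbacks respectively.

  3 The object has a number of methods, for example to add a callback or to set the result of the asynchronous operation.

  4 When the corresponding asynchronous operation completes, the object is marked done (_set_done), and the callbacks in _callbacks are then run one by one.

Here is a simplified version of Future: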

import logging

app_log = logging.getLogger('tornado.application')    # the logger the Tornado source calls app_log


class Future(object):
    '''
        A Future mainly holds a list of callbacks (_callbacks) and a result (_result).
        Calling set_result marks the Future as done, then walks _callbacks and calls callback(self) on each entry.
    '''
    def __init__(self):
        self._done = False      # whether the operation has finished
        self._result = None     # result of the operation
        self._callbacks = []    # callbacks registered on this future

    def result(self, timeout=None):
        # Return the result if the operation has finished.
        # (The full version below also re-raises a stored exception.)
        self._check_done()
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _check_done(self):
        if not self._done:
            raise Exception("DummyFuture does not support blocking for results")

    def _set_done(self):
        # Runs once the operation has finished.
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r', cb, self)
        self._callbacks = None
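The callback behaviour can be exercised with a short sketch. `MiniFuture` below is a hypothetical, stripped-down stand-in written for this example (it is not Tornado's class); it only keeps the done flag, the result, and the callback list.

```python
class MiniFuture:
    """Hypothetical stand-in for Tornado's Future (illustration only)."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready")
        return self._result

    def add_done_callback(self, fn):
        # Run immediately if already finished, otherwise queue the callback.
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)


seen = []
fut = MiniFuture()
fut.add_done_callback(lambda f: seen.append(("early", f.result())))
seen_before = list(seen)      # nothing has run yet
fut.set_result("payload")     # runs the queued callback
fut.add_done_callback(lambda f: seen.append(("late", f.result())))  # runs at once
print(seen)
```

A callback added before completion waits for `set_result`; one added afterwards runs immediately, which matches the two branches of `add_done_callback`.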

Full source:

class Future(object):
    '''
        A Future mainly holds a list of callbacks (_callbacks) and a result (_result); calling set_result runs the functions in _callbacks
    '''
    def __init__(self):
        self._done = False  # whether the operation has finished
        self._result = None    # result of the operation
        self._exc_info = None    # exception info of the operation

        self._log_traceback = False   # Used for Python >= 3.4
        self._tb_logger = None        # Used for Python <= 3.3

        self._callbacks = []    # callbacks registered on this future

    # Implement the Python 3.5 Awaitable protocol if possible
    # (we can't use return and yield together until py33).
    if sys.version_info >= (3, 3):
        exec(textwrap.dedent("""
        def __await__(self):
            return (yield self)
        """))
    else:
        # Py2-compatible version for use with cython.
        def __await__(self):
            result = yield self
            # StopIteration doesn't take args before py33,
            # but Cython recognizes the args tuple.
            e = StopIteration()
            e.args = (result,)
            raise e

    def cancel(self):
        """Cancel the operation, if possible.
        Tornado Futures do not support cancellation, so this always returns False.
        """
        return False

    def cancelled(self):
        # Same as above: cancellation is not supported
        return False

    def running(self):
        """Returns True if this operation is currently running."""
        return not self._done

    def done(self):
        """Returns True if the future has finished running."""
        return self._done

    def _clear_tb_log(self):
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None

    def result(self, timeout=None):
        """If the operation succeeded, return its result.  If it failed,
        re-raise its exception.

        This method takes a ``timeout`` argument for compatibility with
        `concurrent.futures.Future` but it is an error to call it
        before the `Future` is done, so the ``timeout`` is never used.
        """
        self._clear_tb_log()
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            raise_exc_info(self._exc_info)
        self._check_done()
        return self._result

    def exception(self, timeout=None):
        """If the operation raised an exception, return the `Exception`
        object.  Otherwise returns None.

        This method takes a ``timeout`` argument for compatibility with
        `concurrent.futures.Future` but it is an error to call it
        before the `Future` is done, so the ``timeout`` is never used.
        """
        self._clear_tb_log()
        if self._exc_info is not None:
            return self._exc_info[1]
        else:
            self._check_done()
            return None

    def add_done_callback(self, fn):
        """Attaches the given callback to the `Future`.

        It will be invoked with the `Future` as its argument when the Future
        has finished running and its result is available.  In Tornado
        consider using `.IOLoop.add_future` instead of calling
        `add_done_callback` directly.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        """Sets the result of a ``Future``.

        It is undefined to call any of the ``set`` methods more than once
        on the same object.
        """
        self._result = result
        self._set_done()

    def set_exception(self, exception):
        """Sets the exception of a ``Future.``"""
        self.set_exc_info(
            (exception.__class__,
             exception,
             getattr(exception, '__traceback__', None)))

    def exc_info(self):
        """Returns a tuple in the same format as `sys.exc_info` or None.

        .. versionadded:: 4.0
        """
        self._clear_tb_log()
        return self._exc_info

    def set_exc_info(self, exc_info):
        """Sets the exception information of a ``Future.``

        Preserves tracebacks on Python 2.

        .. versionadded:: 4.0
        """
        self._exc_info = exc_info
        self._log_traceback = True
        if not _GC_CYCLE_FINALIZERS:
            self._tb_logger = _TracebackLogger(exc_info)

        try:
            self._set_done()
        finally:
            # Activate the logger after all callbacks have had a
            # chance to call result() or exception().
            if self._log_traceback and self._tb_logger is not None:
                self._tb_logger.activate()
        self._exc_info = exc_info

    def _check_done(self):
        if not self._done:
            raise Exception("DummyFuture does not support blocking for results")

    def _set_done(self):
        # Runs once the operation has finished (with a result or an exception).
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r', cb, self)
        self._callbacks = None

    # On Python 3.3 or older, objects with a destructor part of a reference
    # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
    # the PEP 442.
    if _GC_CYCLE_FINALIZERS:
        def __del__(self):
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return

            tb = traceback.format_exception(*self._exc_info)

            app_log.error('Future %r exception was never retrieved: %s',
                          self, ''.join(tb).rstrip())

 

II  The gen.coroutine decorator

Coroutines in Tornado are implemented by the coroutine decorator in tornado.gen:

def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)

_make_coroutine_wrapper:

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        '''
            Outline:
            future = TracebackFuture()  
            result = func(*args, **kwargs)
            if isinstance(result, GeneratorType):
                yielded = next(result)
                Runner(result, future, yielded)
            return future
        '''
        future = TracebackFuture()                   # TracebackFuture = Future

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)           # call func; if func contains yield, this returns a generator object
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):      # check whether it is a generator object
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)            # first step: run up to the first yield
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

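First, the overall flow:

  1  Create a Future object.

  2  Run the decorated function and assign its return value to result. Because Tornado's 'asynchrony' is built on generators, result is normally a generator object.

  3  yielded = next(result) runs the decorated function up to its first yield and assigns the yielded value to yielded. In most cases yielded is a Future object.

  4  Runner(result, future, yielded)

  5  return future

Everything except step 4 is easy to follow, so let's see what Runner() does in step 4: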
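Steps 2 and 3 rely on two generator behaviours that are easy to check in isolation: calling a generator function runs none of its body, and a generator's return value travels inside StopIteration (on Python 3.3+ as `stop.value`). The sketch below uses a hypothetical `fetch` function and a string in place of a real Future.

```python
import types


def fetch():
    print("body started")
    reply = yield "pending-operation"    # stands in for a Future
    return reply.upper()


result = fetch()                         # step 2: the body has not run yet
is_generator = isinstance(result, types.GeneratorType)
yielded = next(result)                   # step 3: run up to the first yield

try:
    result.send("done")                  # resume with the operation's result
except StopIteration as stop:
    final = stop.value                   # the generator's return value

print(is_generator, yielded, final)
```

This is why the wrapper catches StopIteration (and Tornado's own Return exception) around `next(result)`: a generator that finishes right away reports its result that way.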

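III  The Runner() class

1 Why is there a Runner()? In other words, what does Runner() do?

Runner() automatically send()s the result of an asynchronous operation to the point where the generator was suspended.

Coroutines (or asynchrony) in Tornado are built on generators. The two most commonly used generator methods are send() and next().

Generators are often nested; for example, one generator often yields another. When generator A yields generator B, two things happen:

  1 A is suspended and B runs instead.

  2 When B finishes, B's result must be send() to the point where A was suspended, so that A continues.

This is mainly what Runner() does: it controls when generators run and pause and, at the right moment, uses send() to pass B's result in and wake A up.

A simple example: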

def run():
    print('start running')
    yield 2     # running takes 2 hours

def eat():
    print('start eating')
    yield 1     # eating takes 1 hour

def time():
    run_time = yield run()
    eat_time = yield eat()
    print(run_time+eat_time)

def Runner(gen):
    r = next(gen)
    return r

t = time()
try:
    action = t.send(Runner(next(t)))
    t.send(Runner(action))
except StopIteration:
    pass

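The Runner() in the example above only performs the first step; the second step still has to be done by hand. The Runner() in Tornado's gen module does both.

2 Dissecting Runner()

Runner() has three main methods: __init__, handle_yield, and run: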
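For comparison, here is a sketch of a driver that performs both steps on its own. It is not Tornado's Runner: the helper `drive` is hypothetical, it runs nested generators synchronously and recursively, and the sub-generators are rewritten so that they return their result rather than yield it.

```python
import types


def run():
    yield               # pretend to wait for something
    return 2            # running took 2 hours


def eat():
    yield
    return 1            # eating took 1 hour


def schedule():
    run_time = yield run()
    eat_time = yield eat()
    return run_time + eat_time


def drive(gen):
    """Run gen to completion and return its result, driving nested generators."""
    to_send = None
    while True:
        try:
            yielded = gen.send(to_send)
        except StopIteration as stop:
            return stop.value            # the generator finished
        if isinstance(yielded, types.GeneratorType):
            # Step 1: run the inner generator B to completion.
            # Step 2: its result is sent back into A on the next iteration.
            to_send = drive(yielded)
        else:
            to_send = None


total = drive(schedule())
print(total)
```

Tornado's Runner follows the same send-the-result-back pattern, but it waits on Futures through the IOLoop instead of recursing, so nothing blocks while an operation is pending.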

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen                        # a generator object
        self.result_future = result_future    # a Future object (the one returned to the caller)
        self.future = _null_future            # an already-resolved Future:  _null_future = Future(); _null_future.set_result(None)
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        if self.handle_yield(first_yielded):
            self.run()
    
    # ... some methods omitted ...
    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()

                    if exc_info is not None:
                        yielded = self.gen.throw(*exc_info)
                        exc_info = None
                    else:
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        if _contains_yieldpoint(yielded):    # check whether it contains a YieldPoint
            yielded = multi(yielded)

        if isinstance(yielded, YieldPoint):        # Base class for objects that may be yielded from the generator
            self.future = TracebackFuture()        # a freshly created Future

            def start_yield_point():
                try:
                    yielded.start(self)
                    if yielded.is_ready():
                        self.future.set_result(yielded.get_result())
                    else:
                        self.yield_point = yielded
                except Exception:
                    self.future = TracebackFuture()
                    self.future.set_exc_info(sys.exc_info())

            if self.stack_context_deactivate is None:
                with stack_context.ExceptionStackContext(self.handle_exception) as deactivate:
                    self.stack_context_deactivate = deactivate
                    
                    def cb():
                        start_yield_point()
                        self.run()
                    self.io_loop.add_callback(cb)
                    return False
            else:
                start_yield_point()
        else:
            try:
                self.future = convert_yielded(yielded)
            except BadYieldError:
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:  # moment = Future()
            self.io_loop.add_future(self.future, lambda f: self.run()) # add a callback to this future
            return False
        return True

2.1 The __init__ method

__init__ performs some initialization; the most important part is the last two statements:

if self.handle_yield(first_yielded): # run() right away only if the first yielded future is already done
    self.run()

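2.2 The handle_yield method

handle_yield(self, yielded): as the name suggests, this function handles the object produced by a yield.

First, assume that yielded is a Future object (the most common case), which shortens the code considerably: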

def handle_yield(self, yielded):
    self.future = convert_yielded(yielded)                         # returned unchanged if yielded is already a Future
    if not self.future.done() or self.future is moment:            # moment is a Future created when tornado.gen is loaded and already marked done
        self.io_loop.add_future(self.future, lambda f: self.run()) # add a callback to this future
        return False
    return True

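In other words, it does three things:

  First, it resolves self.future.

  Then it checks whether self.future is already done. If not, it adds a callback to it, and that callback runs self.run().

  Finally, it returns whether self.future is done.

Overall, handle_yield returns whether the yielded object has been marked done; if it has not, handle_yield adds a callback to it that runs self.run().

One more interesting detail is the fourth line of the code above:  self.io_loop.add_future(self.future, lambda f: self.run()) 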

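def add_future(self, future, callback):
    # Add a callback to future; that callback in turn appends the `callback` argument to self._callbacks
    # Consider this question: once a Future is marked done, when should its callbacks run?
    # Immediately, or should they be added to the IOLoop instance's _callbacks and run together?
    # The former is simpler, but it makes the order of callback execution chaotic. Callbacks that are ready should run together, so the latter is clearly more sensible
    # That is exactly what add_future() does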
    future.add_done_callback(lambda future: self.add_callback(callback, future))
        
def add_callback(self, callback, *args, **kwargs):
    # Append callback to the _callbacks list
    self._callbacks.append(functools.partial(callback, *args, **kwargs))
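The difference between running a callback immediately and queueing it can be observed with a small sketch. `MiniLoop` is a hypothetical stand-in for IOLoop that keeps nothing but a callback list, and the standard library's `concurrent.futures.Future` stands in for Tornado's Future.

```python
import functools
from concurrent.futures import Future  # stdlib stand-in for Tornado's Future


class MiniLoop:
    """Hypothetical stand-in for IOLoop that only keeps a callback queue."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback, *args, **kwargs):
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        # When the future completes, queue the callback instead of calling it.
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_pending(self):
        # One "iteration": run everything that has been queued so far.
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb()


log = []
loop = MiniLoop()
fut = Future()
loop.add_future(fut, lambda f: log.append(f.result()))

fut.set_result("ready")
log_before = list(log)     # the callback is queued but has not run yet
loop.run_pending()
log_after = list(log)      # the callback ran during the loop iteration
print(log_before, log_after)
```

Completing the Future only schedules the work; the callback itself runs when the loop next drains its queue.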

 

2.3 The run method

Now the self.run() method. It is essentially a loop that keeps calling the generator's send() method; the value sent is the result of yielded.

We can simplify run() a little:

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready. Keeps sending values into the generator until a yielded object is not yet done.
        """
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None      # clear self.future
                value = future.result()   # get the future's result
                try:
                    yielded = self.gen.send(value)  # send the result; what self.gen yields next is assigned to yielded (normally another future)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                if not self.handle_yield(yielded):  # run self.handle_yield(yielded); return if yielded is not done yet, otherwise keep looping
                    return
        finally:
            self.running = False

 

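Summary:

  1 Each Future corresponds to one asynchronous operation.

  2 Callbacks can be added to a Future. When the asynchronous operation completes, set_result (which calls _set_done) must be called on the Future, and all of its callbacks then run.

  3 Every generator function decorated with coroutine returns a Future object. A Runner() is created from the generator, the value yielded by its first next() call, and that Future.

  4 Runner() repeatedly calls send() on the generator. Specifically: once the yielded value returned by the previous next() or send() (normally a Future) is marked done, its result is send() into the generator. This repeats until StopIteration or Return is raised (meaning the generator has finished), at which point set_result is called on the Future that corresponds to the generator.

    As we can see, Tornado's coroutines are built on generators. A generator can pause with the yield keyword and resume via next() or send(), and send() can also pass a value into the generator.

    The link that ties coroutines together is the Future object. Each Future corresponds to an asynchronous operation, and any number of callbacks can be added to it; when the operation completes, calling set_result on the Future runs those callbacks.

    The driving force is Runner(): it keeps send()ing the result of each future the generator yields back into the generator and, when the generator finishes, wraps things up by calling set_result on the Future that corresponds to the generator.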
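The three pieces can be put together in a toy version that runs on the standard library alone. Everything here is a hypothetical miniature written for illustration (`MiniFuture`, `MiniRunner`, `mini_coroutine`, `fake_fetch`, and the `pending` queue that plays the role of the IOLoop); it leaves out exceptions, YieldPoint, stack contexts, and everything else the real code handles.

```python
import collections
import functools
import types

pending = collections.deque()    # plays the role of the IOLoop callback queue


class MiniFuture:
    def __init__(self):
        self._done, self._result, self._callbacks = False, None, []

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)


class MiniRunner:
    def __init__(self, gen, result_future, first_yielded):
        self.gen, self.result_future = gen, result_future
        if self.handle_yield(first_yielded):
            self.run()

    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():
            # Resume later, through the queue, once the future completes.
            self.future.add_done_callback(lambda f: pending.append(self.run))
            return False
        return True

    def run(self):
        while self.future.done():
            try:
                yielded = self.gen.send(self.future.result())
            except StopIteration as stop:
                self.result_future.set_result(stop.value)
                return
            if not self.handle_yield(yielded):
                return


def mini_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = MiniFuture()
        result = func(*args, **kwargs)
        if not isinstance(result, types.GeneratorType):
            future.set_result(result)
            return future
        try:
            yielded = next(result)           # run to the first yield
        except StopIteration as stop:
            future.set_result(stop.value)
        else:
            MiniRunner(result, future, yielded)
        return future
    return wrapper


def fake_fetch(value):
    """Pretend async operation: its future completes on a later loop tick."""
    fut = MiniFuture()
    pending.append(lambda: fut.set_result(value))
    return fut


@mini_coroutine
def add_remote(a, b):
    x = yield fake_fetch(a)
    y = yield fake_fetch(b)
    return x + y


outer = add_remote(2, 3)
done_at_start = outer.done()     # the coroutine is still waiting
while pending:                   # a very small "event loop"
    pending.popleft()()
print(outer.result())
```

The caller gets a Future back immediately, and the result only appears after the queue has been drained, which mirrors the roles of the decorator, Runner and IOLoop described above.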

 

References:

  http://blog.csdn.net/wyx819/article/details/45420017

  http://www.cnblogs.com/apexchu/p/4226784.html

