python 協程庫gevent學習--gevent源碼學習(二)


在進行gevent源碼學習一分析之后,我還對兩個比較核心的問題抱有疑問:

  1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用。

  2. 關於在使用monkey_patchall()之后隱式切換的問題。

下面我將繼續通過分析源碼及其行為來加以理解和掌握。

 

1. 關於gevent.Greenlet.join()(以下簡稱join)先來看一個例子:

import gevent


def xixihaha(msg):
    print(msg)
    gevent.sleep(0)
    print msg

g1 = gevent.spawn(xixihaha, 'xixi')
gevent.sleep(0)

先分析一波

1. 初始化一個Greenlet實例g1,將該Greenlet.switch注冊到hub上。

2. 然后調用gevent.sleep(0)將當前greenlet保存下來放在Waiter()中並向hub注冊該回調。這里再貼一次實現的代碼跟着走一遍強調一下實現,這對一會兒理解join實現非常有幫助:

    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

這里我們將second設置為0,所以走第一層判斷,初始化一個Waiter()對象給waiter,隨后注冊waiter.switch方法到hub,調用waiter.get()去調用hub的switch()方法(注意這里的self.hub.switch()方法並沒有切換這個概念。這只是Hub類中自己實現的一個switch方法而已):

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent()
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

 

然后hub.switch(self)會返回一個greenlet.switch(self)這里才是切換 然后他會調用自己的run方法(greenlet底層實現)。

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return greenlet.switch(self)

 

    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever', self))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediatelly

這里開始就進入事件loop循環了,run()會調用到注冊過來的回調。這里開始g1注冊過來的回調就會被調用了。

之后的流程就是運行g1注冊的回調,然后運行Waiter()注冊的回調,然后回到外層最后結束掉。打印結果:

xixi

 

 

那還有一個msg沒有打印呢!怎么就退出來了!!這不科學。所以這就是join可以辦到的事情了,來看源碼:

    def join(self, timeout=None):
        """Wait until the greenlet finishes or *timeout* expires.
        Return ``None`` regardless.
        """
        if self.ready():
            return

        switch = getcurrent().switch
        self.rawlink(switch)
        try:
            t = Timeout._start_new_or_dummy(timeout)
            try:
                result = self.parent.switch()
                if result is not self:
                    raise InvalidSwitchError('Invalid switch into Greenlet.join(): %r' % (result, ))
            finally:
                t.cancel()
        except Timeout as ex:
            self.unlink(switch)
            if ex is not t:
                raise
        except:
            self.unlink(switch)
            raise

將當前的greenlet.switch方法賦值給switch然后調用rawlink方法:

    def rawlink(self, callback):
        """Register a callable to be executed when the greenlet finishes execution.

        The *callback* will be called with this instance as an argument.

        .. caution:: The callable will be called in the HUB greenlet.
        """
        if not callable(callback):
            raise TypeError('Expected callable: %r' % (callback, ))
        self._links.append(callback)
        if self.ready() and self._links and not self._notifier:
            self._notifier = self.parent.loop.run_callback(self._notify_links)

rawlink其實也沒做什么,他將當前greenlet的switch存進了一個雙端鏈表中,就是self._links.append(callback)這一句。保存了起來並沒有像sleep那樣像hub上注冊回調,所以hub在回調鏈里是沒有這家伙的。

然后還是跟上面一樣的流程用self.parent.switch()回到hub中調用greenlet.switch(self)運行run函數進入loop循環。然后運行第一個注冊進來的回調也就是運行xixihaha並打印第一個msg。這個時候調用gevent.sleep注冊一個Waiter()事件到hub,然后依然會回來。然后再執行最后一個msg 因為整個回調鏈上只有他自己只能又切回來。當運行完之后我們來看下如何回到最外面main:

    def run(self):
        try:
            self.__cancel_start()
            self._start_event = _start_completed_event

            try:
                result = self._run(*self.args, **self.kwargs)
            except:
                self._report_error(sys.exc_info())
                return
            self._report_result(result)
        finally:
            self.__dict__.pop('_run', None)
            self.__dict__.pop('args', None)
            self.__dict__.pop('kwargs', None)

當xixihaha回調也結束之后也就是第二個msg也運行完了之后會返回調用他的那個Greenlet.run方法繼續向下執行,然后會執行到self._report_result(result)

    def _report_result(self, result):
        self._exc_info = (None, None, None)
        self.value = result
        if self._has_links() and not self._notifier:
            self._notifier = self.parent.loop.run_callback(self._notify_links)

這里我們直接看判斷這里,會去hub上注冊self._notify_links。 self._notify_links方法是什么來看:

    def _notify_links(self):
        while self._links:
            link = self._links.popleft()
            try:
                link(self)
            except:
                self.parent.handle_error((link, self), *sys.exc_info())

self._links.popleft()會讓你把前面賦值給self._links的回調吐出來賦值給link然后運行這個回調。結果當然是愉快的回到了main里面。

然后彈出亂七八糟的東西最后結束run。

總結:

可以看到join和joinall()類似的都是沒有把自己注冊到hub主循環之中,而是等所有的greenlet都運行完了之后,再調用自己回到原來注冊過去的greenlet中,就不會在因為提前切到主函數main中導致整個過程提前結束。

 

2. 關於在使用monkey_patchall()之后隱式切換的問題:

這個我不准備再拿特別大篇幅來分析講解了,大致說下我的理解,首先必須要知道一點就是gevent的底層是libev,這也是為什么他如此高效的原因。hub里面的loop下就封裝了各種各樣對應的libev事件。就拿gevent.sleep()來舉例子,里面使用的self.parent.loop.timer就是注冊timer事件,而網絡請求的切換只是將時間到期事件變成了io讀寫事件。每當io讀取事件,寫事件發生的時候,就會觸發對應的事件gevent就是通過這些事件的觸發來決定自身什么時候該切換到哪里進行哪些事件的處理。理解了這個,也就明白了隱式切換真正的實現原理。然后再去看源碼可能就沒有那么一臉萌比的感覺了。

 

對gevent的源碼分析到這里,目前也足夠我使用了。下一篇關於gevent的文章將分析和實踐一些高級應用和特性,畢竟我們學庫都是拿來使用的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM