總算還是要來梳理一下這幾天深入研究之后學習到的東西了。
這幾天一直在看以前跟jd對接的項目寫的那個gevent代碼。為了查錯,基本上深入淺出了一次gevent幾個重要部件的實現和其工作的原理。
這里用一個簡單demo依次分析運行流程和介紹相關概念最后得出結論:
import gevent def test_1(): print '切換不出去' print '切換出去我不是循環' gevent.sleep(1) def test_2(): print '切換' print '切換出去我去' gevent.sleep(3) gevent.spawn(test_1) gevent.spawn(test_2) gevent.sleep(1)
在具體介紹各部分具體怎么運轉得時候我想要先提幾個一定會用到的gevent類:
gevent hub:可以在圖上明顯看到,hub.loop其實就是gevent的事件循環核心。這也是所有Greenlet實例的parents。想要實現回調,需要去hub上注冊一個你當前棧的回調,以讓hub在處理完其他事情之后能使用greenlet.switch(注意大寫的Greenlet是gevent重新實現的類繼承了greenlet。區別文中的大小寫對於理解很重要)回到原來的棧中。
Waiter類:其實我覺得Waiter類要理解到他的功能之后,才會覺得比較簡單。我們可以把Waiter實例化之后將他的switch方法注冊到hub中。這里看一段代碼:
result = Waiter() timer = get_hub().loop.timer(5) timer.start(result.switch, 'hello from Waiter') print result.get()
也就是上面的第三行。這里的第二行可以實現向主循環中注冊一個5秒等待事件。注冊之后就開始計時了。最后調用get方法去切換到hub主循環。下面上get的代碼:
def get(self): """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) self.greenlet = getcurrent() try: return self.hub.switch() finally: self.greenlet = None
這里會執行self.greenlet = getcurrent(), 將當前棧的保存到self.greenlet中,以保證后面可以通過開始向主循環中注冊的Waiter().switch切換回來。然后調用self.hub.switch()方法:
def switch(self): switch_out = getattr(getcurrent(), 'switch_out', None) if switch_out is not None: switch_out() return greenlet.switch(self)
這里最后一句像主循環進行切換之后,運行hub的run方法:
def run(self): assert self is getcurrent(), 'Do not call Hub.run() directly' while True: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle self.parent.throw(LoopExit('This operation would block forever')) # this function must never return, as it will cause switch() in the parent greenlet # to return an unexpected value # It is still possible to kill this greenlet with throw. However, in that case # switching to it is no longer safe, as switch will return immediatelly
執行loop.run方法,就可以開始依次運行回調了。在第一次執行的時候到這里啟用hub的loop循環然后執行了loop.run之后就是依次運行注冊的回調。
下次再有的回調運行的都是在loop循環里執行不會再運行到hub調用run方法了這里注意。
執行注冊過來的Waiter().switch回調切換到Waiter.switch中進行執行繼續看代碼:
def switch(self, value=None): """Switch to the greenlet if one's available. Otherwise store the value.""" greenlet = self.greenlet if greenlet is None: self.value = value self._exception = None else: assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" switch = greenlet.switch try: switch(value) except: self.hub.handle_error(switch, *sys.exc_info())
將當時Waiter()里面保存的greenlet拿出來,帶上value參數切換回去。這里相當於我們切換回main。回到當時切換到hub的地方:
try: return self.hub.switch() finally: self.greenlet = None
return self.hub.switch()帶回來的值然后執行finally清空Waiter()的greenlet的值為None結束運行。下面我會用不同的例子來展示Waiter的用法。
greenlet: greenlet 提供了一種在不同的調用棧之間自由跳躍的功能。
libev: 這里用到了loop watcher.timer,其實真正在處理io事件的時候這個才是更重要的。
下面開始講解最頂上貼出的例子:
貼出gevent.spawn的源碼:
@classmethod def spawn(cls, *args, **kwargs): """Return a new :class:`Greenlet` object, scheduled to start. The arguments are passed to :meth:`Greenlet.__init__`. """ g = cls(*args, **kwargs) g.start() return g
運行到gevent.spawn(test1)的時候會實例化一個Greenlet實例g。g調用了start方法,將g.switch注冊到hub中。以下是start方法的源碼:
def start(self): """Schedule the greenlet to run in this loop iteration""" if self._start_event is None: self._start_event = self.parent.loop.run_callback(self.switch)
調用loop.run_callback注冊greenlet.switch方法到主循環hub中。
總的來說gevent.spawn()干的事情就是生成一個Greenlet實例,然后將這個實例的self.switch方法注冊到主循環回調中。test2同理,直接看到gevent.sleep(1)。
gevent.sleep()是非常重要也非常有用的實現,我覺得這是理解gevent顯式切換(explicit switch)的關鍵。我不想精簡代碼,所以貼上所有源碼慢慢分析:
def sleep(seconds=0, ref=True): """Put the current greenlet to sleep for at least *seconds*. *seconds* may be specified as an integer, or a float if fractional seconds are desired. If *ref* is false, the greenlet running sleep() will not prevent gevent.wait() from exiting. """ hub = get_hub() loop = hub.loop if seconds <= 0: waiter = Waiter() loop.run_callback(waiter.switch) waiter.get() else: hub.wait(loop.timer(seconds, ref=ref))
還是先獲得hub,然后將hub.loop保存給loop,之后判斷有沒有傳seconds參數我們傳遞了seconds參數為1s,於是調用hub的wait方法將watcher loop.timer()做參數傳遞進去。
這里watcher的叫法來源於libev事件驅動庫,hub.loop中對底層的libev庫做了一一對應的封裝。這里我們使用的是一個timer的watcher,那么當我們處理io事件的時候,使用的事件驅動可能就會變成io的watcher了。繼續往下看hub.wait函數:
def wait(self, watcher): waiter = Waiter() unique = object() watcher.start(waiter.switch, unique) try: result = waiter.get() assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique) finally: watcher.stop()
實例化Waiter()類。然后向loop.timer的watcher注冊一個waiter.switch,帶了個參數unique。然后執行waiter.get()
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) self.greenlet = getcurrent() try: return self.hub.switch() finally: self.greenlet = None
這里會執行self.greenlet = getcurrent(), 將當前棧的保存到self.greenlet中,以保證后面可以通過開始向主循環中注冊的Waiter().switch切換回來,然后調用self.hub.switch()。
def switch(self): switch_out = getattr(getcurrent(), 'switch_out', None) if switch_out is not None: switch_out() return greenlet.switch(self)
然后到hub.switch()函數中切換攜程到Hub中執行run()。
def run(self): assert self is getcurrent(), 'Do not call Hub.run() directly' while True: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle self.parent.throw(LoopExit('This operation would block forever')) # this function must never return, as it will cause switch() in the parent greenlet # to return an unexpected value # It is still possible to kill this greenlet with throw. However, in that case # switching to it is no longer safe, as switch will return immediatelly
繼續執行loop.run。這里loop.run就會開始執行剛才注冊上來的回調,我們第一個注冊上來的回調是_run=test1的Greenlet回調。這里注意這里執行的run方法其實就已經是Greenlet的run方法了,回調回來的時候Greenlet繼承的greenlet底層實現了執行的時候會執行他的run方法,我們也可以通過重寫_run方法自己定義這里會執行的邏輯。
def run(self): try: if self._start_event is None: self._start_event = _dummy_event else: self._start_event.stop() try: result = self._run(*self.args, **self.kwargs) except: self._report_error(sys.exc_info()) return self._report_result(result) finally: self.__dict__.pop('_run', None) self.__dict__.pop('args', None) self.__dict__.pop('kwargs', None)
當執行到這一句 result = self._run(*self.args, **self.kwargs)的時候,我們會執行最開始保存在Greenlet中的_run函數也就是test1。然后就會去執行test1函數了。
執行test1函數之后我們繼續使用gevent.sleep(1)向hub注冊回調。注意這個回調肯定要排在main函數也就是最外層函數的后面,不管外面函數休眠多少秒這里都會等待,因為很重要的一點是hub本身其實是順序且阻塞的。
然后這個時候會繼續運行到test2所在的Greenlet對象執行之后繼續用gevent.sleep(1)注冊回調。然后下一次再運行到self.hub.switch時greenlet.switch(self)方法就會切換到第一個注冊的Waiter().switch()上了,也就是我們在main上面使用gevent.sleep(1)注冊的那個回調。這個回調會讓我們順利返回到main上的那個greenlet.至此就結束了整個過程。
其實切換比較難以理解的我覺得還是思路和那可惡的到處都是的switch。而且各類的switch方法還都叫switch。。。一不小心就弄錯,絕對是難理解的關鍵。 我本人不用本子記的時候看得非常暈。這些東西我相信只有多看才能夠理解。 后面的文章我會繼續探索,隱式切換的方方面面 以及寫一些實例來控制切換。gevent 這家伙給我埋坑太深,我已經下決心要完全摸透了。
Reference:
http://blog.csdn.net/yueguanghaidao/article/details/24281751 gevent源碼分析
https://segmentfault.com/a/1190000000613814 gevent源碼分析
http://xlambda.com/gevent-tutorial/#_2 gevent指南
http://blog.csdn.net/yueguanghaidao/article/details/39122867 [gevent源碼分析] gevent兩架馬車-libev和greenlet