gevent調度流程解析


 

  gevent是目前應用非常廣泛的網絡庫,高效的輪詢IO庫libev加上greenlet實現的協程(coroutine),使得gevent的性能非常出色,尤其是在web應用中。本文介紹gevent的調度流程,主要包括gevent對greenlet的封裝和使用,以及greenlet與libev的協作。閱讀本文需要對greenlet有一定的認識,可以參考這篇文章,另外,本文分析的gevent版本為1.2,可以通過gevent.version_info查看版本號。

gevent簡介:

   gevent是基於協程( greenlet)的網絡庫,底層的事件輪詢基於 libev(早期是 libevent),gevent的API概念和Python標准庫一致(如事件,隊列)。gevent有一個很有意思的東西-monkey-patch,能夠使python標准庫中的阻塞操作變成異步,如socket的讀寫。

    gevent來源於eventlet,自稱比后者實現更簡單、API更方便且性能更好,許多開源的web服務器也使用了gevent,如gunicorn、paste,當然gevent本生也可以作為一個python web服務器使用。這篇文章對常見的wsgi server進行性能對比,gevent不管在http1.0還是http1.1都表現非常出色。下圖是目前常用的http1.1標准下的表現:

  

  gevent高效的秘訣就是greenlet和libev啦,greenlet在之前的博文有介紹,gevent對greenlet的使用比較限制,只能在兩層協程之間切換,簡單也不容易出錯。libev使用輪訓非阻塞的方式進行事件處理,比如unix下的epoll。早期gevent使用libevent,后來替換成libev,因為libev“提供更少的核心功能以求更改的效率”,這里有libev和libevent的性能對比

  

greenlet回顧:

  如果想了解gevent的調度流程,最重要的是對greenlet有基本的了解。下面總結一些個人認為比較重要的點:

  1. 每一個greenlet.greenlet實例都有一個parent(可指定,默認為創生新的greenlet.greenlet所在環境),當greenlet.greenlet實例執行完邏輯正常結束、或者拋出異常結束時,執行邏輯切回到其parent
  2. 可以繼承greenlet.greenlet,子類需要實現run方法,當調用greenlet.switch方法時會調用到這個run方法

  在gevent中,有兩個類繼承了greenlet.greenlet,分別是gevent.hub.Hub和gevent.greenlet.Greenlet。后文中,如果是greenlet.greenlet這種寫法,那么指的是原生的類庫greentlet,如果是greenlet(或者Greenlet)那么指gevent封裝后的greenlet。

 

greenlet調度流程:

  首先,給出總結性的結論,后面再結合實例和源碼一步步分析。

  每個gevent線程都有一個hub,前面提到hub是greenlet.greenlet的實例。hub實例在需要的時候創生(Lazy Created),那么其parent是main greenlet。之后任何的Greenlet(注意是greenlet.greenlet的子類)實例的parent都設置成hub。hub調用libev提供的事件循環來處理Greenlet代表的任務,當Greenlet實例結束(正常或者異常)之后,執行邏輯又切換到hub。

gevent調度示例1:

  我們看下面最簡單的代碼:

>>> import gevent
>>> gevent.sleep(1)  
>>>

  上面的代碼很簡單,但事實上gevent的核心都包含在其中,接下來結合源碼進行分析。

  首先看sleep函數(gevent.hub.sleep):

1 def sleep(seconds=0, ref=True):
2     hub = get_hub()
3     loop = hub.loop
4     if seconds <= 0:
5         waiter = Waiter()
6         loop.run_callback(waiter.switch)
7         waiter.get()
8     else:
9         hub.wait(loop.timer(seconds, ref=ref))

 

   首先是獲取hub(第2行),然后在hub上wait這個定時器事件(第9行)。get_hub源碼如下(gevent.hub.get_hub):

 1 def get_hub(*args, **kwargs):
 2     """
 3     Return the hub for the current thread.
 4 
 5     """
 6     hub = _threadlocal.hub
 7     if hub is None:
 8         hubtype = get_hub_class()
 9         hub = _threadlocal.hub = hubtype(*args, **kwargs)
10     return hub

 

  可以看到,hub是線程內唯一的,之前也提到過greenlet是線程獨立的,每個線程有各自的greenlet棧。hubtype默認就是gevent.hub.Hub,在hub的初始化函數(__init__)中,會創建loop屬性,默認也就是libev的python封裝。

  回到sleep函數定義,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函數非常關鍵,對於任何阻塞性操作,比如timer、io都會調用這個函數,其作用一句話概括:從當前協程切換到hub,直到watcher對應的事件就緒再從hub切換回來。wait函數源碼如下(gevent.hub.Hub.wait):

 1     def wait(self, watcher):
 2         """
 3         Wait until the *watcher* (which should not be started) is ready.
 4 
 5         """
 6         waiter = Waiter()
 7         unique = object()
 8         watcher.start(waiter.switch, unique)
 9         try:
10             result = waiter.get()
11             if result is not unique:
12                 raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
13         finally:
14             watcher.stop()

  形參watcher就是loop.timer實例,其cython描述在corecext.pyx,我們簡單理解成是一個定時器事件就行了。上面的代碼中,創建了一個Waiter(gevent.hub.Waiter)對象,這個對象起什么作用呢,這個類的doc寫得非常清楚

Waiter.__doc__  

A low level communication utility for greenlets.

Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:

* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception

   簡而言之,是對greenlet.greenlet類switch 和 throw函數的分裝,用來存儲返回值greenlet的返回值或者捕獲在greenlet中拋出的異常。我們知道,在原生的greenlet中,如果一個greenlet拋出了異常,那么該異常將會展開至其parent greenlet。

  回到Hub.wait函數,第8行 watcher.start(waiter.switch, unique) 注冊了一個回調,在一定時間(1s)之后調用回調函數waiter.switch。注意,waiter.switch此時並沒有執行。然后第10行調用waiter.get。看看這個get函數(gevent.hub.Waiter.get):

 1     def get(self):
 2         """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
 3         if self._exception is not _NONE:
 4             if self._exception is None:
 5                 return self.value
 6             else:
 7                 getcurrent().throw(*self._exception)
 8         else:
 9             if self.greenlet is not None:
10                 raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
11             self.greenlet = getcurrent() # 存儲當前協程,之后從hub switch回來的時候使用
12             try:
13                 return self.hub.switch() # switch到hub
14             finally:
15                 self.greenlet = None

 

  核心的邏輯在第11到15行,11行中,getcurrent獲取當前的greenlet(在這個測試代碼中,是main greenlet,即最原始的greenlet),將其復制給waiter.greenlet。然后13行switch到hub,在greenlet回顧章節的第二條提到,greenlet.greenlet的子類需要重寫run方法,當調用子類的switch時會調用到該run方法。Hub的run方法實現如下:

 1     def run(self):
 2         """
 3         Entry-point to running the loop. This method is called automatically
 4         when the hub greenlet is scheduled; do not call it directly.
 5 
 6         :raises LoopExit: If the loop finishes running. This means
 7            that there are no other scheduled greenlets, and no active
 8            watchers or servers. In some situations, this indicates a
 9            programming error.
10         """
11         assert self is getcurrent(), 'Do not call Hub.run() directly'
12         while True:
13             loop = self.loop
14             loop.error_handler = self
15             try:
16                 loop.run()
17             finally:
18                 loop.error_handler = None  # break the refcount cycle
19             self.parent.throw(LoopExit('This operation would block forever', self))

 

  loop自然是libev的事件循環。doc中提到,這個loop理論上會一直循環,如果結束,那么表明沒有任何監聽的事件(包括IO 定時等)。之前在Hub.wait函數中注冊了定時器,那么在這個run中,如果時間到了,那么會調用定時器的callback,也就是之前的waiter.switch, 我們再來看看這個函數(gevent.hub.Waiter.switch):

 1     def switch(self, value=None):
 2         """Switch to the greenlet if one's available. Otherwise store the value."""
 3         greenlet = self.greenlet
 4         if greenlet is None:
 5             self.value = value
 6             self._exception = None
 7         else:
 8             assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
 9             switch = greenlet.switch
10             try:
11                 switch(value)
12             except:
13                 self.hub.handle_error(switch, *sys.exc_info())

 

  核心代碼在第8到13行,第8行保證調用到該函數的時候一定在hub這個協程中,這是很自然的,因為這個函數一定是在Hub.run中被調用。第11行switch到waiter.greenlet這個協程,在講解waiter.get的時候就提到了waiter.greenlet是main greenlet。注意,這里得switch會回到main greenlet被切出的地方(也就是main greenlet掛起的地方),那就是在waiter.get的第10行,整個邏輯也就恢復到main greenlet繼續執行。

   總結:sleep的作用很簡單,觸發一個阻塞的操作,導致調用hub.wait,從當前greenlet.greenlet切換至Hub,超時之后再從hub切換到之前的greenlet繼續執行通過這個例子可以知道,gevent將任何阻塞性的操作封裝成一個Watcher,然后從調用阻塞操作的協程切換到Hub,等到阻塞操作完成之后,再從Hub切換到之前的協程

gevent調度示例2:

  上面這個例子,雖然能夠理順gevent的調度流程,但事實上並沒有體現出gevent 協作的優勢。接下來看看gevent tutorial的例子:

 1 import gevent
 2 
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7 
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12 
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])
17 
18 # output
19 Running in foo
20 Explicit context to bar
21 Explicit context switch to foo again
22 Implicit context switch back to bar

 

  從輸出可以看到, foo和bar依次輸出,顯然是在gevent.sleep的時候發生了執行流程切換,gevent.sleep再前面已經介紹了,那么這里主要關注spawn和joinall函數

  gevent.spawn本質調用了gevent.greenlet.Greenlet的類方法spawn:

1     @classmethod
2     def spawn(cls, *args, **kwargs):
3         g = cls(*args, **kwargs)
4         g.start()
5         return g

  這個類方法調用了Greenlet的兩個函數,__init__ 和 start. init函數中最為關鍵的是這段代碼:  

1     def __init__(self, run=None, *args, **kwargs):
2         greenlet.__init__(self, None, get_hub()) # 將新創生的greenlet實例的parent一律設置成hub
3 
4         if run is not None:
5             self._run = run

  start函數的定義也很簡單(gevent.greenlet.Greenlet.start):

1   def start(self):
2         """Schedule the greenlet to run in this loop iteration"""
3         if self._start_event is None:
4             self._start_event = self.parent.loop.run_callback(self.switch)

  注冊回調事件self.switch到hub.loop,注意Greenlet.switch最終會調用到Greenlet._run, 也就是spawn函數傳入的callable對象(foo、bar)。這里僅僅是注冊,但還沒有開始事件輪詢,gevent.joinall就是用來啟動事件輪詢並等待運行結果的。

  joinall函數會一路調用到gevent.hub.iwait函數:

 1 def iwait(objects, timeout=None, count=None):
 2     """
 3     Iteratively yield *objects* as they are ready, until all (or *count*) are ready
 4     or *timeout* expired.
 5     """
 6     # QQQ would be nice to support iterable here that can be generated slowly (why?)
 7     if objects is None:
 8         yield get_hub().join(timeout=timeout)
 9         return
10 
11     count = len(objects) if count is None else min(count, len(objects))
12     waiter = _MultipleWaiter() # _MultipleWaiter是Waiter的子類
13     switch = waiter.switch
14 
15     if timeout is not None:
16         timer = get_hub().loop.timer(timeout, priority=-1)
17         timer.start(switch, _NONE)
18 
19     try:
20         for obj in objects:
21             obj.rawlink(switch) # 這里往hub.loop注冊了回調
22  
23         for idx in xrange(count):
24             print 'for in iwait', idx
25             item = waiter.get() # 這里會切換到hub
26             print 'come here ', item, getcurrent()
27             waiter.clear()
28             if item is _NONE:
29                 return
30             yield item
31     finally:
32         if timeout is not None:
33             timer.stop()
34         for obj in objects:
35             unlink = getattr(obj, 'unlink', None)
36             if unlink:
37                 try:
38                     unlink(switch)
39                 except:
40                     traceback.print_exc()

 

  然后iwait函數第23行開始的循環,逐個調用waiter.get。這里的waiter是_MultipleWaiter(Waiter)的實例,其get函數最終調用到Waiter.get。前面已經詳細介紹了Waiter.get,簡而言之,就是switch到hub。我們利用greenlet的tracing功能可以看到整個greenlet.greenlet的switch流程,修改后的代碼如下:

 1 import gevent
 2 import greenlet
 3 def callback(event, args):
 4     print event, args[0], '===:>>>>', args[1]
 5 
 6 def foo():
 7     print('Running in foo')
 8     gevent.sleep(0)
 9     print('Explicit context switch to foo again')
10 
11 def bar():
12     print('Explicit context to bar')
13     gevent.sleep(0)
14     print('Implicit context switch back to bar')
15 
16 print 'main greenlet info: ', greenlet.greenlet.getcurrent()
17 print 'hub info', gevent.get_hub()
18 oldtrace = greenlet.settrace(callback)
19         
20 gevent.joinall([
21     gevent.spawn(foo),
22     gevent.spawn(bar),
23 ])
24 greenlet.settrace(oldtrace)

  切換流程及原因見下圖:

  

  總結:gevent.spawn創建一個新的Greenlet,並注冊到hub的loop上,調用gevent.joinall或者Greenlet.join的時候開始切換到hub。

 

  本文通過兩個簡單的例子並結合源碼分析了gevent的協程調度流程。gevent的使用非常方便,尤其是在web server中,基本上應用App什么都不用做就能享受gevent帶來的好處。筆者閱讀gevent源碼最重要的原因在於想了解gevent對greenlet的封裝和使用,greenlet很強大,強大到容易出錯,而gevent保證在兩層協程之間切換,值得借鑒!

 

references:

http://www.cnblogs.com/xybaby/p/6337944.html

http://www.gevent.org/

https://pypi.python.org/pypi/greenlet

http://software.schmorp.de/pkg/libev.html

http://libevent.org/

http://eventlet.net/

http://nichol.as/benchmark-of-python-web-servers

http://libev.schmorp.de/bench.html

http://sdiehl.github.io/gevent-tutorial/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM