gevent Odds and Ends


 

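  An earlier article covered gevent's scheduling flow. This one looks at several important gevent modules: Timeout, Event/AsyncResult, Semaphore, and the socket monkey patch. All of them involve switching between the current coroutine and the hub. The gevent version analyzed here is 1.2.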

Timeout

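  This class lives in the gevent.timeout module. Its job is to raise an exception in the current coroutine once the timeout expires, which also forces the flow of execution back to the current coroutine. A simple example: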

import time

import gevent
from gevent import Timeout

SLEEP = 6
TIMEOUT = 5

# Arm a timer that raises Timeout in the current (main) greenlet
timeout = Timeout(TIMEOUT)
timeout.start()

def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))
finally:
    timeout.cancel()

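  The output is: after 5.00100016594 catch Timeout Exception. So after 5 s a Timeout exception (which inherits from BaseException) is raised in the main coroutine. The implementation of Timeout is simple; the core is the start method: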

    def start(self):
        """Schedule the timeout."""
        assert not self.pending, '%r is already started; to restart it, cancel it first' % self
        if self.seconds is None:  # "fake" timeout (never expires)
            return

        if self.exception is None or self.exception is False or isinstance(self.exception, string_types):
            # timeout that raises self
            self.timer.start(getcurrent().throw, self)
        else:  # regular timeout with user-provided exception
            self.timer.start(getcurrent().throw, self.exception)

 

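  The source shows that when the timer expires it calls the throw method obtained from getcurrent() at the time start was called. throw switches to that coroutine and raises the exception inside it (by default the Timeout instance itself, as in the code above). Three points deserve attention when using Timeout: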

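  First: always remember to call cancel in a finally block. Otherwise, if the coroutine resumes before TIMEOUT has elapsed, the exception will still be raised later. For example: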

import time

import gevent
from gevent import Timeout

SLEEP = 4
TIMEOUT = 5

timeout = Timeout(TIMEOUT)
timeout.start()

def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))
# finally:
#     timeout.cancel()

# Keep the program alive past TIMEOUT so the still-armed timer fires
gevent.sleep(2)
print('program will finish')
Coroutine resumes before the timeout

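  Running the code above raises a Timeout exception. Here the coroutine resumes before the timeout (SLEEP < TIMEOUT), and Timeout.cancel is never called in a finally block. The last two lines keep the program from exiting too early, so the timer fires while the main greenlet is inside gevent.sleep(2) and the exception is raised there, outside the try block.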

  Because Timeout implements the with protocol (the __enter__ and __exit__ methods), a better style is to put Timeout in a with statement, as in the following code:

import time

import gevent
from gevent import Timeout

SLEEP = 4
TIMEOUT = 5


def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

# The timer is started on entry and cancelled on exit
with Timeout(TIMEOUT):
    begin = time.time()
    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('after %s catch Timeout Exception' % (time.time() - begin))

gevent.sleep(2)
print('program will finish')
Timeout with
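
  The two branches of start shown earlier can also be exercised through the with form. The following is a minimal sketch, assuming the documented behaviour that passing False as the exception makes the context manager swallow its own timeout; the TooSlow class and the short durations are made up for illustration.

```python
import gevent
from gevent import Timeout


class TooSlow(Exception):
    """Hypothetical user-provided exception, not part of gevent."""


# Branch 1: exception is False, so the Timeout instance itself is thrown
# and the with block suppresses it on exit.
completed = False
with Timeout(0.05, False):
    gevent.sleep(0.5)
    completed = True  # skipped: the sleep is interrupted first

# Branch 2: a user-provided exception is thrown instead of the Timeout.
caught = None
try:
    with Timeout(0.05, TooSlow('took too long')):
        gevent.sleep(0.5)
except TooSlow as ex:
    caught = ex

print(completed, type(caught).__name__)
```

  In both cases the timer is cancelled when the block exits, so no stray exception can arrive later.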

 

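  Second: Timeout only switches back to the current coroutine. It does not cancel coroutines that have already been scheduled (the one started with spawn above). Let's change the code: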

import time

import gevent
from gevent import Timeout

SLEEP = 6
TIMEOUT = 5

timeout = Timeout(TIMEOUT)
timeout.start()

def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))
finally:
    timeout.cancel()

# Give the spawned coroutine time to finish its own sleep
gevent.sleep(2)
print('program will finish')
# output:
# after 5.00100016594 catch Timeout Exception
# log in wait
# program will finish
Timeout does not affect the spawned coroutine

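  The output shows that even though the timeout switched control back to the main greenlet, the coroutine started with spawn is unaffected. To cancel a previously spawned coroutine on timeout, call Greenlet.kill after catching the exception.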
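
  A minimal sketch of that pattern follows, with shortened durations and a list named log standing in for the print calls (both are choices made here for illustration, not taken from the original example).

```python
import gevent
from gevent import Timeout

log = []


def wait():
    gevent.sleep(0.3)
    log.append('log in wait')


worker = gevent.spawn(wait)  # keep a reference so it can be killed later

timeout = Timeout(0.1)
timeout.start()
try:
    worker.join()
except Timeout:
    worker.kill()  # raises GreenletExit inside the worker and waits for it to die
    log.append('timed out, worker killed')
finally:
    timeout.cancel()

gevent.sleep(0.4)  # long enough for the worker to have logged, had it survived
print(log)
```

  Keeping the Greenlet object returned by spawn is what makes the kill possible; the earlier examples discard it by chaining .join() directly.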

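  Third: every gevent function that may suspend the current coroutine accepts a timeout parameter, used to resume the suspended coroutine once the given time has passed. The Timeout exception is caught inside the function and is not propagated. For example: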

import time

import gevent
from gevent import Timeout

SLEEP = 6
TIMEOUT = 5


def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    # join returns after TIMEOUT seconds without raising
    gevent.spawn(wait).join(TIMEOUT)
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))

print('program will exit', time.time() - begin)
The timeout parameter of functions

 

Event & AsyncResult:

  Event is used to synchronize between Greenlets. The example from the tutorial is short and clear:

import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''


evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()


def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),

    ])

if __name__ == '__main__': main()
Event Example

 

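  The two main methods of Event are set and wait. wait waits for the event to happen and suspends the calling coroutine if it has not happened yet; set signals that the event has happened, after which the hub wakes every coroutine waiting on the event. Running the example shows that a single set wakes all coroutines waiting on that event. AsyncResult is similar to Event, except that a value can be passed to the woken coroutines (somewhat like the difference between next and send on a generator). Next, a brief look at Event's set and wait methods.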
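
  To make the AsyncResult comparison concrete, here is a small sketch in the shape of the Event example above. The producer and consumer function names and the 'payload' value are illustrative only.

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()
received = []


def producer():
    gevent.sleep(0.1)
    result.set('payload')  # wakes every waiter and hands each the value


def consumer(name):
    # get() suspends this greenlet until a value (or an exception) is set
    received.append((name, result.get()))


gevent.joinall([
    gevent.spawn(producer),
    gevent.spawn(consumer, 'a'),
    gevent.spawn(consumer, 'b'),
])
print(sorted(received))
```

  Both consumers receive the same value, which mirrors how one Event.set wakes all waiters.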

  The core of Event.wait is in gevent.event._AbstractLinkable._wait_core, where _AbstractLinkable is the base class of Event. The source of _wait_core is:

    def _wait_core(self, timeout, catch=Timeout):
        # The core of the wait implementation, handling
        # switching and linking. If *catch* is set to (),
        # a timeout that elapses will be allowed to be raised.
        # Returns a true value if the wait succeeded without timing out.
        switch = getcurrent().switch
        self.rawlink(switch)
        try:
            timer = Timeout._start_new_or_dummy(timeout)
            try:
                try:
                    result = self.hub.switch()
                    if result is not self: # pragma: no cover
                        raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
                    return True
                except catch as ex:
                    if ex is not timer:
                        raise
                    # test_set_and_clear and test_timeout in test_threading
                    # rely on the exact return values, not just truthish-ness
                    return False
            finally:
                timer.cancel()
        finally:
            self.unlink(switch)

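  It first adds the current coroutine's switch method to the Event's callback list via rawlink, then switches to the hub. If the timer created for this wait expires first, the resulting Timeout is caught and the method returns False; either way the timer is cancelled and the callback is unlinked.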

  Next, the set function:

    def set(self):
        self._flag = True # make event ready
        self._check_and_notify()

    def _check_and_notify(self):
        # If this object is ready to be notified, begin the process.
        if self.ready():
            if self._links and not self._notifier:
                self._notifier = self.hub.loop.run_callback(self._notify_links)

 

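  _check_and_notify asks the hub to call _notify_links, which invokes the Event's callback list (holding the switch method recorded earlier for each waiting coroutine). This resumes every coroutine that was waiting.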
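
  Both outcomes of the wait and set handshake can be observed from the outside. The sketch below assumes that Event.wait returns the event's flag, so it is false after a timeout and true after set; the durations are arbitrary.

```python
import gevent
from gevent.event import Event

evt = Event()

# Nobody calls set(): the internal timer fires, the Timeout is caught
# inside wait, and the call simply returns a false value.
timed_out_result = evt.wait(timeout=0.05)

# Schedule set() to run from the hub shortly, then wait again.
gevent.spawn_later(0.05, evt.set)
signalled_result = evt.wait(timeout=1)

print(timed_out_result, signalled_result)
```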

 

Semaphore & Lock

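  Semaphore is the semaphore provided by gevent. It is instantiated as Semaphore(value), where value is the number of holders allowed concurrently. With a value of 1 it behaves as a mutex (Lock). Semaphore offers two functions: acquire (the P operation) and release (the V operation). When acquire is called while the counter is already 0, the current coroutine waits. The source is as follows (gevent._semaphore.Semaphore.acquire):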

    def acquire(self, blocking=True, timeout=None):

        if self.counter > 0:
            self.counter -= 1
            return True

        if not blocking:
            return False

        timeout = self._do_wait(timeout)
        if timeout is not None:
            # Our timer expired.
            return False

        # Neither our timer no another one expired, so we blocked until
        # awoke. Therefore, the counter is ours
        self.counter -= 1
        assert self.counter >= 0
        return True

 

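  The logic is simple: if counter is greater than 0, the acquire succeeds immediately. Otherwise the coroutine waits. _do_wait is implemented much like Event.wait: it records the current coroutine's switch and switches to the hub. When a resource is released and control switches back to this coroutine, counter is guaranteed to be greater than 0. Coroutine concurrency is not thread concurrency: at any moment only one coroutine is running within a thread, so the operations on counter above need no lock.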
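
  The behaviour of acquire described above can be checked with a short sketch. It uses the public gevent.lock.Semaphore; the state dictionary, the worker function, and the numbers are illustrative.

```python
import gevent
from gevent.lock import Semaphore

sem = Semaphore(2)  # at most two holders at a time
state = {'active': 0, 'peak': 0}


def worker():
    with sem:  # acquire on entry, release on exit
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        gevent.sleep(0.05)  # yield to the hub while holding the semaphore
        state['active'] -= 1


gevent.joinall([gevent.spawn(worker) for _ in range(5)])

# Once both slots are taken, a non-blocking acquire returns False
# instead of suspending the caller.
first = sem.acquire(blocking=False)
second = sem.acquire(blocking=False)
third = sem.acquire(blocking=False)
sem.release()
sem.release()

print(state['peak'], first, second, third)
```

  The counters in state are updated without any lock, for the same reason the Semaphore's own counter needs none.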

 

Monkey-Patch

  In a dynamic language like Python, replacing attributes of modules, classes, and instances at runtime is easy. Take patch_socket as an example:

>>> import socket
>>> print(socket.socket)
<class 'socket._socketobject'>
>>> from gevent import monkey
>>> monkey.patch_socket()
>>> print(socket.socket)
<class 'gevent._socket2.socket'>
>>>

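  As shown, the same name (socket.socket) refers to different objects before and after the patch. Under Python 2.x the source of the patched socket is in gevent/_socket2.py; under Python 3.x it is in gevent/_socket3.py. To see why ordinary socket operations cooperate across coroutines once patched, look at two functions: socket.__init__ and socket.recv.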
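
  The rebinding itself needs nothing from gevent. The toy sketch below uses a made-up module named netlib and two made-up classes to show the mechanism only; it is not how gevent.monkey is implemented.

```python
import types

# Hypothetical stand-in for a standard-library module
netlib = types.ModuleType('netlib')


class BlockingSocket(object):
    def recv(self):
        return 'blocking recv'


class CooperativeSocket(object):
    def recv(self):
        return 'cooperative recv'


netlib.socket = BlockingSocket


def patch_socket(module):
    """Rebind module.socket at runtime and return the original for safekeeping."""
    original = module.socket
    module.socket = CooperativeSocket
    return original


before = netlib.socket
saved = patch_socket(netlib)
after = netlib.socket

# Code that looks the name up through the module now gets the replacement
print(before.__name__, after.__name__, netlib.socket().recv())
```

  Only lookups made through the module after patching see the new class, which is one reason patches are usually applied as early as possible.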

  The __init__ function (gevent._socket2.socket.__init__):

 1     def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
 2         if _sock is None:
 3             self._sock = _realsocket(family, type, proto) # the native socket
 4             self.timeout = _socket.getdefaulttimeout()
 5         else:
 6             if hasattr(_sock, '_sock'):
 7                 self._sock = _sock._sock
 8                 self.timeout = getattr(_sock, 'timeout', False)
 9                 if self.timeout is False:
10                     self.timeout = _socket.getdefaulttimeout()
11             else:
12                 self._sock = _sock
13                 self.timeout = _socket.getdefaulttimeout()
14             if PYPY:
15                 self._sock._reuse()
16         self._sock.setblocking(0) # set to non-blocking
17         fileno = self._sock.fileno()
18         self.hub = get_hub()    # hub
19         io = self.hub.loop.io
20         self._read_event = io(fileno, 1) # watcher for read events
21         self._write_event = io(fileno, 2)

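  As __init__ shows, the patched socket still wraps a native socket object and sets it to non-blocking (line 16). When a socket is non-blocking and data is not ready to be read or written, the call fails with an EWOULDBLOCK/EAGAIN error. The last two lines create the watchers for the socket's readable and writable events. Now the recv function (gevent._socket2.socket.recv):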

 1     def recv(self, *args):
 2         sock = self._sock  # keeping the reference so that fd is not closed during waiting
 3         while True:
 4             try:
 5                 return sock.recv(*args) # data is ready: return it directly
 6             except error as ex:
 7                 if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
 8                     raise
 9                 # QQQ without clearing exc_info test__refcount.test_clean_exit fails
10                 sys.exc_clear()
11             self._wait(self._read_event) # wait on the read watcher until data is readable

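  If data is read inside the while loop, it is returned directly. In practice the data is often not ready yet, in which case the non-blocking socket raises an error carrying EWOULDBLOCK, which line 7 checks for. Line 11 then calls _wait, which registers the current coroutine's switch and switches to the hub. When _read_event fires, the socket is readable, control switches back to the current coroutine, and the loop runs again.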
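
  The same retry loop can be tried with the standard library alone. In the sketch below, select.select stands in for the hub's read watcher, so it blocks the whole thread rather than switching coroutines; the recv_when_ready helper and the timer thread that sends the data are assumptions made for the demonstration.

```python
import errno
import select
import socket
import threading


def recv_when_ready(sock, size, retries):
    """Try recv; on EWOULDBLOCK/EAGAIN wait until readable, then retry."""
    while True:
        try:
            return sock.recv(size)
        except socket.error as ex:
            if ex.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            retries.append(ex.args[0])  # record that the data was not ready
        # gevent would switch to the hub here; this sketch blocks in select
        select.select([sock], [], [])


reader, writer = socket.socketpair()
reader.setblocking(False)  # same step as line 16 of __init__

# Deliver data a little later so the first recv attempt finds nothing
sender = threading.Timer(0.1, writer.sendall, args=(b'hello',))
sender.start()

retries = []
data = recv_when_ready(reader, 1024, retries)

sender.join()
reader.close()
writer.close()
print(data, len(retries))
```

  Replacing the select call with a switch to the hub is what turns this blocking loop into a cooperative one.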

 

references:

http://sdiehl.github.io/gevent-tutorial/

http://www.cnblogs.com/xybaby/p/6370799.html

 

