eventlet的學習


轉自:http://bingotree.cn/?p=281

官方網站:http://eventlet.net/

之前小秦我寫了篇python中協程和yield的文章,這里小秦我再總結一下eventlet中比較重要的幾個知識點。

1.安裝方法:

1
[root@COMPUTE02 ~]# pip install eventlet

2.基礎知識及優點
eventlet的核心是協程(也叫做green thread)。
協程的好處是沒有線程開銷來的大(比如切換代價很小)。同時協程由於調度都由開發者自己決定,所以對lock的需求就很低了。

3.網絡編程模型
網絡變成模型有兩種:同步模型和異步模型
同步模型就是一個請求來了后,給一個線程,這個線程單獨的去處理這個請求。對於一些read/write方法如果資源沒有就緒的話就阻塞在那里等待。優點是代碼簡單清晰,缺點是效率低下。
異步模型就是通過epoll/select/poll這些方法,由一個主線程去輪詢,查看那個請求的資源就緒了,就緒的話就調用相關的回調函數去進行處理。有點是效率較高,缺點是代碼復雜。
而通過協程,我們可以使用同步模型的方法寫出異步模型效率的代碼。

4.API解釋
Greenthread Spawn大類:
eventlet.spawn(func, *args, **kw):
生成一個協程運行對於的func方法。這個會返回greenthread.GreenThread,調用者可以通過greenthread.GreenThread來獲取這個協程的信息。

eventlet.spawn_n(func, *args, **kw):
作用和eventlet.spawn一樣,但是不會返回greenthread.GreenThread。速度比spawn要快些。

eventlet.spawn_after(seconds, func, *args, **kw):
和spawn的功能一樣,但是會在seconds指定的秒后才會生成協程去運行func的代碼。如果想取消運行,可以在返回的greenthread.GreenThread中調用cancel方法。

Greenthread Control大類:
eventlet.sleep(seconds=0):
暫停當前的協程,使之睡眠一段時間。這個方法會把cpu時間讓給其它協程。

class eventlet.GreenPool:
一個用於控制並行的pool。通過這個pool可以指定運行的協程的上限,這樣有助於控制資源的消耗。

class eventlet.GreenPile:
代表了一系列的工作。

class eventlet.Queue:
用於不同的協程間的通信。

class eventlet.Timeout:
用於給某個對象增加一個超時的行為。

Patching Functions大類:
eventlet.import_patched(modulename, *additional_modules, **kw_additional_modules):
加載某個被綠化的公共模塊。

eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False):
對於那些沒有被綠化的模塊,可以通過這個把這些模塊中使用的相關公共模塊綠化。

Network Convenience Functions大類:
eventlet.connect(addr, family=2, bind=None):
用於獲取客戶端的連接

eventlet.listen(addr, family=2, backlog=50):
用於監聽信息。

eventlet.wrap_ssl(sock, *a, **kw):
將一個普通socket轉成一個ssl的socket。

eventlet.serve(sock, handle, concurrency=1000)?:
當請求來的時候,生成一個協程,通過handle對請求做出處理。調用這個方法后會的阻塞的,除非你把它放在一個spwan中的協程中運行。

class eventlet.StopServe:
用於退出serve的異常。

5.例子
5.1 Client Pattern:

1
2
3
4
5
6
7
8
9
10
11
12
13
 
import eventlet
from eventlet.green import urllib2
 
def fetch(url):
     return urllib2.urlopen(url).read()
 
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
     print "got body" , len (body)

這里建立了一個pool,可以猜測imap方法把urls中的每個url都調用了一個fetch方法去處理,並且這些都會建立獨立的協程。每個協程 在read請求的時候,會的把cpu時間交給eventlet manager,同時把自己的socket端口注冊到類似於select這類輪詢方法中。然后eventlet manager發現某個協程等待的數據到達后,就會把cpu交給它,這個協程處理完數據后就會用yield返回數據,之后則把cpu時間繼續交給 eventlet manager。

5.2 Server Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
import eventlet
 
def handle(client):
     while True :
         c = client.recv( 1 )
         if not c: break
         client.sendall(c)
 
server = eventlet.listen(( '0.0.0.0' , 6000 ))
pool = eventlet.GreenPool( 10000 )
while True :
     new_sock, address = server.accept()
     pool.spawn_n(handle, new_sock)

這里也是通過一個pool限制資源的使用。當每個請求來的時候通過spawn_n方法把對這個請求的handle方法放到獨立的協程中去處理。而handle中的recv這些方法都是被綠化過的,所以如果讀取不到數據這些方法就會把cpu時間交出來給別的協程使用。

5.3 Dispatch Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import eventlet
feedparser = eventlet.import_patched( 'feedparser' )
 
pool = eventlet.GreenPool()
 
def fetch_title(url):
     d = feedparser.parse(url)
     return d.feed.get( 'title' , '')
 
def app(environ, start_response):
     pile = eventlet.GreenPile(pool)
     for url in environ[ 'wsgi.input' ].readlines():
         pile.spawn(fetch_title, url)
     titles = '\n' .join(pile)
     start_response( '200 OK' , [( 'Content-type' , 'text/plain' )])
     return [titles]

這里的pile是個用於保存協程結果的東東,並且是個迭代器。一般可以用pile來獲取相關的協程的結果。

6.Hub
這個Hub就是小秦我上面講到的eventlet manager。其實際上就是一個loop的循環,不停的看有沒有那些協程可以給它CPU時間了或者哪些定時器可以生效了。在eventlet中這個的實 現有下面幾種:epolls,poll,selects,pyevent。可以通過eventlet.hubs.use_hub(hub=None)來決 定使用哪種hub。hub的那個loop所在的協程也叫做main greenlet。
Hub只有在第一次IO操作的時候才會建立。

7.eventlet.event.Event類
event設queue差不多,但是有兩個不同:
1.調用send不會交出自己的cpu時間
2.send只能被調用一次

event主要用於在不同協程間傳遞返回值。比如我協程A需要等協程B做了某件事后的結果,那么我協程A可以建立了一個event evt,然后調用evt.wait()就會開始等待。協程B把事情做好后運行evt.send(XXX)(注意,由於都在一個線程中,所以獲取這個evt 甚至不需要鎖),這個時候協程A的evt.wait()代碼就可以往下運行了,並且Hub會把相關的結果給它。
比如這個官網上的例子:

1
2
3
4
5
6
7
8
9
>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def baz(b):
...     evt.send(b + 1 )
...
>>> _ = eventlet.spawn_n(baz, 3 )
>>> evt.wait()
4

下面這個例子也不錯:

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def waiter():
...     print 'about to wait'
...     result = evt.wait()
...     print 'waited for' , result
>>> _ = eventlet.spawn(waiter)
>>> eventlet.sleep( 0 )
about to wait
>>> evt.send( 'a' )
>>> eventlet.sleep( 0 )
waited for a

另外可以把一個異常發送給在wait的event,如:

1
2
3
4
5
6
7
8
9
>>> from eventlet import event
>>> evt = event.Event()
>>> evt.send_exception(RuntimeError())
>>> evt.wait()
Traceback (most recent call last):
   File "<stdin>" , line 1 , in <module>
   File "eventlet/event.py" , line 120 , in wait
     current.throw( * self ._exc)
RuntimeError

如果要把trace也返回,那么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> import sys
>>> evt = event.Event()
>>> try :
...     raise RuntimeError()
... except RuntimeError:
...     evt.send_exception( * sys.exc_info())
...
>>> evt.wait()
Traceback (most recent call last):
   File "<stdin>" , line 1 , in <module>
   File "eventlet/event.py" , line 120 , in wait
     current.throw( * self ._exc)
   File "<stdin>" , line 2 , in <module>
RuntimeError

8.backdoor
參考http://blog.csdn.net/ssdxiao/article/details/17759483中的例子:
主要用於獲取某個長時間運行的進程的狀態。其原理是這樣的:在程序的代碼中,我專門運行一個協程,這個協程一般不會被調度到,所以不會影響程序的正常運行。這個協程中跑了一個backdoor_server,比如下面的這行代碼:

1
eventlet.spawn(backdoor.backdoor_server,eventlet.listen(( 'localhost' , 3000 )), locals = backdoor_locals)

這里的backdoor_locals是一個字典,key是某個字符串,而value就是對應的方法:

1
2
3
4
backdoor_locals = { 'exit' : _dont_use_this,
                     'quit' : _dont_use_this,
                     'off' :turn_off_printing,
}

由於這個是協程,所以我們的backdoor_server完全有能力修改這個程序的內存中的變量。當這個程序運行起來后,我通過telnet的方 法可以連上這個程序,然后可以通過執行backdoor_locals中指定的三個方法exit,quit,off來控制我們程序的運行行為(比如修改某 個內存中變量的值)。

來看一個簡單的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from eventlet import backdoor
import eventlet
 
def _funca():
     print "abc"
     return "123"
 
backdoor_locals = { 'funca' : _funca}
 
eventlet.spawn(backdoor.backdoor_server, eventlet.listen(( 'localhost' , 3000 )), locals = backdoor_locals)
 
while True :
     print "aaa"
     eventlet.sleep( 1 )

當這個程序運行后,我在另一個終端上執行下面的命令就可以看到對應的結果:

1
2
3
4
5
6
7
8
9
10
11
12
[root@COMPUTE02 ~]# telnet 127.0.0.1 3000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Python 2.6.6 (r266:84292, Sep  4 2013, 07:46:00)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> funca()
abc
'123'
>>>

可以看到,print或return都會的把輸出返回到telnet的client端。

9.eventlet.timeout.Timeout類
用於當指定的協程運行時間超過timeout中指定的時間時,生成一個異常。如:

1
2
3
4
5
>>> Timeout( 0.1 )
>>> eventlet.sleep( 0.2 )
Traceback (most recent call last):
  ...
Timeout: 0.1 seconds

或者下面這個例子:

1
2
3
4
5
6
7
data = None
with Timeout( 5 , False ):
     data = mysock.makefile().readline()
if data is None :
     ... # 5 seconds passed without reading a line
else :
     ... # a line was read within 5 seconds


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM