轉自:http://bingotree.cn/?p=281
官方網站:http://eventlet.net/
之前小秦我寫了篇python中協程和yield的文章,這里小秦我再總結一下eventlet中比較重要的幾個知識點。
1.安裝方法:
1
|
[root@COMPUTE02 ~]# pip install eventlet
|
2.基礎知識及優點
eventlet的核心是協程(也叫做green thread)。
協程的好處是沒有線程開銷來的大(比如切換代價很小)。同時協程由於調度都由開發者自己決定,所以對lock的需求就很低了。
3.網絡編程模型
網絡變成模型有兩種:同步模型和異步模型
同步模型就是一個請求來了后,給一個線程,這個線程單獨的去處理這個請求。對於一些read/write方法如果資源沒有就緒的話就阻塞在那里等待。優點是代碼簡單清晰,缺點是效率低下。
異步模型就是通過epoll/select/poll這些方法,由一個主線程去輪詢,查看那個請求的資源就緒了,就緒的話就調用相關的回調函數去進行處理。有點是效率較高,缺點是代碼復雜。
而通過協程,我們可以使用同步模型的方法寫出異步模型效率的代碼。
4.API解釋
Greenthread Spawn大類:
eventlet.spawn(func, *args, **kw):
生成一個協程運行對於的func方法。這個會返回greenthread.GreenThread,調用者可以通過greenthread.GreenThread來獲取這個協程的信息。
eventlet.spawn_n(func, *args, **kw):
作用和eventlet.spawn一樣,但是不會返回greenthread.GreenThread。速度比spawn要快些。
eventlet.spawn_after(seconds, func, *args, **kw):
和spawn的功能一樣,但是會在seconds指定的秒后才會生成協程去運行func的代碼。如果想取消運行,可以在返回的greenthread.GreenThread中調用cancel方法。
Greenthread Control大類:
eventlet.sleep(seconds=0):
暫停當前的協程,使之睡眠一段時間。這個方法會把cpu時間讓給其它協程。
class eventlet.GreenPool:
一個用於控制並行的pool。通過這個pool可以指定運行的協程的上限,這樣有助於控制資源的消耗。
class eventlet.GreenPile:
代表了一系列的工作。
class eventlet.Queue:
用於不同的協程間的通信。
class eventlet.Timeout:
用於給某個對象增加一個超時的行為。
Patching Functions大類:
eventlet.import_patched(modulename, *additional_modules, **kw_additional_modules):
加載某個被綠化的公共模塊。
eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False):
對於那些沒有被綠化的模塊,可以通過這個把這些模塊中使用的相關公共模塊綠化。
Network Convenience Functions大類:
eventlet.connect(addr, family=2, bind=None):
用於獲取客戶端的連接
eventlet.listen(addr, family=2, backlog=50):
用於監聽信息。
eventlet.wrap_ssl(sock, *a, **kw):
將一個普通socket轉成一個ssl的socket。
eventlet.serve(sock, handle, concurrency=1000)?:
當請求來的時候,生成一個協程,通過handle對請求做出處理。調用這個方法后會的阻塞的,除非你把它放在一個spwan中的協程中運行。
class eventlet.StopServe:
用於退出serve的異常。
5.例子
5.1 Client Pattern:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import
eventlet
from
eventlet.green
import
urllib2
def
fetch(url):
return
urllib2.urlopen(url).read()
pool
=
eventlet.GreenPool()
for
body
in
pool.imap(fetch, urls):
print
"got body"
,
len
(body)
|
這里建立了一個pool,可以猜測imap方法把urls中的每個url都調用了一個fetch方法去處理,並且這些都會建立獨立的協程。每個協程 在read請求的時候,會的把cpu時間交給eventlet manager,同時把自己的socket端口注冊到類似於select這類輪詢方法中。然后eventlet manager發現某個協程等待的數據到達后,就會把cpu交給它,這個協程處理完數據后就會用yield返回數據,之后則把cpu時間繼續交給 eventlet manager。
5.2 Server Pattern
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import
eventlet
def
handle(client):
while
True
:
c
=
client.recv(
1
)
if
not
c:
break
client.sendall(c)
server
=
eventlet.listen((
'0.0.0.0'
,
6000
))
pool
=
eventlet.GreenPool(
10000
)
while
True
:
new_sock, address
=
server.accept()
pool.spawn_n(handle, new_sock)
|
這里也是通過一個pool限制資源的使用。當每個請求來的時候通過spawn_n方法把對這個請求的handle方法放到獨立的協程中去處理。而handle中的recv這些方法都是被綠化過的,所以如果讀取不到數據這些方法就會把cpu時間交出來給別的協程使用。
5.3 Dispatch Pattern
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import
eventlet
feedparser
=
eventlet.import_patched(
'feedparser'
)
pool
=
eventlet.GreenPool()
def
fetch_title(url):
d
=
feedparser.parse(url)
return
d.feed.get(
'title'
, '')
def
app(environ, start_response):
pile
=
eventlet.GreenPile(pool)
for
url
in
environ[
'wsgi.input'
].readlines():
pile.spawn(fetch_title, url)
titles
=
'\n'
.join(pile)
start_response(
'200 OK'
, [(
'Content-type'
,
'text/plain'
)])
return
[titles]
|
這里的pile是個用於保存協程結果的東東,並且是個迭代器。一般可以用pile來獲取相關的協程的結果。
6.Hub
這個Hub就是小秦我上面講到的eventlet manager。其實際上就是一個loop的循環,不停的看有沒有那些協程可以給它CPU時間了或者哪些定時器可以生效了。在eventlet中這個的實 現有下面幾種:epolls,poll,selects,pyevent。可以通過eventlet.hubs.use_hub(hub=None)來決 定使用哪種hub。hub的那個loop所在的協程也叫做main greenlet。
Hub只有在第一次IO操作的時候才會建立。
7.eventlet.event.Event類
event設queue差不多,但是有兩個不同:
1.調用send不會交出自己的cpu時間
2.send只能被調用一次
event主要用於在不同協程間傳遞返回值。比如我協程A需要等協程B做了某件事后的結果,那么我協程A可以建立了一個event evt,然后調用evt.wait()就會開始等待。協程B把事情做好后運行evt.send(XXX)(注意,由於都在一個線程中,所以獲取這個evt 甚至不需要鎖),這個時候協程A的evt.wait()代碼就可以往下運行了,並且Hub會把相關的結果給它。
比如這個官網上的例子:
1
2
3
4
5
6
7
8
9
|
>>>
from
eventlet
import
event
>>>
import
eventlet
>>> evt
=
event.Event()
>>>
def
baz(b):
... evt.send(b
+
1
)
...
>>> _
=
eventlet.spawn_n(baz,
3
)
>>> evt.wait()
4
|
下面這個例子也不錯:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
>>>
from
eventlet
import
event
>>>
import
eventlet
>>> evt
=
event.Event()
>>>
def
waiter():
...
print
'about to wait'
... result
=
evt.wait()
...
print
'waited for'
, result
>>> _
=
eventlet.spawn(waiter)
>>> eventlet.sleep(
0
)
about to wait
>>> evt.send(
'a'
)
>>> eventlet.sleep(
0
)
waited
for
a
|
另外可以把一個異常發送給在wait的event,如:
1
2
3
4
5
6
7
8
9
|
>>>
from
eventlet
import
event
>>> evt
=
event.Event()
>>> evt.send_exception(RuntimeError())
>>> evt.wait()
Traceback (most recent call last):
File
"<stdin>"
, line
1
,
in
<module>
File
"eventlet/event.py"
, line
120
,
in
wait
current.throw(
*
self
._exc)
RuntimeError
|
如果要把trace也返回,那么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
>>>
import
sys
>>> evt
=
event.Event()
>>>
try
:
...
raise
RuntimeError()
...
except
RuntimeError:
... evt.send_exception(
*
sys.exc_info())
...
>>> evt.wait()
Traceback (most recent call last):
File
"<stdin>"
, line
1
,
in
<module>
File
"eventlet/event.py"
, line
120
,
in
wait
current.throw(
*
self
._exc)
File
"<stdin>"
, line
2
,
in
<module>
RuntimeError
|
8.backdoor
參考http://blog.csdn.net/ssdxiao/article/details/17759483中的例子:
主要用於獲取某個長時間運行的進程的狀態。其原理是這樣的:在程序的代碼中,我專門運行一個協程,這個協程一般不會被調度到,所以不會影響程序的正常運行。這個協程中跑了一個backdoor_server,比如下面的這行代碼:
1
|
eventlet.spawn(backdoor.backdoor_server,eventlet.listen((
'localhost'
,
3000
)),
locals
=
backdoor_locals)
|
這里的backdoor_locals是一個字典,key是某個字符串,而value就是對應的方法:
1
2
3
4
|
backdoor_locals
=
{
'exit'
: _dont_use_this,
'quit'
: _dont_use_this,
'off'
:turn_off_printing,
}
|
由於這個是協程,所以我們的backdoor_server完全有能力修改這個程序的內存中的變量。當這個程序運行起來后,我通過telnet的方 法可以連上這個程序,然后可以通過執行backdoor_locals中指定的三個方法exit,quit,off來控制我們程序的運行行為(比如修改某 個內存中變量的值)。
來看一個簡單的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
from
eventlet
import
backdoor
import
eventlet
def
_funca():
print
"abc"
return
"123"
backdoor_locals
=
{
'funca'
: _funca}
eventlet.spawn(backdoor.backdoor_server, eventlet.listen((
'localhost'
,
3000
)),
locals
=
backdoor_locals)
while
True
:
print
"aaa"
eventlet.sleep(
1
)
|
當這個程序運行后,我在另一個終端上執行下面的命令就可以看到對應的結果:
1
2
3
4
5
6
7
8
9
10
11
12
|
[root@COMPUTE02 ~]# telnet 127.0.0.1 3000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Python 2.6.6 (r266:84292, Sep 4 2013, 07:46:00)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> funca()
abc
'123'
>>>
|
可以看到,print或return都會的把輸出返回到telnet的client端。
9.eventlet.timeout.Timeout類
用於當指定的協程運行時間超過timeout中指定的時間時,生成一個異常。如:
1
2
3
4
5
|
>>> Timeout(
0.1
)
>>> eventlet.sleep(
0.2
)
Traceback (most recent call last):
...
Timeout:
0.1
seconds
|
或者下面這個例子:
1
2
3
4
5
6
7
|
data
=
None
with Timeout(
5
,
False
):
data
=
mysock.makefile().readline()
if
data
is
None
:
...
# 5 seconds passed without reading a line
else
:
...
# a line was read within 5 seconds
|